Netty4.x用户指导(1)3个HelloWorld小例子

简介:

最近对netty有了兴趣,现在官方推荐版本是netty4.*,但是纵观网络,大部分都是关于netty3.x的知识。

最好的学习,莫过于通过官方文档进行学习,系统,透彻,权威,缺点是英文。本文,算做自己学习netty的第一篇,总体思路与User guide for 4.x基本一致,本篇文章不是严格意义的翻译文章。开始了...

1.前言

1.1 问题

现 在,我们使用通用的应用程序和程序库,进行互相交流。例如,我们经常使用HTTP client库从web服务器上获取信息,通过web services调用远程接口。然而,通用的协议或实现,有时候不能很好的扩展伸缩。就像我们不能使用通用协议进行部分信息的交换,如:huge files,e-mail,实时信息。我们需要高度优化的协议实现,来完成一些特殊目的。比如,你想实现一款专门的HTTP服务器,以支持基于AJAX的聊天应用,流媒体,大文件传输。你需要设计和实现一个完整的新协议,不可避免需要处理遗留协议,在不影响性能和稳定的情况下,实现新协议速度能有多块?

1.2 解决方案

Netty 致力于提供异步,基于事件驱动的网络应用框架,是一款进行快速开发高性能,可伸缩的协议服务器和客户端工具。换句话说,Netty是一个快速和容易开发的NIO客户端,服务端网络框架,它简化和使用流式方式处理网络编程,如TCP,UDP。

"快速和容易“,并不代表netty存在可维护性和性能问题。它很完美的提供FTP,SMTP,HTTP,二进制和基于文本的协议支持。有的用户可能已经发现了拥有同样有点的其他网络应用框架。你可能想问:netty和他们的区别?这个问题不好回答,Netty被设计成提供最合适的API和实现,会让你的生活更简单。

2.开始

本节使用一些简单的例子让你快速的感知netty的核心结构。阅读本节之后,你可以熟练掌握netty,可以写一个client和server。

准备

JDK 1.6+

最新版本的netty:下载地址

maven依赖

1
< span  style = "background-color:#efefef;" >< dependency >< br >    < groupId >io.netty</ groupId >< br >    < artifactId >netty-all</ artifactId >< br >    < version >${netty.version}</ version >< br ></ dependency >< br ></ span >


2.1写一个Discard服务

最简单的协议,不是'hello world',而是DISCARD。这个协议抛弃接收的数据,没有响应。

协议实现,唯一需要做的一件事就是无视所有接收到的数据,让我们开始吧。

直接上Handler的实现,它处理来自netty的I/O事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package  io.netty.examples.discard;
import  io.netty.buffer.ByteBuf;
import  io.netty.channel.ChannelHandlerContext;
import  io.netty.channel.ChannelInboundHandlerAdapter;
import  io.netty.util.ReferenceCountUtil;
/**
  * Handles a server-side channel.
  */
public  class  DiscardServerHandler  extends  ChannelInboundHandlerAdapter {  // (1)
 
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg) {  // (2)
         // Discard the received data silently.
         ((ByteBuf) msg).release();  // (3)
         /*
         //也可以这样
          try {        // Do something with msg
               } finally {
                       ReferenceCountUtil.release(msg);
             }        
         */
         
     }
 
 
     @Override
     public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  // (4)
         // Close the connection when an exception is raised.
         cause.printStackTrace();
         ctx.close();
     }
}


1)DiscardServerHandler继承了ChannelInboundHandlerAdapter

实现了ChannelInboundHandler接口。ChannelInboundHandler提供了多个可重写的事件处理方法。现在,继承了ChannelInboundHandlerAdapter,而不用自己实现handler接口。

2)重写了channelRead(),这个方法在接收到新信息时被调用。信息类型是ByteBuf

3)实现Discard协议,ByteBuf是reference-counted对象,必须显示的调用release(),进行释放。需要记住handler的责任包括释放任意的reference-counted对象。

4)exceptionCaught(),当发生异常时调用,I/O异常或Handler处理异常。一般情况下,在这里可以记录异常日志,和返回错误码。

到了这里,完成了一半,接下来编写main(),启动Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package  io.netty.examples.discard;
 
import  io.netty.bootstrap.ServerBootstrap;
 
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.ChannelOption;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioServerSocketChannel;
import  io.netty.examples.discard.time.TimeServerHandler;
 
/**
  * Discards any incoming data.
  */
public  class  DiscardServer {
 
     private  int  port;
 
     public  DiscardServer( int  port) {
         this .port = port;
     }
 
     public  void  run()  throws  Exception {
         EventLoopGroup bossGroup =  new  NioEventLoopGroup();  // (1)
         EventLoopGroup workerGroup =  new  NioEventLoopGroup();
         try  {
             ServerBootstrap b =  new  ServerBootstrap();  // (2)
             b.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel. class // (3)
              .childHandler( new  ChannelInitializer<SocketChannel>() {  // (4)
                  @Override
                  public  void  initChannel(SocketChannel ch)  throws  Exception {
                      ch.pipeline().addLast( new  TimeServerHandler());
                  }
              })
              .option(ChannelOption.SO_BACKLOG,  128 )           // (5)
              .childOption(ChannelOption.SO_KEEPALIVE,  true );  // (6)
 
             // Bind and start to accept incoming connections.
             ChannelFuture f = b.bind(port).sync();  // (7)
 
             // Wait until the server socket is closed.
             // In this example, this does not happen, but you can do that to gracefully
             // shut down your server.
             f.channel().closeFuture().sync();
         finally  {
             workerGroup.shutdownGracefully();
             bossGroup.shutdownGracefully();
         }
     }
 
     public  static  void  main(String[] args)  throws  Exception {
         int  port;
         if  (args.length >  0 ) {
             port = Integer.parseInt(args[ 0 ]);
         else  {
             port =  8080 ;
         }
         new  DiscardServer(port).run();
     }
}
  1. NioEventLoopGroup 是一个多线程的处理I/O操作Event Loop(这是异步机制特点) Netty 提供了多个 EventLoopGroup,支持多种传输方式。在这个例子中,我们使用了2个NioEventLoopGroup


    第一个,叫"boss",接收进入的连接,第二个经常叫“worker",一旦boss接收了连接并注册连接到这个worker,worker就会处理这个连接。多少个线程被使用?,EventLoopGroup拥有多少个Channel?都可以通过构造函数配置

  2. ServerBootstrap是建立server的帮助类。你可以使用Channel直接创建server,注意,这个创建过程非常的冗长,大部分情况,你不需要直接创建。

  3. 指定 NioServerSocketChannel类,当接收新接入连接时,被用在Channel的初始化。

  4. 这个handler,初始化Channel时调用。ChannelInitializer是一个专门用于配置新的Channel的Handler,一般为Channel配置ChannelPipeline。当业务复杂时,会添加更多的handler到pipeline.

  5. 你可以设置影响Channel实现的参数,比如keepAlive..

  6. option()  影响的是接收进入的连接 的NioServerSocketChannel ;childOption()影响的是来自父 ServerChannel分发的Channel, 在本例中是 NioServerSocketChannel 

  7. 保证工作准备好了


通过telnet进行测试。

输入telnet 127.0.0.1 8080

wKioL1dhJzahlRuaAAAJeL7rMjk054.png

由于是Discard协议,一没响应,二服务端也没输出。通过测试,只能确认是服务正常启动了。

调整下Handler逻辑,修改channelRead()方法。

1
2
3
4
5
6
7
8
9
10
11
@Overridepublic  void  channelRead(ChannelHandlerContext ctx, Object msg) {
     ByteBuf in = (ByteBuf) msg;
         try  {
            while  (in.isReadable()) {  // (1)
              System.out.print(( char ) in.readByte());
             System.out.flush();
              }
     finally  {
             ReferenceCountUtil.release(msg);  // (2)
     }
}

修改之后,在使用telnet测试。你在命令行中输入什么,在服务端就能看到什么!(只能是英文)


2.2写一个Echo服务


作为一个服务,没有返回响应信息,明显是不合格的。接下来实现Echo协议,返回响应信息。与前面的Discard服务实现唯一的不同点就是Handler,需要回写接收到的信息,而不是打印输出。

1
2
3
4
5
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg) {
         ctx.write(msg);  // (1)
         ctx.flush();  // (2)
     }

1)ChannelHandlerContext提供多个方法,触发I/O事件。在这里,不需要像Discard一样释放 接收的信息。

2)ctx.write(Object)不能保证完全写入,底层存在缓存,需要通过ctx.flush()刷新,保证完全写入。


完成。

2.3写一个Time服务

时间协议见 TIME protocol。和前面2个协议不同,服务将发送一个32位的integer值。不需要接收信息,一旦信息发出,将关闭连接。在这个例子中,将学到如何构造和发送信息,并在完成时关闭连接。

由于我们不需要接收信息,所以不需要channelRead()方法,而是使用channelActive()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package  io.netty.example.time; public  class  TimeServerHandler  extends  ChannelInboundHandlerAdapter {     @Override
     public  void  channelActive( final  ChannelHandlerContext ctx) {  // (1)
         final  ByteBuf time = ctx.alloc().buffer( 4 );  // (2)
         time.writeInt(( int ) (System.currentTimeMillis() / 1000L + 2208988800L));         final  ChannelFuture f = ctx.writeAndFlush(time);  // (3)
         f.addListener( new  ChannelFutureListener() {             @Override
             public  void  operationComplete(ChannelFuture future) {                 assert  f == future;
                 ctx.close();
             }
         });  // (4)
     }     @Override
     public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         cause.printStackTrace();
         ctx.close();
     }
}

1)channelActive()在连接建立和准备完成时调用。

2) 发送一个新消息,需要分配一个buff.将来写入一个4位的integer值。获取当前的 ByteBufAllocator,可以通过ChannelHandlerContext.alloc()获取。

3)注意没有java.nio.ByteBuffer.flip(),(nio 读写切换时使用),这是因为netty重写了Buffer,维护了2个指示器,分别用来(reader index)读取和( writer index )写入。

注意:ChannelHandlerContext.write() (and writeAndFlush())返回的ChannelFuture 。ChannelFuture 代表一个尚未发生的I/O操作。这一个是异步操作,会触发通知listeners 。这儿的意思是当ChannelFuture完成时,才会调用close()。


最终版的Server Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package  io.netty.examples.time;
 
import  io.netty.buffer.ByteBuf;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelFutureListener;
import  io.netty.channel.ChannelHandlerContext;
import  io.netty.channel.ChannelInboundHandlerAdapter;
 
public  class  TimeServerHandler  extends  ChannelInboundHandlerAdapter {
 
     @Override
     public  void  channelActive( final  ChannelHandlerContext ctx) {  // (1)
         final  ByteBuf time = ctx.alloc().buffer( 4 );  // (2)
         time.writeInt(( int ) (System.currentTimeMillis() / 1000L + 2208988800L));
 
         final  ChannelFuture f = ctx.writeAndFlush(time);  // (3)
         f.addListener( new  ChannelFutureListener() {
             @Override
             public  void  operationComplete(ChannelFuture future) {
                 assert  f == future;
                 ctx.close();
             }
         });  // (4)
     }
 
     @Override
     public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         cause.printStackTrace();
         ctx.close();
     }
}

接下来实现Time客户端

与Echo和Discard协议不同,人不能很好的将32位的integer,转换为可以理解的日期数据。通过学习本节,可以学习如何写一个Client。这和前面的EchoServer,DiscardServer实现有很大的不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package  io.netty.examples.time;
 
import  io.netty.bootstrap.Bootstrap;
import  io.netty.channel.ChannelFuture;
import  io.netty.channel.ChannelInitializer;
import  io.netty.channel.ChannelOption;
import  io.netty.channel.EventLoopGroup;
import  io.netty.channel.nio.NioEventLoopGroup;
import  io.netty.channel.socket.SocketChannel;
import  io.netty.channel.socket.nio.NioSocketChannel;
 
public  class  TimeClient {
     public  static  void  main(String[] args)  throws  Exception {
         String host =  "127.0.0.1" ;
         int  port = Integer.parseInt( "8080" );
         EventLoopGroup workerGroup =  new  NioEventLoopGroup();
 
         try  {
             Bootstrap b =  new  Bootstrap();  // (1)
             b.group(workerGroup);  // (2)
             b.channel(NioSocketChannel. class );  // (3)
             b.option(ChannelOption.SO_KEEPALIVE,  true );  // (4)
             b.handler( new  ChannelInitializer<SocketChannel>() {
                 @Override
                 public  void  initChannel(SocketChannel ch)  throws  Exception {
                     ch.pipeline().addLast( new  TimeClientHandler());
                 }
             });
 
             // Start the client.
             ChannelFuture f = b.connect(host, port).sync();  // (5)
 
             // Wait until the connection is closed.
             f.channel().closeFuture().sync();
         finally  {
             workerGroup.shutdownGracefully();
         }
     }
}
  1. Bootstrap is similar to ServerBootstrap except that it's for non-server channels such as a client-side or connectionless channel.

  2. 如果只使用一个 EventLoopGroup,它会被当成boss group 和 worker group.  boss worker不会再client使用。

  3. 代替NioServerSocketChannelNioSocketChannel 是一个 客户端的Channel.

  4. 注意没有使用 childOption() ,因为客户端没有上级。

  5. 调用connect(),而不是 bind()

下面是对应的Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package  io.netty.examples.time;
 
import  io.netty.buffer.ByteBuf;
import  io.netty.channel.ChannelHandlerContext;
import  io.netty.channel.ChannelInboundHandlerAdapter;
 
import  java.util.Date;
 
public  class  TimeClientHandler  extends  ChannelInboundHandlerAdapter {
     @Override
     public  void  channelRead(ChannelHandlerContext ctx, Object msg) {
         ByteBuf m = (ByteBuf) msg;  // (1)
         try  {
             long  currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
             ctx.close();
         finally  {
             m.release();
         }
     }
 
     @Override
     public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         cause.printStackTrace();
         ctx.close();
     }
}


  1. In TCP/IP, Netty reads the data sent from a peer into a ByteBuf.

测试过程:略。




本文转自 randy_shandong 51CTO博客,原文链接:http://blog.51cto.com/dba10g/1789639,如需转载请自行联系原作者

相关文章
|
7月前
|
监控 Java Linux
由浅入深Netty基础知识NIO网络编程1
由浅入深Netty基础知识NIO网络编程
38 0
|
7月前
|
缓存 安全 Java
由浅入深Netty基础知识NIO三大组件原理实战 2
由浅入深Netty基础知识NIO三大组件原理实战
44 0
|
7月前
|
Java
由浅入深Netty基础知识NIO三大组件原理实战 1
由浅入深Netty基础知识NIO三大组件原理实战
58 0
|
2月前
|
移动开发 编解码 网络协议
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
用Java的BIO和NIO、Netty来实现HTTP服务器(三) 用Netty实现
|
2月前
|
编解码 网络协议 Java
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
用Java的BIO和NIO、Netty实现HTTP服务器(一) BIO与绪论
|
7月前
|
存储 Java Docker
由浅入深Netty基础知识NIO网络编程 2
由浅入深Netty基础知识NIO网络编程
44 0
|
7月前
|
Java Maven Spring
使用netty实现nio web服务器
使用netty实现nio web服务器
59 0
|
3月前
|
设计模式 网络协议 Java
Java NIO 网络编程 | Netty前期知识(二)
Java NIO 网络编程 | Netty前期知识(二)
73 0
|
4月前
|
编解码 网络协议
Netty基础篇:NIO中缓冲区设置太小
Netty基础篇:NIO中缓冲区设置太小
|
4月前
|
存储 缓存 监控
Netty基础篇:详解Netty底层NIO
Netty基础篇:详解Netty底层NIO