初识Netty

本篇简单介绍一下Netty,使用Netty简单做一个实例程序。

概述

官方的介绍:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

创建Maven项目

添加依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>

创建服务端

创建服务端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class MyServer {
public static void main(String[] args) throws InterruptedException {
// 创建服务端所需的两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务端启动对象并设置所需参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 设置两个线程组,bossGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
// 设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
// 设置线程队列链接数
.option(ChannelOption.SO_BACKLOG,128)
// 这是保持活动链接状态
.childOption(ChannelOption.SO_KEEPALIVE,true)
// 使用匿名内部类初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 设置自定义信息处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
System.out.println("----Netty服务端准备就绪---");
// 绑定端口号并启动
ChannelFuture channelFuture = bootstrap.bind(5555).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

创建服务端信息处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到消息处理
* @param ctx 通道上下文
* @param msg 消息
* @throws Exception 异常抛出
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 消息处理结束方法,相应给客户端
* @param ctx 通道上下文
* @throws Exception 异常抛出
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息", CharsetUtil.UTF_8));
}
/**
* 事件出发
* @param ctx 上下文
* @param evt 事件对象
* @throws Exception 异常抛出
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent event=(IdleStateEvent) evt;
System.out.println(event.state());
}
super.userEventTriggered(ctx, evt);
}
/**
* 出现异常后的处理
* @param ctx 上下文
* @param cause 错误信息
* @throws Exception 异常抛出
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 回收通道上下文
ctx.close();
}
}

创建客户端

创建客户端启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyClient {
public static void main(String[] args) throws InterruptedException {
// 创建所需线程组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try{
// 创建bootstrap对象
Bootstrap bootstrap = new Bootstrap();
// 设置线程组
bootstrap.group(eventExecutors)
// 客户端消息实现类
.channel(NioSocketChannel.class)
// 构建消息处理器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 设置自定义消息处理器
socketChannel.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("=====客户端准备就绪=====");
// 链接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",5555).sync();
// 监听通道关闭
channelFuture.channel().closeFuture().sync();
}finally {
// 关闭线程组
eventExecutors.shutdownGracefully();
}
}
}

创建客户端消息处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 渠道准备就绪后触发--自动发送一条消息
* @param ctx 上下文
* @throws Exception 异常抛出
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("这里是来自客户端的消息", CharsetUtil.UTF_8));
}
/**
* 收到消息后的处理
* @param ctx 上下文
* @param msg 消息对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}

Bootstrap启动流程图

Netty的特性与重要组件

TaskQueue任务队列

如果Handler处理器有长时间的业务处理,可以使用TaskQueue异步处理。

测试 & 结果

  1. MyServer启动后,启动MyServer的控制台会输出----Netty服务端准备就绪---
  2. MyClient启动后
    1. MyClient控制台打印=====客户端准备就绪=====
    2. MyServer控制台打印服务端收到客户端/127.0.0.1:53000发送的消息:这里是来自客户端的消息 (由于客户端通道就绪后自动发送消息,故服务端收到了客户端的消息)
    3. MyClient控制台打印客户端收到服务端/127.0.0.1:5555的消息:服务端已收到消息 (此消息为服务端收到消息之后的响应)

附录