Netty 原理解析
“Netty是由 JBOSS 提供的一个 java开源 框架。Netty提供异步的、 事件驱动 的网络 应用程序 框架和工具,用以快速开发高性能、高可靠性的 网络服务器 和 客户端 程序。 也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户, 服务端 应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。 “快 速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性 。“
-----来自百度百科
Netty 简介:
Netty 是对 Java NIO (non blocking io) 的进一步封装和优化,它使用了单线程Event Loop 的模式来避免创建新线程和加锁的开销, 利用了Java NIO 提供的 zero copy buffer 技术来提高cpu 的利用率。当然Java NIO 本身就是对传统IO (oio , old fashion io , blocking io) 技术的一种优化, Java NIO 是利用对IO 多路复用 (epoll 或事 kqueue)从而提高IO的处理效率。
epoll 或者 kqueue 的原理是什么?
最近一段时间我一直在研读它的代码,发现它不仅性能非常出色,而且架构设计和代码的质量非常的优秀。每个类设计的都恰到好处,每个类都简短易读,功能易用,几行代码就可以实现很多协议的高性能服务器,使得定制变得非常简单。 业界有相当多的优秀框架使用它来做通信的模块:ElasticSearch, gprc, spark, thrift, Duddo , AsyncHttpClient ...我个人觉得Netty 这个框架真正意义上做到了高性能, 易扩展, 可插拔。
Netty 原理:
Netty 主要有三大类: 1 Channel, 2 ChannelPipeline, 3 EventLoop
1 Channel
/**
* A nexus to a network socket or a component which is capable of I/O
* operations such as read, write, connect, and bind.
* A channel provides a user:
<li>the current state of the channel (e.g. is it open? is it connected?),</li>
* <li>the {@linkplain ChannelConfig configuration parameters} of the channel (e.g. receive buffer size),</li>
* <li>the I/O operations that the channel supports (e.g. read, write, connect, and bind), and</li>
* <li>the {@link ChannelPipeline} which handles all I/O events and requests
* associated with the channel.</li>
* </ul>
Channel 等同于Nio 里面的channel, 但是多了很多其他的特性, 例如可以链式监听channel 里面数据的读取和写入。 每个channel 都会与一个EventLoop 相关联。
NioServerSocketChannel 就是对Java 中ServerSocketChannel 的一种封装, 主要用于接受发来的socket请求,然后把发来的请求变成 NioSocketChannel 转发到Worker EventloopGroup 里面去处理
2 ChannelPipeline
每个Channel 都会有一个ChannelPipeline 用来把回调函数按顺序链起来。
这个Pipeline 其实是一个double linkedlist 也就是双向链表。里面的addlast, addfirst 会把handler 加到这个链表的的最后面 或是最前面。 当有读写监听的事件发生的时候,这个类会从头部开始一个接着一个接着回调这些监听事件的函数。
// add handler to the head
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler);
addFirst0(newCtx);
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
return this;
callHandlerAdded0(newCtx);
return this;
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
// 上面的代码会把handler 加入到链表的末尾。
// 当已经 read 一段数据成功后 开始回调head 的函数
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
//然后执行 channelread
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
} else {
fireChannelRead(msg);
// 然后找到下一个可以读的 handler 也就是inbound handler 继续回调
// 直到结束
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
3 EventLoop 和 EventLoopGroup.
EventLoop 和 EventLoopGroup 应该是整个netty 的精髓。
EventLoopGroup是一个含有多个EventLoop 的集合。每个EventLoop 是一个无限循环的单线程, 它有一个任务对列,会一直无限循环的从任务对列里面取任务执行。
我们构建server主要用的是NioEventLoopGroup, NioEventLoop。 NioEventLoop 在原有的功能基础加上Java NIO Selector 选举Channel 的功能,每个NioEventLoop都有一个Selector, NioEventLoop 会首先处理所有selector keys,也就是发来的socket请求。 如果没有请求进来,然后会去任务对列里面的任务。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
// fall through
default:
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
} else {
.......
}
由于每个EventLoop 都是单线程的,从而避免加锁的开销。 Java NIO(Non Blocking IO) 中Selector 模式也是一个单线程对IO 的多重复用,所以并不会阻塞这个EventLoop。 EventLoopGroup 里面多个EventLoop同时执行从而达到并行的效果。但是需要注意的就是在提交任务到这个任务对列的时候或事 加入 channel 的回调函数时候,一定要避免blocking IO 的发生, 如果有需要IO的请求尽可能使用Netty客户端去发NIO ,而不是直接用传统的IO, 要么就需要另外开一个线程去执行。这种特点也导致了在使用Netty 的时候会有很多坑,有可能一不小心阻塞了整个EventLoop
如果想了解更多关于EventLoop 可以参考这篇文章
Netty分享之EventLoop
下面我们从一个例子来讲述,Netty是如何构建NIO Server的,以及里面具体流程是什么
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
p.addLast(new DiscardServerHandler());
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
从上面的例子来看,创建服务器应用的时候需要两个NioEventLoopGroup 一个是boss group, 一个是worker group. Boss group 主要是用于接受ServerSocketChannel 来的io 请求, 然后再把请求具体执行的回调函数转交给worker group 去执行。Netty 可以同时创建出多个服务应用, 不同的服务应用可以去不同的port 监听。如果只有一个服务应用的话, 最好设置boss group 里面的thread 个数为1, 这样就可以使得性能达到最好的效果。
上面的代码会执行以下的流程:
- ServerBootstrap 会利用channelfactory 把NioServerSocketChannel 给利用反射的方式创建出来.
- 然后在对channel 初始化的时候,会在NioServerSocketChannel 的channelPipeline里面 加入了一个ServerBootstrapAcceptor 的回调函数。
- 然后把NioServerSocketChannel和Boss NioEventLoopGroup 中一个NioEventLoop 绑定起来. 而且每个NioEventLoop 也会有一个Java NIO selector, NioEventLoop 会把NioServerSocketChannel 注册到这个selector 里面,这个selector 会利用epoll 或事 kqueue 进行对IO 的多重复用
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
// 注册channel 到boss group 里面
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
@Override
void init(Channel channel) throws Exception {
.......
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
.....
加入 ServerBootstrapAcceptor 回调函数
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
最终会注册到eventloop 里面的selector 里面
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
成功注册之后, 当有请求来的时候, 在Channelpipeline上 里面ServerBootstrapAcceptor 的回调函数会被调用。这个acceptor 会把接受到请求传给children group 也就是worker group
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// NioSocketChannel 每一个socket 的连接
final Channel child = (Channel) msg;
// childhandler 是我们自己定义的处理每个请求的回调函数
// 会在这里被加入到NioSocketChannel 的channelpipeline 里面
child.pipeline().addLast(childHandler);
try {
//把接受到的socketchannel 注册到childgroup 里面
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
} catch (Throwable t) {
forceClose(child, t);
private static void forceClose(Channel child, Throwable t) {