Previously I implemented simple network I/O communication with plain Java NIO. Netty goes a step further: on top of Java NIO, it gives application developers a network framework built around the Reactor model. The analysis below draws on the source-code walkthroughs in 《Netty权威指南》 and Netty in Action, plus my own debugging against 4.1.90.Final.
Here is a simple Netty server together with a client running in the same process:
public class NettyInAction {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
EventLoopGroup handlerGroup = null;
try {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
handlerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup finalHandlerGroup = handlerGroup;
serverBootstrap
.group(bossGroup, workerGroup) // ①
.channel(NioServerSocketChannel.class) // ②
.option(ChannelOption.SO_BACKLOG, 1024) // ③
.childHandler(new ChannelInitializer<NioSocketChannel>() { // ④
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new MyHandler(finalHandlerGroup));
}
});
ChannelFuture f = serverBootstrap.bind(8080).sync();
connect();
f.channel().closeFuture().sync();
} finally {
if (bossGroup != null) {
Future<?> bossFuture = bossGroup.shutdownGracefully();
bossFuture.syncUninterruptibly();
}
if (workerGroup != null) {
Future<?> workerFuture = workerGroup.shutdownGracefully();
workerFuture.syncUninterruptibly();
}
if (handlerGroup != null) {
Future<?> handlerFuture = handlerGroup.shutdownGracefully();
handlerFuture.syncUninterruptibly();
}
}
}
private static class MyHandler extends ChannelInboundHandlerAdapter {
private EventLoopGroup group;
public MyHandler(EventLoopGroup group) {
this.group = group;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String m = buf.toString(CharsetUtil.UTF_8);
group.execute(() -> System.out.println(m)); // run the blocking operation on a separate thread
ctx.write(buf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
public static void connect() throws Exception {
EventLoopGroup group = null;
FileChannel fileChannel = null;
try {
fileChannel = FileChannel
.open(Paths.get(NettyInAction.class.getClassLoader().getResource("Hello").toURI()), StandardOpenOption.READ);
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup finalGroup = group;
FileChannel finalFileChannel = fileChannel;
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
FileRegion fileRegion = new DefaultFileRegion(finalFileChannel, 0, finalFileChannel.size());
ctx.writeAndFlush(fileRegion);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String m = buf.toString(CharsetUtil.UTF_8);
finalGroup.execute(() -> System.out.println(m));
// Even if this handler is logically the last one, it is still advisable to pass msg along to the tail. This is a matter of msg's memory management: if msg is not handed to the next handler (ctx.writeAndFlush must eventually reach the head; ctx.fireChannelRead must eventually reach the tail), it has to be released right here.
ctx.fireChannelRead(msg);
}
});
ChannelFuture f = bootstrap.connect("127.0.0.1", 8080).sync();
f.channel().closeFuture().sync();
} finally {
if (group != null) {
Future<?> future = group.shutdownGracefully();
future.syncUninterruptibly();
}
if (fileChannel != null) {
fileChannel.close();
}
}
}
}
ServerBootstrap is Netty's bootstrap helper class. Setting aside the builder-style configuration of the ServerBootstrap (①~④), execution begins with bind(int), whose call chain involves the following methods:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel(); // ⑤
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel); // ⑥
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() { // ⑦
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler(); // ⑧
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
⑥ starts from register(Channel) of MultithreadEventLoopGroup in NioEventLoopGroup's inheritance tree:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventLoop next() {
return (EventLoop) super.next();
}
public EventExecutor next() {
return this.chooser.next();
}
In short, one NioEventLoop is picked from the NioEventLoopGroup to perform the registration. The NioEventLoops themselves are produced by newChild during the NioEventLoopGroup's construction:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
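To see the chooser in action, here is a small observational sketch of my own (not from the article): with two children, next() alternates round-robin between them.
EventLoopGroup group = new NioEventLoopGroup(2);
EventLoop first = group.next();
EventLoop second = group.next();
EventLoop third = group.next();
System.out.println(first == second); // false: the chooser advanced to the other child
System.out.println(first == third);  // true: with two children, next() wraps around
group.shutdownGracefully().syncUninterruptibly();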
Registration is thus carried out by a NioEventLoop (that is, a SingleThreadEventLoop), and next lands in AbstractChannel (inner class AbstractUnsafe):
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
The event loop's execute (in SingleThreadEventExecutor):
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
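This inEventLoop check is the pattern Netty applies throughout, and user code can mirror it. A minimal sketch, assuming channel and doWork are defined elsewhere:
EventLoop loop = channel.eventLoop();
if (loop.inEventLoop()) {
    doWork(); // already on the channel's thread, run inline
} else {
    loop.execute(() -> doWork()); // hop onto the channel's thread, serialized with its I/O
}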
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
run() is executed via the ThreadPerTaskExecutor, which brings the loop to life.
Since the main thread is not inside the loop at registration time, the register0 task is submitted to the loop; from then on, all operations on a given Channel take place inside its loop. Next comes doRegister:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
Note that the interest set registered here is 0: the channel is registered first, and the real interest ops are only added later, once the channel is ready to read. Where the selectionKey's interestOps gets modified is hard to find by reading alone; fortunately, recent versions of IntelliJ IDEA support asynchronous stack traces.
The full stack is omitted; the interestOps modification happens in doBeginRead of AbstractNioMessageChannel (and its super implementation in AbstractNioChannel):
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
As NioServerSocketChannel's constructor shows, its read interest op is SelectionKey.OP_ACCEPT:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
③ specifies the maximum number of queued connections for the socket. For a given listening socket, the kernel maintains two queues: connections whose handshake is incomplete, and connections whose handshake has completed. accept takes from the head of the completed queue, and backlog bounds the combined length of the two queues.
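For comparison, a JDK-level sketch: the value Netty sets via SO_BACKLOG ends up as the same bind parameter (NioServerSocketChannel.doBind passes config.getBacklog()), and the kernel may clamp it (e.g., to net.core.somaxconn on Linux).
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080), 1024); // the second argument is the backlog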
The loop then starts running select (observable with jstack). When OP_ACCEPT arrives, the core logic executes in NioMessageUnsafe, an inner class of NioServerSocketChannel's parent AbstractNioMessageChannel:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); // ⑨
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
During fireChannelRead (⑨), the ServerBootstrapAcceptor gets to work:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg; // ⑩
child.pipeline().addLast(childHandler); // ⑪
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
⑩ casts the object obtained at ⑨ back to a Channel (the accepted NioSocketChannel); ⑪ adds the childHandler passed at ④ to that channel's pipeline and runs its initialization, which in turn adds the user-defined MyHandler to the pipeline. As NioSocketChannel's constructors show, the event it is interested in is SelectionKey.OP_READ:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
Once registration completes, MyHandler and the rest of the pipeline start handling OP_READ events.
Incidentally, why does ④ take the seemingly roundabout route of adding handlers through a ChannelInitializer rather than adding MyHandler directly, the way ⑪ does? Because a ChannelInitializer can add multiple handlers to the pipeline, as sketched below.
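For example, a single initializer can install a whole per-connection chain; the handler lineup here is my own illustration, not from the original example:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new LoggingHandler(LogLevel.DEBUG)); // wire-level logging
        p.addLast(new FixedLengthFrameDecoder(3));     // framing first
        p.addLast(new MyHandler(finalHandlerGroup));   // business logic last
    }
});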
PPS: one remaining inconvenience: judging from how ServerBootstrap is assembled, NioEventLoopGroup only works together with NioServerSocketChannel and NioSocketChannel, yet when writing code in an editor or IDE the developer has to pick the right combination from background knowledge. A mismatch only shows up at startup, or when the first accept happens.
public void connect() throws Exception {
EventLoopGroup group = null;
try {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup finalGroup = group;
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new MyHandler(finalGroup));
}
});
ChannelFuture f = bootstrap.connect("127.0.0.1", 8080).sync();
f.channel().closeFuture().sync();
} finally {
if (group != null) {
group.shutdownGracefully();
}
}
}
Overall this closely mirrors how the server establishes itself; the differences begin with Bootstrap's init:
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
}
No child-related parameters need to be set; that is, no new Channel will be constructed.
The bind method correspondingly becomes connect. During connect:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
connected has two possible outcomes. When connected == false, the loop's select later returns SelectionKey.OP_CONNECT, after which the handling matches the connected == true case: much like SelectionKey.OP_ACCEPT, SelectionKey.OP_READ is added to the interest set.
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
Much like accept: in the loop, once OP_READ is selected, NioByteUnsafe (an inner class of AbstractNioByteChannel) reads:
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// DON'T CHANGE
// Duplex handlers implements both out/in interfaces causing a scalability issue
// see https://bugs.openjdk.org/browse/JDK-8180450
final ChannelHandler handler = handler();
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
headContext.channelRead(this, msg);
} else if (handler instanceof ChannelDuplexHandler) {
((ChannelDuplexHandler) handler).channelRead(this, msg);
} else {
((ChannelInboundHandler) handler).channelRead(this, msg);
}
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
Here msg is passed along the INBOUND direction from one inbound handler to the next; the head context simply hands msg onward (invokeChannelRead runs the current handler, fireChannelRead forwards to the next one).
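A small sketch of that traversal (handler names are my own illustration): inbound events visit handlers in addLast order, and each handler must forward explicitly.
pipeline.addLast("first", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("first saw the msg");
        ctx.fireChannelRead(msg); // forward toward the tail; omitting this stops propagation
    }
});
pipeline.addLast("second", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("second saw the msg");
        ReferenceCountUtil.release(msg); // last inbound stop: release instead of forwarding
    }
});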
write is not quite symmetric with read. Typically you call the context's write (or writeAndFlush) to pass the msg along, which skips the outbound handlers behind the current one, since outbound propagation runs toward the head. When the flush reaches the head context, the core method is doWrite (NioSocketChannel):
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
Another difference from read: SelectionKey.OP_WRITE is added to the interest set only when a write could not be completed.
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
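The user-visible counterpart of this mechanism is channel writability: when the outbound buffer grows past its high-water mark because the socket cannot drain fast enough, isWritable() flips to false. A hedged back-pressure sketch:
public class BackpressureHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // outbound buffer drained below the low-water mark: safe to resume writing
        } else {
            // stop producing; Netty keeps flushing (OP_WRITE is set behind the scenes)
        }
        ctx.fireChannelWritabilityChanged();
    }
}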
Before analyzing Netty's close, take a look at the JDK SocketChannel's close:
public final void close() throws IOException {
synchronized (closeLock) {
if (!open)
return;
open = false;
implCloseChannel();
}
}
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
// AbstractInterruptibleChannel synchronizes invocations of this method
// using AbstractInterruptibleChannel.closeLock, and also ensures that this
// method is only ever invoked once. Before we get to this method, isOpen
// (which is volatile) will have been set to false.
//
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
isInputOpen = false;
isOutputOpen = false;
// Close the underlying file descriptor and dup it to a known fd
// that's already closed. This prevents other operations on this
// channel from using the old fd, which might be recycled in the
// meantime and allocated to an entirely different channel.
//
if (state != ST_KILLED)
nd.preClose(fd);
// Signal native threads, if needed. If a target thread is not
// currently blocked in an I/O operation then no harm is done since
// the signal handler doesn't actually do anything.
//
if (readerThread != 0)
NativeThread.signal(readerThread);
if (writerThread != 0)
NativeThread.signal(writerThread);
// If this channel is not registered then it's safe to close the fd
// immediately since we know at this point that no thread is
// blocked in an I/O operation upon the channel and, since the
// channel is marked closed, no thread will start another such
// operation. If this channel is registered then we don't close
// the fd since it might be in use by a selector. In that case
// closing this channel caused its keys to be cancelled, so the
// last selector to deregister a key for this channel will invoke
// kill() to close the fd.
//
if (!isRegistered())
kill();
}
}
Note also the following two methods:
public abstract SocketChannel shutdownInput() throws IOException; // half-close
public abstract SocketChannel shutdownOutput() throws IOException;
Netty makes use of both approaches.
As application-layer developers facing a stream protocol like TCP, the first problem to confront is framing: messages can arrive glued together or split apart.
Netty provides two abstract base classes to help with packet handling: MessageToByteEncoder (omitted here) and ByteToMessageDecoder.
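As a concrete (hypothetical) use of the decoder contract analyzed below, here is a minimal length-prefixed frame decoder: read a 4-byte length, then wait until the whole payload has accumulated. Netty ships LengthFieldBasedFrameDecoder for exactly this case; the sketch just makes the contract visible.
public class LengthPrefixedDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return; // length header incomplete; keep accumulating
        }
        in.markReaderIndex();
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex(); // payload incomplete: rewind and wait for more bytes
            return;
        }
        out.add(in.readRetainedSlice(length)); // emit one complete frame
    }
}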
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
selfFiredChannelRead = true;
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
try {
cumulation.release();
} catch (IllegalReferenceCountException e) {
//noinspection ThrowFromFinallyBlock
throw new IllegalReferenceCountException(
getClass().getSimpleName() + "#decode() might have released its input buffer, " +
"or passed it down the pipeline without a retain() call, " +
"which is not allowed.", e);
}
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes, so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (cumulation == in) {
// when the in buffer is the same as the cumulation it is doubly retained, release it once
in.release();
return cumulation;
}
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release();
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
final int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (out.isEmpty()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
The processing pattern has something in common with MapReduce. FixedLengthFrameDecoder's decode implementation:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
A decoder that splits the received ByteBufs by the fixed number of bytes. For example, if you received the following four fragmented packets:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+
A FixedLengthFrameDecoder(3) will decode them into the following three packets with the fixed length:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
Because ByteToMessageDecoder holds per-connection state (the cumulation buffer), it must not be @Sharable.
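ByteToMessageDecoder even enforces this at construction time, via the ensureNotSharable() call in its constructor; a sketch of the failure:
@ChannelHandler.Sharable
public class BadDecoder extends ByteToMessageDecoder { // instantiating this throws IllegalStateException
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    }
}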
It also defines the following method:
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default, this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(ctx, in, out);
}
}
Besides ordinary direct ByteBufs, we can use FileRegion to get sendfile-style zero copy. During flush:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
protected long doWriteFileRegion(FileRegion region) throws Exception {
final long position = region.transferred();
return region.transferTo(javaChannel(), position);
}
The Channel recognizes the FileRegion and calls transferTo.
I previously took a rough look at the SSL protocol; Netty provides SSL support as well, in the form of SslHandler. Once the client connection becomes active:
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
setOpensslEngineSocketFd(ctx.channel());
if (!startTls) {
startHandshakeProcessing(true);
}
ctx.fireChannelActive();
}
private void startHandshakeProcessing(boolean flushAtEnd) {
if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
setState(STATE_HANDSHAKE_STARTED);
if (engine.getUseClientMode()) {
// Begin the initial handshake.
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
handshake(flushAtEnd);
}
applyHandshakeTimeout();
} else if (isStateSet(STATE_NEEDS_FLUSH)) {
forceFlush(ctx);
}
}
private void handshake(boolean flushAtEnd) {
if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
// Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
// is in progress. See https://github.com/netty/netty/issues/4718.
return;
}
if (handshakePromise.isDone()) {
// If the handshake is done already lets just return directly as there is no need to trigger it again.
// This can happen if the handshake(...) was triggered before we called channelActive(...) by a
// flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
// from the connect(...) method. In this case we will see the flush() happen before we had a chance to
// call fireChannelActive() on the pipeline.
return;
}
// Begin handshake.
final ChannelHandlerContext ctx = this.ctx;
try {
engine.beginHandshake();
wrapNonAppData(ctx, false);
} catch (Throwable e) {
setHandshakeFailure(ctx, e);
} finally {
if (flushAtEnd) {
forceFlush(ctx);
}
}
}
The engine is passed in through SslHandler's constructor:
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
this.engine = ObjectUtil.checkNotNull(engine, "engine");
this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
engineType = SslEngineType.forEngine(engine);
this.startTls = startTls;
this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
setCumulator(engineType.cumulator);
}
There are several ways to construct an SSLEngine. When it is constructed through Netty and no provider is specified, SslProvider.OPENSSL is preferred (when available), which builds an OpenSslClientContext and hence an OpenSslEngine:
private static SslProvider defaultProvider() {
if (OpenSsl.isAvailable()) {
return SslProvider.OPENSSL;
} else {
return SslProvider.JDK;
}
}
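In practice one rarely builds the SSLEngine by hand; SslContextBuilder does it and hands back an SslHandler per channel. A client-side sketch (the insecure trust manager is for demonstration only, and the host/port are placeholders):
SslContext sslCtx = SslContextBuilder.forClient()
        .trustManager(InsecureTrustManagerFactory.INSTANCE) // demo only: trusts any certificate
        .build();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
        // newHandler creates the SSLEngine via defaultProvider() shown above
        ch.pipeline().addLast(sslCtx.newHandler(ch.alloc(), "example.com", 8443));
    }
});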
After construction, the handshake itself is driven by the engine. On the engine's beginHandshake contract:
/**
* Initiates handshaking (initial or renegotiation) on this SSLEngine.
* <P>
* This method is not needed for the initial handshake, as the
* <code>wrap()</code> and <code>unwrap()</code> methods will
* implicitly call this method if handshaking has not already begun.
* <P>
* Note that the peer may also request a session renegotiation with
* this <code>SSLEngine</code> by sending the appropriate
* session renegotiate handshake message.
* <P>
* Unlike the {@link SSLSocket#startHandshake()
* SSLSocket#startHandshake()} method, this method does not block
* until handshaking is completed.
* <P>
* To force a complete SSL/TLS session renegotiation, the current
* session should be invalidated prior to calling this method.
* <P>
* Some protocols may not support multiple handshakes on an existing
* engine and may throw an <code>SSLException</code>.
*/
public abstract void beginHandshake() throws SSLException;
In 《Netty权威指南》 we see the evolution of communication models: from one server process serving a single client, to one process with many threads where each thread serves one client, to a thread pool serving many clients (pseudo-asynchronous I/O), and finally to Netty's Reactor.
For the pseudo-asynchronous model the author analyzes the downsides of blocking, and in Netty in Action the importance of non-blocking I/O is stressed repeatedly. The reasons: once the pseudo-asynchronous model hits its limit and threads block, thread context switches impose overhead, and the thread pool's bounded size leads to cascading blockage. A reactive coding style therefore pushes a machine's limits further (until, eventually, quantity turns into quality).