channel的註冊主要在AbstractBootstrap類下的
<code>ChannelFuture regFuture = config().group().register(channel);/<code>
config方法返回的是ServerBootstrapConfig對象
然後調用下面這個方法:
MultithreadEventLoopGroup
<code>@Override public ChannelFuture register(Channel channel) { return next().register(channel); }/<code>
接下來就是next()方法了:
MultithreadEventExecutorGroup#next
<code>private final EventExecutorChooserFactory.EventExecutorChooser chooser; @Override public EventExecutor next() { return chooser.next(); }/<code>
EventExecutorChooserFactory
<code>@UnstableApi public interface EventExecutorChooserFactory { /** * Returns a new {@link EventExecutorChooser}. */ EventExecutorChooser newChooser(EventExecutor[] executors); /** * Chooses the next {@link EventExecutor} to use. */ @UnstableApi interface EventExecutorChooser { /** * Returns the new {@link EventExecutor} to use. */ EventExecutor next(); } }/<code>
EventExecutorChooserFactory的實現只有一個,跟進去發現其實這個簡單的算法,也就是說會以round-robin的方式(可以看作隨機選一個)選一個executor。
這裡總結一下,也就是說EventLoopGroup可以看作一個獨立的線程池,這個線程池中有很多的executor,現在這個裡面還有一個叫EventExecutorChooserFactory,它的作用僅僅是從線程池中的executor中隨機返回一個出來。
回到MultithreadEventLoopGroup,調用完next之後得到了executor選擇器。
通過debug,可知,返回的這個executor是SingleThreadEventLoop中的register方法,這個loop會講所有的event放到一個線程去執行。
SingleThreadEventLoop#register
<code>@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }/<code>
unsafe不是sun提供的,而是netty在AbstractNioChannel中定義的unsafe
<code>@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }/<code>
判斷一下當前正在執行方法調用線程和eventLoop所維護的線程是否為同一個線程對象(singleThreadEventExecutor中只有一個線程)。
<code>@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }/<code>
到這裡,就真正調用了底層的註冊方式,就到了NIO的領域了。
本文出自知乎
更多相關內容,Java架構師,軟件開發等學習資料,電子書及視頻還有高級講師公開課免費資源
需要的可以私聊小編髮送【學習】二字