Netty源碼學習(4)-- Channel註冊流程


Netty源碼學習(4)-- Channel註冊流程


channel的註冊主要在AbstractBootstrap類下的

<code>ChannelFuture regFuture = config().group().register(channel);/<code>

config方法返回的是ServerBootstrapConfig對象

然後調用下面這個方法:

MultithreadEventLoopGroup

<code>@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}/<code>

接下來就是next()方法了:

MultithreadEventExecutorGroup#next

<code>private final EventExecutorChooserFactory.EventExecutorChooser chooser;
@Override
public EventExecutor next() {
    return chooser.next();
}/<code>

EventExecutorChooserFactory

<code>@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}/<code>

EventExecutorChooserFactory的實現只有一個,跟進去發現其實這個簡單的算法,也就是說會以round-robin的方式(可以看作隨機選一個)選一個executor。

這裡總結一下,也就是說EventLoopGroup可以看作一個獨立的線程池,這個線程池中有很多的executor,現在這個裡面還有一個叫EventExecutorChooserFactory,它的作用僅僅是從線程池中的executor中隨機返回一個出來。

回到MultithreadEventLoopGroup,調用完next之後得到了executor選擇器。

通過debug,可知,返回的這個executor是SingleThreadEventLoop中的register方法,這個loop會講所有的event放到一個線程去執行。

SingleThreadEventLoop#register

<code>@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}/<code>

unsafe不是sun提供的,而是netty在AbstractNioChannel中定義的unsafe

<code>@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}/<code>

判斷一下當前正在執行方法調用線程和eventLoop所維護的線程是否為同一個線程對象(singleThreadEventExecutor中只有一個線程)。


<code>@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}/<code>

到這裡,就真正調用了底層的註冊方式,就到了NIO的領域了。

本文出自知乎

Netty源碼學習(4)-- Channel註冊流程

更多相關內容,Java架構師,軟件開發等學習資料,電子書及視頻還有高級講師公開課免費資源

需要的可以私聊小編髮送【學習】二字


分享到:


相關文章: