Reactor 模型的实现

NIO 常用的编程模型是 Reactor,在 Doug Lea 的 Scalable IO in Java 的 PPT 中对其进行了介绍,Reactor 的特点是 I/O 多路复用和事件驱动,基本处理过程为:

  • 处理程序声明感兴趣的 I/O 事件,这些事件表示在特定套接字上准备读取的情况
  • 事件通知器等待事件
  • 一个事件发生并唤醒通知器,通知器调用适当的处理程序
  • 事件处理程序执行实际的读取操作,并进行处理,然后重新声明关注的 I/O 事件,并将控制权返回给调度程序

其中通知器,就是 Selector。Reactor模型主要有以下几种版本:

  • 单线程Reactor,单线程处理器
  • 单线程Reactor,多线程处理器
  • 多线程主从Reactor,单线程处理器
  • 多线程主从Reactor,多线程处理器

单线程版本

Reactor 模型的实现

核心代码:

public void run() {

try {

if (state == READING) read();

else if (state == SENDING) send();

} catch (IOException ex) { /* ... */ }

}

void read() throws IOException {

socket.read(input);

if (inputIsComplete()) {

process();

state = SENDING;

// Normally also do first write now

sk.interestOps(SelectionKey.OP_WRITE);

}

}

void send() throws IOException {

socket.write(output);

if (outputIsComplete()) sk.cancel();

}

单线程的特点是:只有一个 Reactor 线程,即只有一个 Selector 事件通知器,也就是说,字节的读取 I/O 和后续的业务处理(process() 方法),均由 Reactor 线程来做,很显然业务的处理影响后续事件的分发,所以引出多线程版本进行优化。

多线程版本

Reactor 模型的实现

核心代码:

static PooledExecutor pool = new PooledExecutor(...);

static final int PROCESSING = 3;

// ...

synchronized void read() { // ...

socket.read(input);

if (inputIsComplete()) {

state = PROCESSING;

pool.execute(new Processer());

}

}

synchronized void processAndHandOff() {

process();

state = SENDING; // or rebind attachment

sk.interest(SelectionKey.OP_WRITE);

}

class Processer implements Runnable {

public void run() { processAndHandOff(); }

}

多线程版本的特点是:一个 Reactor 线程和多个处理线程,将业务处理(process 交给线程池)进行了分离,Reactor 线程,只关注事件分发和字节的发送和读取(I/O)。注意,实际的发送和读取还是由 Reactor 处理,那么在高并发下,有可能连接来不及接收,继续优化,采用主从 Reactor。

主从 Reactor

Reactor 模型的实现

核心代码:

Selector[] selectors; // also create threads

int next = 0;

class Acceptor { // ...

public synchronized void run() { ...

Socket connection = serverSocket.accept();

if (connection != null)

new Handler(selectors[next], connection);

if (++next == selectors.length) next = 0;

}

}

主从 Reactor 特点是:使用一个 Selector 池,通常有一个 主Reactor 用于处理接收连接事件,多个 从Reactor 处理实际的 I/O,整体来看,分工合作,分而治之,非常高效。

在真正实现时,有些细节需要注意,完整代码下载:https://github.com/rmwheel/reactor

其中包含对 Doug Lea 的 Scalable IO in Java 的 翻译,欢迎 star :)


分享到:


相關文章: