理解 Java NIO

寫在前面

在瞭解了 Java BIO (blocking I/O) , UNIX I/O 模型後再對 Java NIO 進行學習,個人感覺這種徐徐漸進的學習方式更適合自己。在深入細節之前要儘可能的從大局角度進行了解,在這時候多花些時間是值得的,我覺得這樣會很大程度上提升之後的學習效率。在對 Java I/O 有一個初步系統的理解(理解 Java I/O)前提下, 再延伸到 Java NIO , 對於 NIO 首先有這幾點疑惑:

  • Java NIO 採用哪種 I/O 模型?
  • Java NIO 是如何工作的?

之後的內容也專注於對這幾點疑問答案的尋找。


NIO 採用的 I/O 模型

UNIX I/O 模型有 : 阻塞 I/O (blocking I/O) , 非阻塞 I/O (non-blocking I/O) , 多路複用 I/O (multiplexing I/O) , 信號驅動 I/O (signal-driven I/O) , 異步 I/O (asynchronous I/O) 這5種,Java NIO 究竟是用的是那種模型?我在網上查了一下也沒有得到確切的答案。在 JDK 源碼中找到了答案。確定 Java NIO 使用的是 多路複用 I/O 模型

NIO 的核心類之一是 java.nio.channels.Selector ,通過靜態方法 open() 可以獲取到 Selector 的實例。JDK 文檔中的描述是 "可以通過調用此類的open()方法來創建選擇器,該方法將使用操作系統默認值selector provider 創建一個新的選擇器。還可以通過調用自定義選擇器提供程序(也就是自定義的 selector provider )的 openSelector()方法來創建選擇器。"

Selector open 函數源碼 :

<code>public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}/<code>


SelectorProvider 源碼 :

<code>public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<selectorprovider>() {
public SelectorProvider run() {

// 通過系統屬性實例化 java.nio.channels.spi.SelectorProvider 對象
if (loadProviderFromProperty())
return provider;

// 通過 SPI 機制實例化 java.nio.channels.spi.SelectorProvider 對象
if (loadProviderAsService())
return provider;

// 前兩種方式沒有提供使用系統默認的 java.nio.channels.spi.SelectorProvider 實例
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}


// 通過系統屬性實例化 java.nio.channels.spi.SelectorProvider 對象
private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());

provider = (SelectorProvider)c.newInstance();
return true;
} catch (ClassNotFoundException x) {
throw new ServiceConfigurationError(null, x);
} catch (IllegalAccessException x) {
throw new ServiceConfigurationError(null, x);
} catch (InstantiationException x) {
throw new ServiceConfigurationError(null, x);
} catch (SecurityException x) {
throw new ServiceConfigurationError(null, x);
}
}

// 通過 SPI 機制實例化 java.nio.channels.spi.SelectorProvider 對象
private static boolean loadProviderAsService() {

ServiceLoader<selectorprovider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<selectorprovider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}/<selectorprovider>/<selectorprovider>/<selectorprovider>/<code>


DefaultSelectorProvider (這裡使用的是針對Windows系統的 JDK) 反編譯得到的源碼 :

<code>package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;

public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}

public static SelectorProvider create() {
return new WindowsSelectorProvider();
}
}
/<code>


WindowsSelectorProvider (這裡使用的是針對Windows系統的 JDK) 反編譯得到的源碼 :

<code>package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.spi.AbstractSelector;

public class WindowsSelectorProvider extends SelectorProviderImpl {
public WindowsSelectorProvider() {
}

public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
/<code>


WindowsSelectorImpl (這裡使用的是針對Windows系統的 JDK) 反編譯得到的源碼 :

<code>protected int doSelect(long var1) throws IOException {
if (this.channelArray == null) {
throw new ClosedSelectorException();
} else {
this.timeout = var1;

this.processDeregisterQueue();
if (this.interruptTriggered) {
this.resetWakeupSocket();
return 0;
} else {
this.adjustThreadsCount();
this.finishLock.reset();
this.startLock.startThreads();

try {
this.begin();

try {
this.subSelector.poll();
} catch (IOException var7) {
this.finishLock.setException(var7);
}

if (this.threads.size() > 0) {
this.finishLock.waitForHelperThreads();
}
} finally {
this.end();
}

this.finishLock.checkForException();
this.processDeregisterQueue();
int var3 = this.updateSelectedKeys();
this.resetWakeupSocket();
return var3;
}
}
}

private final class SubSelector {

// ...

private int poll() throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, Math.min(WindowsSelectorImpl.this.totalChannels, 1024), this.readFds, this.writeFds, this.exceptFds, WindowsSelectorImpl.this.timeout);
}

private int poll(int var1) throws IOException {
return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress + (long)
(this.pollArrayIndex * PollArrayWrapper.SIZE_POLLFD), Math.min(1024,
WindowsSelectorImpl.this.totalChannels - (var1 + 1) * 1024), this.readFds, this.writeFds,
this.exceptFds, WindowsSelectorImpl.this.timeout);
}

// 調用了操作系統

private native int poll0(long var1, int var3, int[] var4, int[] var5, int[] var6, long
var7);

}
/<code>


DefaultSelectorProvider (這裡使用的是針對 Linux 系統的 JDK) 反編譯得到的源碼 :

<code>package sun.nio.ch;

import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import sun.security.action.GetPropertyAction;

public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}

private static SelectorProvider createProvider(String var0) {
Class var1;
try {
var1 = Class.forName(var0);
} catch (ClassNotFoundException var4) {
throw new AssertionError(var4);
}

try {
return (SelectorProvider)var1.newInstance();
} catch (InstantiationException | IllegalAccessException var3) {
throw new AssertionError(var3);
}
}


// 根據不同的操作系統獲取不同的 java.nio.channels.spi.SelectorProvider 實例對象
public static SelectorProvider create() {
String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (var0.equals("SunOS")) {
return createProvider("sun.nio.ch.DevPollSelectorProvider");
} else {
return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());

}
}
}
/<code>


EPollSelectorProvider (這裡使用的是針對 Linux 系統的 JDK) 反編譯得到的源碼 :

<code>package sun.nio.ch;

import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.spi.AbstractSelector;

public class EPollSelectorProvider extends SelectorProviderImpl {
public EPollSelectorProvider() {
}

public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}/<code>


EPollSelectorImpl (這裡使用的是針對 Linux 系統的 JDK) 反編譯得到的源碼 :

<code>protected int doSelect(long var1) throws IOException {
if (this.closed) {
throw new ClosedSelectorException();
} else {
this.processDeregisterQueue();

try {
this.begin();

this.pollWrapper.poll(var1);
} finally {
this.end();
}

this.processDeregisterQueue();
int var3 = this.updateSelectedKeys();
if (this.pollWrapper.interrupted()) {
this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
Object var4 = this.interruptLock;
synchronized(this.interruptLock) {
this.pollWrapper.clearInterrupted();
IOUtil.drain(this.fd0);
this.interruptTriggered = false;
}
}

return var3;
}
}
/<code>


EPollArrayWrapper (這裡使用的是針對 Linux 系統的 JDK) 反編譯得到的源碼 :

<code>int poll(long var1) throws IOException {
this.updateRegistrations();
this.updated = this.epollWait(this.pollArrayAddress, NUM_EPOLLEVENTS, var1, this.epfd);

for(int var3 = 0; var3 < this.updated; ++var3) {
if (this.getDescriptor(var3) == this.incomingInterruptFD) {
this.interruptedIndex = var3;
this.interrupted = true;
break;
}
}

return this.updated;
}


private native int epollCreate();

private native void epollCtl(int var1, int var2, int var3, int var4);


private native int epollWait(long var1, int var3, long var4, int var6) throws IOException;

private static native int sizeofEPollEvent();

private static native int offsetofData();

private static native void interrupt(int var0);

private static native void init();

static {
FD_OFFSET = DATA_OFFSET;
OPEN_MAX = IOUtil.fdLimit();
NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192);
MAX_UPDATE_ARRAY_SIZE = ((Integer)AccessController.doPrivileged(new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 65536)))).intValue();
IOUtil.load();
init();
}/<code>

在對比了 Windows 平臺下和 Linux 平臺下 JDK 的代碼後, 現在就可以確定 Java NIO 使用的是 多路複用 I/O (multiplexing I/O) 模型了 。


poll 函數

poll函數用於監測多個等待事件,若事件未發生,進程睡眠,放棄CPU控制權,若監測的任何一個事件發生,poll將喚醒睡眠的進程,並判斷是什麼等待事件發生,執行相應的操作。 poll 函數沒有最大文件描述符數量的限制 , poll 函數的缺點是,包含大量文件描述符的數組被整體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數量的增加而線性增大。

<code>#include <poll.h>


// *fdarray : 指向結構數組第一個元素的指針,每個數組都是一個 pollfd 結構,用於指定測試某個給定描述符 fd 的條件。
// nfds : 數組中元素個數。
// timeout : poll 函數返回前等待多長時間 , -1 - 永遠等待 ; 0 - 立即返回,不阻塞; > 0 等待指定的毫秒數
int poll(struct pollfd * fdarray, unsigned long nfds, int timeout)


struct pollfd

{
  int fd; // 每一個 pollfd 結構體指定了一個被監視的文件描述符

  short events; // 指定監測 fd 上發生的事件

  short revents; // 文件描述符的操作結果事件,內核在調用返回時設置這個域。events 域中請求的任何事件都可能在 revents 域中返回.
}

/<poll.h>/<code>

每個結構體的 events 域是由用戶來設置,告訴內核我們關注的是什麼,而 revents 域是返回時內核設置的,以說明對該描述符發生了什麼事件 。成功時,poll() 返回結構體中 revents 域不為 0 的文件描述符個數;如果在超時前沒有任何事件發生,poll()返回 0;失敗時,poll() 返回 -1,並設置 errno 為下列值之一:

EBADF:一個或多個結構體中指定的文件描述符無效。

EFAULT:fds 指針指向的地址超出進程的地址空間。

EINTR:請求的事件之前產生一個信號,調用可以重新發起。

EINVAL:nfds 參數超出 PLIMIT_NOFILE 值。

ENOMEM:可用內存不足,無法完成請求。


epoll 函數

epoll是一種IO多路轉接技術,在LINUX網絡編程中,經常用來做事件觸發,即當有特定事件到來時,能夠檢測到,而不必阻塞進行監聽。epoll有兩種工作方式,ET-水平觸發 和 LT-邊緣觸發(默認工作方式),主要的區別是:LT,內核通知你fd是否就緒,如果沒有處理,則會持續通知。而ET,內核只通知一次。

epoll 的優點 :

1. 支持進程打開大量數目的socket描述符,select支持的進程描述符由FD_SETSIZE設置,默認值為1024,而epoll不受這個限制。

2. epoll的效率,不隨監聽的socket數目增加而線性下降。select採用輪詢的方式,對socket集合的描述符表進行掃描,如果socket數量過大,並且大多數socket屬於idle狀態,select的掃描就做了很多無用功。epoll只會對活躍的socket進行操作,所以,在socket數量比較大,而絕大多數socket屬於idle狀態時,epoll的效率會遠勝於select。如果絕大多數socket是活躍的,由於epoll_ctl的影響,epoll的效率會稍微比select差。

3. 使用mmap加速內核與用戶空間的傳遞。

epoll 主要有三個接口 :

1. int epoll_create( int size ) : 創建一個epoll的句柄,size表示監聽的數目一共有多大。

2. int epoll_ctl( int epfd, int op, int fd, struct epoll_event* event ) : 事件註冊函數,epfd是epoll_create返回的句柄,op是表示做什麼動作,用三個宏表示:EPOLL_CTL_ADD:註冊新的fd到epfd中;EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;EPOLL_CTL_DEL:從epfd中刪除一個fd; fd表示要監聽的描述符,event表示內核要監聽什麼事,由以下幾個宏表示 : EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);EPOLLOUT:表示對應的文件描述符可以寫;EPOLLPRI:表示對應的文件描述符有緊急的數據可讀;EPOLLERR:表示對應的文件描述符發生錯誤;EPOLLHUP:表示對應的文件描述符被掛斷;EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的。EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裡 。

3. int epoll_wait( int epfd, struct epoll_event* events, int maxevents, int time_out ) : 等待事件的發生。events,存儲epoll_wait操作完成後,存儲的事件。maxevents表示當前要監聽的所有socket句柄數。time_out為超時時間。返回值表示需要處理的事件數目,0表示超時。

epoll 實現原理 :

1. epoll_create : 在epoll文件系統建立了個file節點,並開闢epoll自己的內核高速cache區,建立紅黑樹,分配好想要的size的內存對象,建立一個list鏈表,用於存儲準備就緒的事件。

2. epoll_ctl : 把要監聽的socket放到對應的紅黑樹上,給內核中斷處理程序註冊一個回調函數,通知內核,如果這個句柄的數據到了,就把它放到就緒列表。

3. epoll_wait : 觀察就緒列表裡面有沒有數據,並進行提取和清空就緒列表,非常高效。


Java NIO 核心組件

  • Buffer (緩衝區) : 數據存儲的緩衝區,對數據進行存、取的操作。
  • Channel (信道) : 表示與 I/O 設備或能夠執行一個或多個不同 I/O 操作的程序組件實體的開放連接,作用是進行數據的傳輸。
  • Selector (多路複用器) : Selector 是 NIO 得以實現的核心 ,Selector 可以知道當前哪些 Channel 處於就緒狀態 , 作用是獲取處於就緒狀態 Channel 。
  • SelectionKey (選擇鍵) : 選擇鍵封裝了 Channel 與 Selector 的關係,當 Channel 註冊到 Selector 上時會創建一個 SelectionKey 。SelectionKey 封裝了 Channel 感興趣的事件 , 比如 Accept, Connect , Read , Write 。事件驅動就是根據 Channel 當前觸發的事件進行相應的處理。SelectionKey 的作用是 , 記錄 Channel 在 Selector 上註冊了那種事件。


Java NIO 工作過程

根據我個人對 NIO 工作過程的理解整理。


理解 Java NIO


基於操作系統 多路複用 I/O 模型 , Java 也實現了無阻塞的多路複用 I/O ,大幅度的提升了 流 I/O 的性能 , 比起 NIO 出現之前的 BIO 有了很大的進步。只有在調用 select 的時候會阻塞直到函數返回,這是不可避免的。現在對 NIO 有了一個宏觀上的理解和認識 , 之後會深入到 NIO 的使用,原理細節。結合實際情況討論 NIO 的優點、缺點、優化改進等內容。


分享到:


相關文章: