Java Nio 之高級搬運工(FileChannel)二

Java Nio 之高級搬運工(FileChannel)二

are you ok?

前言

前段時間同事分享了一篇文章給我:為什麼Kafka速度這麼快? ,這篇文章相信大家也都看了。這篇文章說Kafka 有個作弊的技能 :直接從文件某個位置處讀取某個長度的字節直接發送給消費者,不需要讀到應用程序裡然後緩存在ByteBuffer 然後再往 客戶端寫;當時就對這項技術很著迷,上網搜了很多資料 很納悶它是怎麼實現的;上週在介紹FileChannel 的時候本來想只寫一篇文章的,後來看到了它的map 、transeferTo以及TranseferFrom 方法就覺得一篇文章寫不完,因為冗長的文章誰都不想看,所以另寫一篇來研究一下 FileChannel 的高性能之處,以及介紹下Kafka是怎麼使用的。

談談零拷貝

牢騷一下

Kafka 的高性能的重要點之一就在零拷貝上。零拷貝不是真的零拷貝,只不過是減少了拷貝的次數,為的不是減少DMA的拷貝次數,而是CPU 的拷貝次數,為啥呢?因為拷貝是個很簡單的操作,佔著CPU 的時間片簡直就是高射炮打蚊子。

傳統 Linux 服務器 傳輸數據 的流程

  • 1.應用程序調用系統方法read(),切換上下文:用戶——內核,操作系統會先檢查頁面緩存裡是否有要read 的內容,如果有則進行第二步,如果沒有則需要讓DMA 從指定磁盤位置上拷貝數據到內核緩衝區中,第一次拷貝由DMA 執行
  • 2.CPU 將數據從內核緩衝區拷貝到用戶緩衝區,read 調用返回,切換上下文:內核——用戶,第二次拷貝由CPU 執行
  • 3.應用程序調用系統write() 函數 ,切換上下文:用戶——內核,CPU 將用戶緩衝區的數據拷貝到socket 緩衝區,第三次拷貝由CPU執行
  • 4.write 調用返回,切換上下文:內核——用戶,然後DMA 異步將socket 緩衝區的數據拷貝到協議引擎中
  • 總結一下,需要4次拷貝,其中有兩次是需要CPU的執行,切換了4次上下文

零拷貝的兩種實現方式

mmap + write 方式

何為mmap 呢—— 將一個文件或者其它對象映射進內存。映射到的這塊內存區域在用戶程序使用的內存空間 和 棧之間不在內核內存空間, 因此內核程序和用戶程序都可以訪問,如下草圖:


Java Nio 之高級搬運工(FileChannel)二


mmap 、 munmap 和msync()函數

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  • addr 映射區的開始位置
  • length 映射區的長度
  • prot 期望的內存保護標誌,可由如下幾種方式組合:
  • PROT_EXEC 頁內容可被執行
  • PROT_READ 頁內容可被讀取
  • PROT_WRITE 頁內容可被寫入
  • PROT_NONE 頁內容不可以被訪問
  • flags 影響內存區域的各種特性,可由以下幾種方式組合:
  • MAP_FIXED 使用指定的起始位置,若是addr和length 重疊於現存的映射空間則重疊部分會丟失,不會對地址做出修正,不建議使用該選項。
  • MAP_SHARED 對該映射區域的更改會同步到文件裡,而且允許其它映射該文件的進程共享該映射區域
  • MAP_PRIVATE 對映射區域的寫入操作會產生一個映射文件的複製,即私有的“寫入時複製”(copy on write)對此區域作的任何修改都不會寫回原來的文件內容。
  • MAP_NORESERVE 不要為這個映射保留交換空間。當交換空間被保留,對映射區修改的可能會得到保證。當交換空間不被保留,同時內存不足,對映射區的修改會引起段違例信號
  • MAP_ANONYMOUS 匿名映射,不與任何文件關聯
  • fd 被映射對象,若為匿名映射則為-1
  • offset 被映射對象的起始偏移
int munmap(void *addr, size_t length);
調用該函數可以解除 映射對象與addr 處開始的length 長度的內存空間的映射關係

addr mmap 函數返回的映射區域首地址

length 映射區域的長度

int msync ( void * addr , size_t len, int flags) ;
一般情況下 對映射空間的共享內容更改不會直接寫到文件裡,當然執行完 munmap 函數也可以,除了執行它,還可以執行 msync 函數來將修改的共享內容同步到文件

說說mmap + write 的 流程

  • 用戶程序調用mmap函數,將 文件內容映射到內存映射區域。先由內存空間切換到內核空間,然後由內核空間切換到用戶空間,完成兩次上下文切換,DMA 將文件內容拷貝到內存映射區域
  • 調用write 函數,cpu將 內存映射區域的內容拷貝到 socket緩衝區,程序調用返回然後DMA 異步從socket 緩衝區拷貝到協議引擎的緩衝區
  • 發生 1次cpu 拷貝,2次DMA 拷貝

來看看Java 下的mmap

抽象類 MappedByteBuffer

定義

直接字節緩衝區,其內容是文件的內存映射區域。可由FileChannel#map 方法創建。該類通過增加對內存映射區域的特定操作擴展了ByteFuffer 類。映射字節緩衝區與它所映射的文件直到它自己被垃圾回收之前都是存在的。

tips

映射字節緩衝區的內容任何時候都可以被修改,例如,映射文件對應的區域被當前程序或者其他程序所更改。至於是否發生或者什麼時候發生,都由操作系統來決定。

映射字節緩衝區的部分或者全部在任何時候都會變得不可訪問,例如,映射的文件被截斷了。嘗試訪問不可訪問的映射字節的緩衝區的那一部分,將會有不友好的異常拋出。需要強烈的提醒,避免讓當前程序或者其他程序對這個映射文件進行操作,除了讀或者寫它的內容。

方法

load() 該方法會盡最大可能將映射文件裡的內容加載到物理內存中,可能會在加載的時候導致一些頁面錯誤和IO操作。

isLoad() 返回 映射文件內容是否駐留在物理內存中。

force() 對映射內存區域的寫入,並不會直接同步到文件中, 在解除映射關係的時候修改的內容才會同步到文件中。 調用該方法會將對映射區域的修改同步到磁盤,這就與上面的方法msync方法對應。

FileChannel # map 方法

方法簽名

public abstract MappedByteBuffer map(MapMode mode,long position, long size) throws IOException;

參數小解

mode 為MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一個,分別表示 只讀,可讀可寫和寫時複製與上述 mmap 方法中flags參數對應

position 從文件的哪裡開始映射,對應上述 mmap 方法 中的offset參數

size 從文件position處開始映射多少個字節

Java 語言實現 mmap+write

簡述:將文件a.txt 中的0到14個字節發給服務端

package zym.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
 * file channel map study
 *
 * @author 24160
 */
public class FileChannelMapStudy {
 public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
 public static final int INT_BYTES_LENGTH = 4;
 public static void main(String[] args) {
 prepareEnviroment();
 try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
 long size = fileChannel.size();
 //將a.txt 文件映射到內存緩衝區,從0位置處映射,映射10個字節長度,該映射內存緩衝區只可讀
 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
 //創建一個SocketChannel實例
 SocketChannel client = SocketChannel.open();
 //連接服務端
 client.connect(new InetSocketAddress("127.0.0.1", 8080));
 //寫文件內容到服務端
 client.write(mappedByteBuffer);
 //讀取文件內容 網絡協議為 head + body 如6zengyi
 ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
 while (client.read(head) != 0) {}
 //切換讀寫模式
 head.flip();
 //讀取body
 ByteBuffer body = ByteBuffer.allocate(head.getInt());
 while (client.read(body) != 0) {}
 //切換讀寫模式
 body.flip();
 System.out.println(String.format("發送字節成功,服務端返回:%s", new String(body.array())));
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 private static void prepareEnviroment() {
 try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
 //將a.txt 映射文件到內存映射區域,模式為可讀可寫
 MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
 //放進去一個int 為10
 mappedByteBuffer.putInt(10);
 mappedByteBuffer.put("zengyiming".getBytes());
 //強制刷盤
 mappedByteBuffer.force();
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
}

服務端代碼詳見:
https://github.com/241600489/homeworks/blob/master/src/main/java/zym/netty/nio/NioServer.java

下面我們來看看kafka 是如何使用mmap,kafka AbstractIndex.scala 代碼片段

 @volatile
 protected var mmap: MappedByteBuffer = {
 val newlyCreated = file.createNewFile()
 val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
 try {
 /* 如果是新創建則給file 預留分配空間 maxIndexSize 不超過50MB 單位為字節 */
 if(newlyCreated) {
 if(maxIndexSize < entrySize)
 throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
 raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
 }
 /* memory-map the file */
 /* 開始內存映射文件*/
 _length = raf.length()
 val idx = {
 if (writable)
 /*如果可寫,則映射模式為可讀可寫*/
 raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
 else
 /*若可讀,則映射模式為可讀*/
 raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
 }
 /* set the position in the index for the next entry */
 /*為下一個條目 設置 buffer 中的position值*/
 if(newlyCreated)
 idx.position(0)
 else
 // if this is a pre-existing index, assume it is valid and set position to last entry
 //如果這是一個預先存在的索引,則假設它有效並將位置設置為最後一個條目
 idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
 idx
 } finally {
 CoreUtils.swallow(raf.close(), AbstractIndex)
 }
 }

kafka 的索引文件是映射到內存映射區域的,對消息偏移量的讀寫都是基於MappedByteBuffer 之上,當然牛逼的kafka 作者們 發明了一個簡單且緩存命中友好的二叉查找算法,這個算法有機會和大家聊下。


分享到:


相關文章: