Netty零拷貝機制

零拷貝機制是Netty高性能的一個原因,之前都是說netty的線程模型,責任鏈,說說netty底層的優化,優化就是netty自己的一個緩衝區。

常見的方法定義

  1. 隨機訪問索引 getByte
  2. 順序讀 read*
  3. 順序寫 write*
  4. 清除已讀內容discardReadBytes
  5. 清除緩衝區 clear
  6. 搜索操作
  7. 標記和重置
  8. 引用計數和釋放
  • ④ 緩衝區是如何被兩個指針分割成三個區域的
  1. discardable bytes 已讀可丟棄區域
  2. readable bytes 可讀區域
  3. writable bytes 待寫區域
Netty零拷貝機制

  • ⑤ 實例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;

import java.util.Arrays;

/**
* bytebuf的常規API操作示例
*/
public class ByteBufDemo {
@Test
public void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity

// 1.創建一個非池化的ByteBuf,大小為10個字節
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf為====================>" + buf.toString());
System.out.println("1.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 原始ByteBuf為====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
// 1.ByteBuf中的內容為===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]



// 2.寫入一段內容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("寫入的bytes為====================>" + Arrays.toString(bytes));
System.out.println("寫入一段內容後ByteBuf為===========>" + buf.toString());
System.out.println("2.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 寫入的bytes為====================>[1, 2, 3, 4, 5]
// 寫入一段內容後ByteBuf為===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)

// 2.ByteBuf中的內容為===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



// 3.讀取一段內容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("讀取的bytes為====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("讀取一段內容後ByteBuf為===========>" + buf.toString());
System.out.println("3.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 讀取的bytes為====================>[1, 2]
// 讀取一段內容後ByteBuf為===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
// 3.ByteBuf中的內容為===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



// 4.將讀取的內容丟棄
buf.discardReadBytes();
System.out.println("將讀取的內容丟棄後ByteBuf為========>" + buf.toString());
System.out.println("4.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 將讀取的內容丟棄後ByteBuf為========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 4.ByteBuf中的內容為===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]



// 5.清空讀寫指針
buf.clear();
System.out.println("將讀寫指針清空後ByteBuf為==========>" + buf.toString());
System.out.println("5.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 將讀寫指針清空後ByteBuf為==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
// 5.ByteBuf中的內容為===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]



// 6.再次寫入一段內容,比第一段內容少

byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("寫入的bytes為====================>" + Arrays.toString(bytes2));
System.out.println("寫入一段內容後ByteBuf為===========>" + buf.toString());
System.out.println("6.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");

// 寫入的bytes為====================>[1, 2, 3]
// 寫入一段內容後ByteBuf為===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 6.ByteBuf中的內容為===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



// 7.將ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("將內容清零後ByteBuf為==============>" + buf.toString());
System.out.println("7.ByteBuf中的內容為================>" + Arrays.toString(buf.array()) + "\\n");

// 將內容清零後ByteBuf為==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 7.ByteBuf中的內容為================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


// 8.再次寫入一段超過容量的內容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("寫入的bytes為====================>" + Arrays.toString(bytes3));
System.out.println("寫入一段內容後ByteBuf為===========>" + buf.toString());
System.out.println("8.ByteBuf中的內容為===============>" + Arrays.toString(buf.array()) + "\\n");


// 寫入的bytes為====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// 寫入一段內容後ByteBuf為===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
// 8.ByteBuf中的內容為===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

// 隨機訪問索引 getByte
// 順序讀 read*
// 順序寫 write*
// 清除已讀內容 discardReadBytes
// 清除緩衝區 clear

// 搜索操作
// 標記和重置
// 完整代碼示例:參考
// 搜索操作 讀取指定位置 buf.getByte(1);
//
}

}


Netty零拷貝機制

Netty零拷貝機制

  • ⑥ ByteBuf 動態擴容

capacity 默認值:256字節,最大值:Integer.MAX_VALUE(2GB)

write 方法調用時,通過AbstractByteBuf.ensureWritable進行檢查。
容量計算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)

根據新的capacity的最小值要求,對應有兩套計算方法

沒超過4兆:從64字節開發,每次增加一倍,直至計算出來的newCapacity滿足新容量最小要求。示例:當前大小256,已寫250,繼續寫10字節數據,需要的容量最小要求是261,則新容量是6422*2=512

超過4兆:新容量 = 新容量最小要求/4兆 * 4兆 +4兆
示例:當前大小3兆,已寫3兆,繼續寫2兆數據,需要的容量最小要求是5兆, 則新容量是9兆(不能超過最大值)

  • ⑦ 選擇合適的 ByteBuf 實現

在實際使用中都是通過 ByteBufAllocator 分配器進行申請,同時分配器具有內存管理的功能。

Netty零拷貝機制

  1. unsafe 用到了 Unsafe 工具類,Unsafe 是 Java 保留的一個底層工具包,safe 則沒有用到 unsafe 工具類。
  2. unsafe 意味著不安全的操作,但是更底層的操作會帶來性能提升和特殊功能,Netty 中會盡力使用 unsafe。
  3. Java 語言很重要的特性是“一次編寫導出運行”,所以它針對底層的內存或其他操作,做了很多封裝。而 unsafe 提供了一系列操作底層的方法,可能會導致不兼容或者不可知的異常。

unpool 每次申請緩衝區時會新建一個,並不會複用,使用 Unpooled 工具類可以創建 unpool 的緩衝區。

Netty 沒有給出很便捷的 pool 類型的緩衝區的創建方法。使用 ChannelConfig.getAllocator() 時,獲取到的分配器是默認支持內存複用的。

  • ⑧ pooledByteBuf對象、內存

PoolThreadCache: PooledByteBufAllocator 實例維護了一個線程變量。
多種分類的MemoryRegionCache數組用作內存緩存,MemoryRegionCache內部是鏈表,隊列裡面存Chunk。
Pool Chunk裡面維護了內存引用,內存複用的做法就是把buf的memory指向Chunck的memory。

(二)零拷貝

  • ① 介紹

Netty 的零拷貝機制,是一種應用層的實現。

  • ② 拷貝方式

一般的數組合並,會創建一個大的數組,然後將需要合併的數組放進去。
Netty 的 CompositeButyBuf 將多個 ByteBuf 合併為一個邏輯上的 ByteBuf,避免了各個 ByteBuf 之間的拷貝。

CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);

wrappedBuffer 方法將 byte[] 數組包裝成 ByteBuf 對象

ByteBuf byteBuf = Unpooled.wrappedBuffer(new Byte[]{1, 2, 3, 4, 5});

slice 方法將一個 ByteBuf 對象切分成多個 ByteBuf 對象

 ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
  • ③ 實例
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

/**

* 零拷貝示例
*/
public class ZeroCopyTest {
@org.junit.Test
public void wrapTest() {
byte[] arr = {1, 2, 3, 4, 5};
ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
System.out.println(byteBuf.getByte(4));
arr[4] = 6;
System.out.println(byteBuf.getByte(4));
}

@org.junit.Test
public void sliceTest() {
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
System.out.println(newBuffer.toString());
}

@org.junit.Test
public void compositeTest() {
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println(newBuffer);
}

}

PS:API操作便捷性,動態擴容,多種ByteBuf實現,高效的零拷貝機制(邏輯上邊的設計)上邊的所有就是nettyByteBuf所做的工作,性能提升,操作性增強。有了理論下節開始實戰netty。


分享到:


相關文章: