Redis客戶端Lettuce源碼「三」Lettuce是如何發送Command命令的

lettuce-core版本: 5.1.7.RELEASE

在上一篇介紹了Lettuce是如何基於Netty與Redis建立連接的,其中提到了一個很重要的CommandHandler類,這一期主要解決兩個問題

1.CommandHandler是如何在發送Command時發揮作用的

2.Lettuce是如何實現多線程共享同一個物理連接的。


回顧一下我們的之前示例代碼,這一篇主要是跟進去sync.get方法看看Lettuc是如何發送get命令到Redis以及是如何讀取Redis的命令的。

<code>/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//構建RedisClient對象,RedisClient包含了Redis的基本配置信息,可以基於RedisClient創建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");

//創建一個線程安全的StatefulRedisConnection,可以多線程併發對該connection操作,底層只有一個物理連接.
StatefulRedisConnection<string> connection = client.connect();

//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisStringCommands<string> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);

//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisAsyncCommands<string> async = connection.async();
RedisFuture<string> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);

}

public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}/<string>/<string>/<string>/<string>/<code>

在看sync.get方法之前先看一下RedisStringCommands是如何生成生成的,從下面的代碼可以看到RedisStringCommands其實是對RedisAsyncCommands<string>方法調用的同步阻塞版本。/<string>

<code>    //創建一個sync版本的RedisCommand
protected RedisCommands newRedisSyncCommandsImpl() {
//async()方法返回的就是該Connection對應的RedisAsyncCommand
return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}
//返回一個動態代理類,代理類的實現在FutureSyncInvocationHandler類中
protected T syncHandler(Object asyncApi, Class>... interfaces) {
FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection, ?>) this, asyncApi, interfaces);
//基於FutureSyncInvocationHandler生成動態代理類
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}
//異步轉同步的關鍵
class FutureSyncInvocationHandler extends AbstractInvocationHandler {

...

@Override
@SuppressWarnings("unchecked")
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {

try {

Method targetMethod = this.translator.get(method);
Object result = targetMethod.invoke(asyncApi, args);

// RedisAsyncCommand返回的大部分對象類型都是RedisFuture類型的
if (result instanceof RedisFuture>) {

RedisFuture> command = (RedisFuture>) result;

if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
return null;
}
//獲取配置的超時時間
long timeout = getTimeoutNs(command);
//阻塞的等待RedisFuture返回結果
return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
}

return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}
...
/<code>

所以sync.get操作最終調用的還是async.get操作,接下來看async.get是怎麼做的。還是先看一張時序圖,心裡有一個概念。


Redis客戶端Lettuce源碼「三」Lettuce是如何發送Command命令的

lettuce發送command流程圖


AbstractRedisAsyncCommands

<code>    @Override
public RedisFuture get(K key) {
return dispatch(commandBuilder.get(key));
}
/<code>

commandBuilder.get(key)

這一步驟主要是根據用戶的輸入參數key、命令類型get、序列化方式來生成一個command對象。而這個command對象會按照Redis的協議格式把命令序列化成字符串。

<code>    Command get(K key) {
notNullKey(key);
//Valueoutput基於序列化
return createCommand(GET, new ValueOutput<>(codec), key);
}

protected Command createCommand(CommandType type, CommandOutput output, K key) {
CommandArgs args = new CommandArgs(codec).addKey(key);
return createCommand(type, output, args);
}

protected Command createCommand(CommandType type, CommandOutput output, CommandArgs args) {
return new Command(type, output, args);
}
/<code>

AbstractRedisAsyncCommands.dispatch

<code>public  AsyncCommand dispatch(RedisCommand cmd) {
//用AsyncCommand對RedisCommand做一個包裝處理,這個AsyncCommand實現了RedisFuture接口,最後返回給調用方的就是這個對象。當Lettuce收到Redis的返回結果時會調用AsyncCommand的complete方法,異步的方式返回數據。
AsyncCommand asyncCommand = new AsyncCommand<>(cmd);
//調用connection的dispatch方法把Command發送給Redis,這個connection就是上一篇中說的那個StatefulRedisConnectionImpl
RedisCommand dispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand) dispatched;
}
return asyncCommand;
}
/<code>

StatefulRedisConnectionImpl.dispatch

<code>    @Override
public RedisCommand
dispatch(RedisCommand command) {
//對command做預處理,當前主要是根據不同的命令配置一些異步處理,如:auth命令之後成功之後把password寫入到相應變量中,select db操作成功之後把db值寫入到相應變量中等等。
RedisCommand toSend = preProcessCommand(command);

try {
//真正的dispatch是在父類實現的
return super.dispatch(toSend);
} finally {
if (command.getType().name().equals(MULTI.name())) {
multi = (multi == null ? new MultiOutput<>(codec) : multi);
}
}
}
//父類RedisChannelHandler的dispatch方法
protected RedisCommand dispatch(RedisCommand cmd) {

if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
//tracingEnable的代碼先不用看
if (tracingEnabled) {

RedisCommand commandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);

if (provider == null) {
commandToSend = new TracedCommand<>(cmd, clientResources.tracing()
.initialTraceContextProvider().getTraceContext());
}

return channelWriter.write(commandToSend);
}
//其實就是直接調用channelWriter.write方法,而這個channelWriter就是上一節說的那個屏蔽底層channel實現的DefaultEndpoint類

return channelWriter.write(cmd);
}
/<code>

DefaultEndpoint.write

<code>    @Override
public RedisCommand write(RedisCommand command) {

LettuceAssert.notNull(command, "Command must not be null");

try {
//sharedLock是Lettuce自己實現的一個共享排他鎖。incrementWriters相當於獲取一個共享鎖,當channel狀態發生變化的時候,如斷開連接時會獲取排他鎖執行一些清理操作。
sharedLock.incrementWriters();
// validateWrite是驗證當前操作是否可以執行,Lettuce內部維護了一個保存已經發送但是還沒有收到Redis消息的Command的stack,可以配置這個stack的長度,防止Redis不可用時stack太長導致內存溢出。如果這個stack已經滿了,validateWrite會拋出異常
validateWrite(1);
//autoFlushCommands默認為true,即每執行一個Redis命令就執行Flush操作發送給Redis,如果設置為false,則需要手動flush。由於flush操作相對較重,在某些場景下需要繼續提升Lettuce的吞吐量可以考慮設置為false。
if (autoFlushCommands) {
if (isConnected()) {
//寫入channel並執行flush操作,核心在這個方法的實現中

writeToChannelAndFlush(command);
} else {
// 如果當前channel連接已經斷開就先放入Buffer中,直接返回AsyncCommand,重連之後會把Buffer中的Command再次嘗試通過channel發送到Redis中
writeToDisconnectedBuffer(command);
}

} else {
writeToBuffer(command);
}
} finally {
//釋放共享鎖
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}

return command;
}
/<code>

DefaultEndpoint.writeToChannelAndFlush

<code>    private void writeToChannelAndFlush(RedisCommand, ?, ?> command) {
//queueSize字段做cas +1操作
QUEUE_SIZE.incrementAndGet(this);

ChannelFuture channelFuture = channelWriteAndFlush(command);
//Lettuce的可靠性:保證最多一次。由於Lettuce的保證是基於內存的,所以並不可靠(系統crash時內存數據會丟失)
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}
//Lettuce的可靠性:保證最少一次。由於Lettuce的保證是基於內存的,所以並不可靠(系統crash時內存數據會丟失)
if (reliability == Reliability.AT_LEAST_ONCE) {

// commands are ok to stay within the queue, reconnect will retrigger them
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}

//可以看到最終還是調用了channle的writeAndFlush操作,這個Channel就是netty中的NioSocketChannel
private ChannelFuture channelWriteAndFlush(RedisCommand, ?, ?> command) {

if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
}

return channel.writeAndFlush(command);
}/<code>
<code>public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

if (debugEnabled) {
logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
}

if (msg instanceof RedisCommand) {
//如果是單個的RedisCommand就直接調用writeSingleCommand返回
writeSingleCommand(ctx, (RedisCommand, ?, ?>) msg, promise);
return;
}

if (msg instanceof List) {

List<rediscommand>> batch = (List<rediscommand>>) msg;

if (batch.size() == 1) {

writeSingleCommand(ctx, batch.get(0), promise);
return;
}
//批量寫操作,暫不關心
writeBatch(ctx, batch, promise);
return;
}

if (msg instanceof Collection) {
writeBatch(ctx, (Collection<rediscommand>>) msg, promise);
}
}/<rediscommand>/<rediscommand>/<rediscommand>/<code>

writeSingleCommand 核心在這裡

Lettuce使用單一連接支持多線程併發向Redis發送Command,那Lettuce是怎麼把請求Command與Redis返回的結果對應起來的呢,秘密就在這裡。

<code>private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand, ?, ?> command, ChannelPromise promise)
{

if (!isWriteable(command)) {
promise.trySuccess();
return;
}
//把當前command放入一個特定的棧中,這一步是關鍵
addToStack(command, promise);
// Trace操作,暫不關心
if (tracingEnabled && command instanceof CompleteableCommand) {
...
}
//調用ChannelHandlerContext把命令真正發送給Redis,當然在發送給Redis之前會由CommandEncoder類對RedisCommand進行編碼後寫入ByteBuf
ctx.write(command, promise);

private void addToStack(RedisCommand, ?, ?> command, ChannelPromise promise) {

try {
//再次驗證隊列是否滿了,如果滿了就拋出異常
validateWrite(1);
//command.getOutput() == null意味這個這個Command不需要Redis返回影響。一般不會走這個分支
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
complete(command);
}
//這個應該是用來做metrics統計用的,暫時先不考慮
RedisCommand, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
//無論promise是什麼類型的,最終都會把command放入到stack中,stack是一個基於數組實現的雙向隊列

if (promise.isVoid()) {
//如果promise不是Future類型的就直接把當前command放入到stack
stack.add(redisCommand);
} else {
//如果promise是Future類型的就等future完成後把當前command放入到stack中,當前場景下就是走的這個分支
promise.addListener(AddToStack.newInstance(stack, redisCommand));
}
} catch (Exception e) {
command.completeExceptionally(e);
throw e;
}
}
}/<code>

那麼Lettuce收到Redis的回覆消息之後是怎麼通知RedisCommand,並且把結果與RedisCommand對應上的呢。Netty在收到Redis服務端返回的消息之後就會回調CommandHandler的channelRead方法

<code>public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf input = (ByteBuf) msg;

...

try {
...
//重點在這裡
decode(ctx, buffer);
} finally {
input.release();
}
}

protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
//如果stack為空,則直接返回,這個時候一般意味著返回的結果找到對應的RedisCommand了
if (pristine && stack.isEmpty() && buffer.isReadable()) {

...

return;
}

while (canDecode(buffer)) {
//重點來了。從stack的頭上取第一個RedisCommand
RedisCommand, ?, ?> command = stack.peek();
if (debugEnabled) {
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
}

pristine = false;

try {
//直接把返回的結果buffer給了stack頭上的第一個RedisCommand。
//decode操作實際上拿到RedisCommand的commandoutput對象對Redis的返回結果進行反序列化的。
if (!decode(ctx, buffer, command)) {
return;
}
} catch (Exception e) {

ctx.close();
throw e;
}

if (isProtectedMode(command)) {
onProtectedMode(command.getOutput().getError());
} else {

if (canComplete(command)) {
stack.poll();

try {
complete(command);
} catch (Exception e) {
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
}
}
}

afterDecode(ctx, command);
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}

}/<code>

從上面的代碼可以看出來,當Lettuce收到Redis的回覆消息時就從stack的頭上取第一個RedisCommand,這個RedisCommand就是與該Redis返回結果對應的RedisCommand。為什麼這樣就能對應上呢,是因為Lettuce與Redis之間只有一條tcp連接,在Lettuce端放入stack時是有序的,tcp協議本身是有序的,redis是單線程處理請求的,所以Redis返回的消息也是有序的。這樣就能保證Redis中返回的消息一定對應著stack中的第一個RedisCommand。當然如果連接斷開又重連了,這個肯定就對應不上了,Lettuc對斷線重連也做了特殊處理,防止對應不上。

Command.encode

<code>public void encode(ByteBuf buf) {

buf.writeByte('*');
//寫入參數的數量
CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
//換行
buf.writeBytes(CommandArgs.CRLF);
//寫入命令的類型,即get
CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());

if (args != null) {
//調用Args的編碼,這裡面就會使用我們之前配置的codec序列化,當前使用的是String.UTF8
args.encode(buf);
}
}/<code>


分享到:


相關文章: