lettuce-core版本: 5.1.7.RELEASE
在上一篇介紹了Lettuce是如何基於Netty與Redis建立連接的,其中提到了一個很重要的CommandHandler類,這一期主要解決兩個問題
1.CommandHandler是如何在發送Command時發揮作用的
2.Lettuce是如何實現多線程共享同一個物理連接的。
回顧一下我們的之前示例代碼,這一篇主要是跟進去sync.get方法看看Lettuc是如何發送get命令到Redis以及是如何讀取Redis的命令的。
<code>/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//構建RedisClient對象,RedisClient包含了Redis的基本配置信息,可以基於RedisClient創建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");
//創建一個線程安全的StatefulRedisConnection,可以多線程併發對該connection操作,底層只有一個物理連接.
StatefulRedisConnection<string> connection = client.connect();
//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisStringCommands<string> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);
//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisAsyncCommands<string> async = connection.async();
RedisFuture<string> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}/<string>/<string>/<string>/<string>/<code>
在看sync.get方法之前先看一下RedisStringCommands是如何生成生成的,從下面的代碼可以看到RedisStringCommands其實是對RedisAsyncCommands<string>方法調用的同步阻塞版本。/<string>
<code> //創建一個sync版本的RedisCommand
protected RedisCommandsnewRedisSyncCommandsImpl() { /<code>
//async()方法返回的就是該Connection對應的RedisAsyncCommand
return syncHandler(async(), RedisCommands.class, RedisClusterCommands.class);
}
//返回一個動態代理類,代理類的實現在FutureSyncInvocationHandler類中
protectedT syncHandler(Object asyncApi, Class>... interfaces) {
FutureSyncInvocationHandler h = new FutureSyncInvocationHandler((StatefulConnection, ?>) this, asyncApi, interfaces);
//基於FutureSyncInvocationHandler生成動態代理類
return (T) Proxy.newProxyInstance(AbstractRedisClient.class.getClassLoader(), interfaces, h);
}
//異步轉同步的關鍵
class FutureSyncInvocationHandler extends AbstractInvocationHandler {
...
@Override
@SuppressWarnings("unchecked")
protected Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
try {
Method targetMethod = this.translator.get(method);
Object result = targetMethod.invoke(asyncApi, args);
// RedisAsyncCommand返回的大部分對象類型都是RedisFuture類型的
if (result instanceof RedisFuture>) {
RedisFuture> command = (RedisFuture>) result;
if (isNonTxControlMethod(method.getName()) && isTransactionActive(connection)) {
return null;
}
//獲取配置的超時時間
long timeout = getTimeoutNs(command);
//阻塞的等待RedisFuture返回結果
return LettuceFutures.awaitOrCancel(command, timeout, TimeUnit.NANOSECONDS);
}
return result;
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
}
...
所以sync.get操作最終調用的還是async.get操作,接下來看async.get是怎麼做的。還是先看一張時序圖,心裡有一個概念。
AbstractRedisAsyncCommands
<code> @Override
public RedisFutureget(K key) { /<code>
return dispatch(commandBuilder.get(key));
}
commandBuilder.get(key)
這一步驟主要是根據用戶的輸入參數key、命令類型get、序列化方式來生成一個command對象。而這個command對象會按照Redis的協議格式把命令序列化成字符串。
<code> Commandget(K key) { /<code>
notNullKey(key);
//Valueoutput基於序列化
return createCommand(GET, new ValueOutput<>(codec), key);
}
protectedCommand createCommand(CommandType type, CommandOutput output, K key) {
CommandArgsargs = new CommandArgs (codec).addKey(key);
return createCommand(type, output, args);
}
protectedCommand createCommand(CommandType type, CommandOutput output, CommandArgs args) {
return new Command(type, output, args);
}
AbstractRedisAsyncCommands.dispatch
<code>publicAsyncCommand /<code>dispatch(RedisCommand cmd) {
//用AsyncCommand對RedisCommand做一個包裝處理,這個AsyncCommand實現了RedisFuture接口,最後返回給調用方的就是這個對象。當Lettuce收到Redis的返回結果時會調用AsyncCommand的complete方法,異步的方式返回數據。
AsyncCommandasyncCommand = new AsyncCommand<>(cmd);
//調用connection的dispatch方法把Command發送給Redis,這個connection就是上一篇中說的那個StatefulRedisConnectionImpl
RedisCommanddispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand) dispatched;
}
return asyncCommand;
}
StatefulRedisConnectionImpl.dispatch
<code> @Override
publicRedisCommand /<code>dispatch(RedisCommand command) {
//對command做預處理,當前主要是根據不同的命令配置一些異步處理,如:auth命令之後成功之後把password寫入到相應變量中,select db操作成功之後把db值寫入到相應變量中等等。
RedisCommandtoSend = preProcessCommand(command);
try {
//真正的dispatch是在父類實現的
return super.dispatch(toSend);
} finally {
if (command.getType().name().equals(MULTI.name())) {
multi = (multi == null ? new MultiOutput<>(codec) : multi);
}
}
}
//父類RedisChannelHandler的dispatch方法
protectedRedisCommand dispatch(RedisCommand cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
//tracingEnable的代碼先不用看
if (tracingEnabled) {
RedisCommandcommandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
if (provider == null) {
commandToSend = new TracedCommand<>(cmd, clientResources.tracing()
.initialTraceContextProvider().getTraceContext());
}
return channelWriter.write(commandToSend);
}
//其實就是直接調用channelWriter.write方法,而這個channelWriter就是上一節說的那個屏蔽底層channel實現的DefaultEndpoint類
return channelWriter.write(cmd);
}
DefaultEndpoint.write
<code> @Override
publicRedisCommand /<code>write(RedisCommand command) {
LettuceAssert.notNull(command, "Command must not be null");
try {
//sharedLock是Lettuce自己實現的一個共享排他鎖。incrementWriters相當於獲取一個共享鎖,當channel狀態發生變化的時候,如斷開連接時會獲取排他鎖執行一些清理操作。
sharedLock.incrementWriters();
// validateWrite是驗證當前操作是否可以執行,Lettuce內部維護了一個保存已經發送但是還沒有收到Redis消息的Command的stack,可以配置這個stack的長度,防止Redis不可用時stack太長導致內存溢出。如果這個stack已經滿了,validateWrite會拋出異常
validateWrite(1);
//autoFlushCommands默認為true,即每執行一個Redis命令就執行Flush操作發送給Redis,如果設置為false,則需要手動flush。由於flush操作相對較重,在某些場景下需要繼續提升Lettuce的吞吐量可以考慮設置為false。
if (autoFlushCommands) {
if (isConnected()) {
//寫入channel並執行flush操作,核心在這個方法的實現中
writeToChannelAndFlush(command);
} else {
// 如果當前channel連接已經斷開就先放入Buffer中,直接返回AsyncCommand,重連之後會把Buffer中的Command再次嘗試通過channel發送到Redis中
writeToDisconnectedBuffer(command);
}
} else {
writeToBuffer(command);
}
} finally {
//釋放共享鎖
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}
return command;
}
DefaultEndpoint.writeToChannelAndFlush
<code> private void writeToChannelAndFlush(RedisCommand, ?, ?> command) {
//queueSize字段做cas +1操作
QUEUE_SIZE.incrementAndGet(this);
ChannelFuture channelFuture = channelWriteAndFlush(command);
//Lettuce的可靠性:保證最多一次。由於Lettuce的保證是基於內存的,所以並不可靠(系統crash時內存數據會丟失)
if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
channelFuture.addListener(AtMostOnceWriteListener.newInstance(this, command));
}
//Lettuce的可靠性:保證最少一次。由於Lettuce的保證是基於內存的,所以並不可靠(系統crash時內存數據會丟失)
if (reliability == Reliability.AT_LEAST_ONCE) {
// commands are ok to stay within the queue, reconnect will retrigger them
channelFuture.addListener(RetryListener.newInstance(this, command));
}
}
//可以看到最終還是調用了channle的writeAndFlush操作,這個Channel就是netty中的NioSocketChannel
private ChannelFuture channelWriteAndFlush(RedisCommand, ?, ?> command) {
if (debugEnabled) {
logger.debug("{} write() writeAndFlush command {}", logPrefix(), command);
}
return channel.writeAndFlush(command);
}/<code>
<code>public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (debugEnabled) {
logger.debug("{} write(ctx, {}, promise)", logPrefix(), msg);
}
if (msg instanceof RedisCommand) {
//如果是單個的RedisCommand就直接調用writeSingleCommand返回
writeSingleCommand(ctx, (RedisCommand, ?, ?>) msg, promise);
return;
}
if (msg instanceof List) {
List<rediscommand>> batch = (List<rediscommand>>) msg;
if (batch.size() == 1) {
writeSingleCommand(ctx, batch.get(0), promise);
return;
}
//批量寫操作,暫不關心
writeBatch(ctx, batch, promise);
return;
}
if (msg instanceof Collection) {
writeBatch(ctx, (Collection<rediscommand>>) msg, promise);
}
}/<rediscommand>/<rediscommand>/<rediscommand>/<code>
writeSingleCommand 核心在這裡
Lettuce使用單一連接支持多線程併發向Redis發送Command,那Lettuce是怎麼把請求Command與Redis返回的結果對應起來的呢,秘密就在這裡。
<code>private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand, ?, ?> command, ChannelPromise promise)
{
if (!isWriteable(command)) {
promise.trySuccess();
return;
}
//把當前command放入一個特定的棧中,這一步是關鍵
addToStack(command, promise);
// Trace操作,暫不關心
if (tracingEnabled && command instanceof CompleteableCommand) {
...
}
//調用ChannelHandlerContext把命令真正發送給Redis,當然在發送給Redis之前會由CommandEncoder類對RedisCommand進行編碼後寫入ByteBuf
ctx.write(command, promise);
private void addToStack(RedisCommand, ?, ?> command, ChannelPromise promise) {
try {
//再次驗證隊列是否滿了,如果滿了就拋出異常
validateWrite(1);
//command.getOutput() == null意味這個這個Command不需要Redis返回影響。一般不會走這個分支
if (command.getOutput() == null) {
// fire&forget commands are excluded from metrics
complete(command);
}
//這個應該是用來做metrics統計用的,暫時先不考慮
RedisCommand, ?, ?> redisCommand = potentiallyWrapLatencyCommand(command);
//無論promise是什麼類型的,最終都會把command放入到stack中,stack是一個基於數組實現的雙向隊列
if (promise.isVoid()) {
//如果promise不是Future類型的就直接把當前command放入到stack
stack.add(redisCommand);
} else {
//如果promise是Future類型的就等future完成後把當前command放入到stack中,當前場景下就是走的這個分支
promise.addListener(AddToStack.newInstance(stack, redisCommand));
}
} catch (Exception e) {
command.completeExceptionally(e);
throw e;
}
}
}/<code>
那麼Lettuce收到Redis的回覆消息之後是怎麼通知RedisCommand,並且把結果與RedisCommand對應上的呢。Netty在收到Redis服務端返回的消息之後就會回調CommandHandler的channelRead方法
<code>public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf input = (ByteBuf) msg;
...
try {
...
//重點在這裡
decode(ctx, buffer);
} finally {
input.release();
}
}
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
//如果stack為空,則直接返回,這個時候一般意味著返回的結果找到對應的RedisCommand了
if (pristine && stack.isEmpty() && buffer.isReadable()) {
...
return;
}
while (canDecode(buffer)) {
//重點來了。從stack的頭上取第一個RedisCommand
RedisCommand, ?, ?> command = stack.peek();
if (debugEnabled) {
logger.debug("{} Stack contains: {} commands", logPrefix(), stack.size());
}
pristine = false;
try {
//直接把返回的結果buffer給了stack頭上的第一個RedisCommand。
//decode操作實際上拿到RedisCommand的commandoutput對象對Redis的返回結果進行反序列化的。
if (!decode(ctx, buffer, command)) {
return;
}
} catch (Exception e) {
ctx.close();
throw e;
}
if (isProtectedMode(command)) {
onProtectedMode(command.getOutput().getError());
} else {
if (canComplete(command)) {
stack.poll();
try {
complete(command);
} catch (Exception e) {
logger.warn("{} Unexpected exception during request: {}", logPrefix, e.toString(), e);
}
}
}
afterDecode(ctx, command);
}
if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}/<code>
從上面的代碼可以看出來,當Lettuce收到Redis的回覆消息時就從stack的頭上取第一個RedisCommand,這個RedisCommand就是與該Redis返回結果對應的RedisCommand。為什麼這樣就能對應上呢,是因為Lettuce與Redis之間只有一條tcp連接,在Lettuce端放入stack時是有序的,tcp協議本身是有序的,redis是單線程處理請求的,所以Redis返回的消息也是有序的。這樣就能保證Redis中返回的消息一定對應著stack中的第一個RedisCommand。當然如果連接斷開又重連了,這個肯定就對應不上了,Lettuc對斷線重連也做了特殊處理,防止對應不上。
Command.encode
<code>public void encode(ByteBuf buf) {
buf.writeByte('*');
//寫入參數的數量
CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
//換行
buf.writeBytes(CommandArgs.CRLF);
//寫入命令的類型,即get
CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());
if (args != null) {
//調用Args的編碼,這裡面就會使用我們之前配置的codec序列化,當前使用的是String.UTF8
args.encode(buf);
}
}/<code>
閱讀更多 楊同學2019 的文章