12.25 Redis客戶端Lettuce源碼「二」Lettuce是如何基於Netty建立連接的

lettuce-core版本: 5.1.7.RELEASE

先看一下Lettuce的基本使用方法,使用Lettuce大概分為如下幾步:

  1. 基於Redis連接信息創建RedisClient
  2. 基於RedisClient創建StatefulRedisConnection
  3. 從Connection中獲取Command,基於Command執行Redis命令操作。
<code>/**
* @author xiaobing
* @date 2019/12/20
*/
public class LettuceSimpleUse {
private void testLettuce() throws ExecutionException, InterruptedException {
//構建RedisClient對象,RedisClient包含了Redis的基本配置信息,可以基於RedisClient創建RedisConnection
RedisClient client = RedisClient.create("redis://localhost");

//創建一個線程安全的StatefulRedisConnection,可以多線程併發對該connection操作,底層只有一個物理連接.
StatefulRedisConnection<string> connection = client.connect();

//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisStringCommands<string> sync = connection.sync();
String value = sync.get("key");
System.out.println("get redis value with lettuce sync command, value is :" + value);

//獲取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三種command
RedisAsyncCommands<string> async = connection.async();
RedisFuture<string> getFuture = async.get("key");
value = getFuture.get();
System.out.println("get redis value with lettuce async command, value is :" + value);
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
new LettuceSimpleUse().testLettuce();
}
}/<string>/<string>/<string>/<string>/<code>

先看一張建立連接的時序圖,有一個直觀的印象。

Redis客戶端Lettuce源碼「二」Lettuce是如何基於Netty建立連接的

lettuce源碼--建立redis連接


RedisClient

一個可擴展、線程安全的RedisClient,支持sync、async、reactor執行模式。
RedisClient.create只是傳入了一些配置信息,此時並沒有創建連接。

<code>// 使用默認的ClientResource
public static RedisClient create(String uri) {
LettuceAssert.notEmpty(uri, "URI must not be empty");
return new RedisClient(null, RedisURI.create(uri));
}
// ClientResources中包含了一些配置和線程池信息,是一個比較重的資源,多個RedisClient可以共享同一個ClientResource
protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
super(clientResources);
assertNotNull(redisURI);
this.redisURI = redisURI;
setDefaultTimeout(redisURI.getTimeout());
}/<code>

RedisClient.connnect

可以看到connect方法有一些重載方法,默認的是用UTF8 String對key和value序列化,通過傳入RedisCodec支持自定義的對Key和Value的序列化方式。

<code>    public StatefulRedisConnection<string> connect() {
return connect(newStringStringCodec());
}

public StatefulRedisConnection connect(RedisCodec codec) {


checkForRedisURI();
//connectStandaloneAsync是異步創建connection,返回的是Future對象,通過getConnection轉為同步操作
return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
}
//異步轉同步操作
protected T getConnection(ConnectionFuture connectionFuture) {
try {
return connectionFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
} catch (Exception e) {

if (e instanceof ExecutionException) {
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());
}
throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
}
}
/<string>/<code>

RedisClient.connectStandaloneAsync

<code>    private  ConnectionFuture<statefulredisconnection>> connectStandaloneAsync(RedisCodec codec,
RedisURI redisURI, Duration timeout) {

assertNotNull(codec);
checkValidRedisURI(redisURI);

logger.debug("Trying to get a Redis connection for: " + redisURI);
//創建一個有狀態的EndPoint用於抽象底層channel的實現,DefaultEndpoint內部封裝斷線重連、重連後成功後回放連接失敗期間的command。同時封裝了AT_MOST_ONCE、AT_LEAST_ONCE的可靠性實現(該邏輯是基於內存的,所以並不可靠)。

DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
RedisChannelWriter writer = endpoint;

//進一步封裝,添加支持過期時間的執行命令
if (CommandExpiryWriter.isSupported(clientOptions)) {
writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
}
//創建StatefulRedisConnectionImpl對象,StatefulRedisConnectionImpl對外提供RedisCommand對象,內部基於writer發送命令。此時並沒有真正的創建物理連接,該類本身是無狀態、線程安全的。
StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, codec, timeout);
//異步創建Redis物理連接,返回future對象。後面可以看到future中返回的對象其實還是上面的connection
ConnectionFuture<statefulredisconnection>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
() -> new CommandHandler(clientOptions, clientResources, endpoint));

future.whenComplete((channelHandler, throwable) -> {

if (throwable != null) {
connection.close();
}
});

return future;
}
//StatefulRedisConnectionImpl的構造函數,此時已經創建了sync、async、reactive三種類型的RedisCommand。基於RedisCodec對key和value序列化,通過write把命令真正的發出去。
public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec codec, Duration timeout) {

super(writer, timeout);

this.codec = codec;
this.async = newRedisAsyncCommandsImpl();
this.sync = newRedisSyncCommandsImpl();
this.reactive = newRedisReactiveCommandsImpl();
}
/<statefulredisconnection>
/<statefulredisconnection>
/<code>

RedisClient.connectStatefulAsync

<code>    private  ConnectionFuture connectStatefulAsync(StatefulRedisConnectionImpl connection,
RedisCodec codec, Endpoint endpoint,
RedisURI redisURI, Supplier<commandhandler> commandHandlerSupplier) {
//構建ConnectionBuidler,通過ConnectionBuilder來創建connection
ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
sslConnectionBuilder.ssl(redisURI);
connectionBuilder = sslConnectionBuilder;
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}
//填充StatefulRedisConnectionImpl
connectionBuilder.connection(connection);
//控制RedisClient行為的一些配置參數
connectionBuilder.clientOptions(clientOptions);
//ClientResource包含了一些EventLoopGroup信息
connectionBuilder.clientResources(clientResources);
//配置commandHandlerSupplier,這個commandHandler很重要,是實現StatefulRedisConnectionImpl線程安全的關鍵,後面會詳細講。
connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
//connectionBuilder填充Bootstrap等更多的信息
//getSocketAddressSupplier是根據redisURI獲取真正的Redis連接信息,如:sentinel模式下,需要從sentinel獲取到真實的redis連接地址
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
//配置netty的channeltype
channelType(connectionBuilder, redisURI);

if (clientOptions.isPingBeforeActivateConnection()) {
if (hasPassword(redisURI)) {
connectionBuilder.enableAuthPingBeforeConnect();
} else {
connectionBuilder.enablePingBeforeConnect();
}

}
//初始化channel,在這一步才真正的異步的去創建物理連接
ConnectionFuture<redischannelhandler>> future = initializeChannelAsync(connectionBuilder);
ConnectionFuture> sync = future;

if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
//連接成功之後發送auth命令,做密碼的驗證
sync = sync.thenCompose(channelHandler -> {

CommandArgs args = new CommandArgs<>(codec).add(redisURI.getPassword());
return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args);
});
}
//設置clientName,從Redis服務端執行client list可以看到clientname
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
sync = sync.thenApply(channelHandler -> {
connection.setClientName(redisURI.getClientName());
return channelHandler;
});
}
//選擇db
if (redisURI.getDatabase() != 0) {

sync = sync.thenCompose(channelHandler -> {

CommandArgs args = new CommandArgs<>(codec).add(redisURI.getDatabase());
return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args);
});
}
//返回connection對象
return sync.thenApply(channelHandler -> (S) connection);
}
/<redischannelhandler>/<commandhandler>
/<code>

RedisClient.connectionBuilder

<code>//為ConnectionBuidler填充更多的信息,如Bootstrap、channelGroup 

protected void connectionBuilder(Mono<socketaddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
//創建Netty客戶端的Bootstrap對象
Bootstrap redisBootstrap = new Bootstrap();
//Bootstrap的一些配置參數,具體可以參考Netty的相關書籍(Netty權威指南)
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);

SocketOptions socketOptions = getOptions().getSocketOptions();

redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));

if (LettuceStrings.isEmpty(redisURI.getSocket())) {
//keepAlive參數,默認為true
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
//tcp_nodelay參數,默認為true
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}

connectionBuilder.timeout(redisURI.getTimeout());
connectionBuilder.password(redisURI.getPassword());
//把構建出來的bootStrap對象賦值給connectionBuidler,由connectionBuilder創建連接
connectionBuilder.bootstrap(redisBootstrap);

//Netty的相關參數配置,待研究
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
//配置socket地址提供者
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}/<socketaddress>/<code>

RedisClient.initializeChannelAsync

<code>//初始化redis連接,返回ChannelFuture對象
protected > ConnectionFuture initializeChannelAsync(
ConnectionBuilder connectionBuilder) {

Mono<socketaddress> socketAddressSupplier = connectionBuilder.socketAddress();


if (clientResources.eventExecutorGroup().isShuttingDown()) {
throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
}
//創建socketAddressFuture 對象,當socketAddressSupplier異步獲取SocketAddress成功之後會把SocketAddress數據放入該對象中
CompletableFuture<socketaddress> socketAddressFuture = new CompletableFuture<>();
//創建channelReadyFuture,當連接建立成功之後會把Channel對象放入該對象中
CompletableFuture<channel> channelReadyFuture = new CompletableFuture<>();

//配置獲取SocketAddress異步操作之後的操作:
//1. 把SocketAddress對象放入socketAddressFuture中
//2. 基於SocketAddress調用initializeChannelAsync0方法真正去建立連接
socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
.subscribe(redisAddress -> {

if (channelReadyFuture.isCancelled()) {
return;
}
//異步建立真正的連接,如果建立成功會把生產的Channel對象放入channelReadyFuture中
initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
}, channelReadyFuture::completeExceptionally);
//建立連接成功之後返回的還是connectionBuilder的connection對象,即StatefulRedisConnectionImpl
return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
.connection()));
}/<channel>/<socketaddress>/<socketaddress>
/<code>

RedisClient.initializeChannelAsync0

<code>//真正的去建立Redis物理連接,這裡面有很多基於Future的異步操作,如果看不太懂,建議先看看Future的相關知識,多看幾遍。
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<channel> channelReadyFuture,
SocketAddress redisAddress) {


logger.debug("Connecting to Redis at {}", redisAddress);

Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//創建PlainChannelInitializer對象,PlainChannelIntializer對象會在Channel初始化的時候添加很多Handlers(Netty的Handler概念可以參考Netty權威指南),如:CommandEncoder、CommandHandler(非常重要的Handler)、ConnectionWatchdog(實現斷線重連)
RedisChannelInitializer initializer = connectionBuilder.build();
//RedisChannelInitializer配置到Bootstrap中
redisBootstrap.handler(initializer);

//調用一些通過ClientResources自定義的回調函數
clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
//獲取initFuture 對象,如果Channel初始化完成,可以通過該對象獲取到初始化的結果
CompletableFuture<boolean> initFuture = initializer.channelInitialized();
//真正的通過Netty異步的方式去建立物理連接,返回ChannelFuture對象
ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
//配置異常處理
channelReadyFuture.whenComplete((c, t) -> {

if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});

connectFuture.addListener(future -> {
//異常處理
if (!future.isSuccess()) {

logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
connectionBuilder.endpoint().initialState();
//賦值channelReadyFuture告知出現異常了
channelReadyFuture.completeExceptionally(future.cause());
return;
}
//當Channel初始化完成之後,根據初始化的結果做判斷

initFuture.whenComplete((success, throwable) -> {
//如果異常為空,則初始化成功。
if (throwable == null) {

logger.debug("Connecting to Redis at {}: Success", redisAddress);
RedisChannelHandler, ?> connection = connectionBuilder.connection();
connection.registerCloseables(closeableResources, connection);
//把成功之後的結果賦值給channelReadyFuture對象
channelReadyFuture.complete(connectFuture.channel());
return;
}

//如果初始化Channel的過程中出現異常的處理邏輯
logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
connectionBuilder.endpoint().initialState();
Throwable failure;

if (throwable instanceof RedisConnectionException) {
failure = throwable;
} else if (throwable instanceof TimeoutException) {
failure = new RedisConnectionException("Could not initialize channel within "
+ connectionBuilder.getTimeout(), throwable);
} else {
failure = throwable;
}
//賦值channelReadyFuture告知出現異常了
channelReadyFuture.completeExceptionally(failure);
});
});
}/<boolean>/<channel>/<code>

至此,Redis的Connection的建立連接的主流程就結束了,具體的一些邏輯如:斷線重連是如何實現的,Redis模式下是怎麼基於Sentinel獲取Redis實際連接的等等會在後續的文章中介紹。


分享到:


相關文章: