圍觀:基於事件機制的內部解耦之心路歷程

每篇文章都有屬於它自己的故事,沒有故事的文章是沒有靈魂的文章。而我就是這個靈魂擺渡人。

主人公張某某,這邊不方便透露姓名,就叫小張吧。小張在一家小型的互聯網創業團隊中就職。

職位是Java後端開發,所以整體和業務代碼打交道在所難免。

之前有個搜索相關的需求,而且數量量也算比較大,就採用了ElasticSearch來做搜索。第一版由於時間比較趕,做的比較粗糙。越到後面發現代碼越難寫下去了,主要是在更新索引數據的場景沒處理好,才有了今天的故事。

基礎入門

Spring Event

Spring的事件就是觀察者設計模式,一個任務結束後需要通知任務進行下一步的操作,就可以使用事件來驅動。

在Spring中使用事件機制的步驟如下:

  • 自定義事件對象,繼承 ApplicationEvent
  • 自定義事件監聽器,實現 ApplicationListener 或者通過 @EventListener 註解到方法上實現監聽
  • 自定義發佈者,通過 applicationContext.publishEvent()發佈事件

Spring Event在很多開源框架中都有使用案例,比如Spring Cloud中的Eureka裡面就有使用

event包

圍觀:基於事件機制的內部解耦之心路歷程

定義Event

圍觀:基於事件機制的內部解耦之心路歷程

發佈Event

圍觀:基於事件機制的內部解耦之心路歷程

Guava EventBus

EventBus是Guava的事件處理機制,在使用層面和Spring Event差不多。這裡不做過多講解,今天主要講Spring Event。

業務背景

所有的數據都會有一個定時任務去同步數據到ElasticSearch中,業務中直接從ElasticSearch查詢數據返回給調用方。

之所以把所有數據都存入ElasticSearch是為了方便,如果只存儲搜索的字段,那麼搜索出來後就還需要去數據庫查詢其他信息進行組裝。

就是由於所有數據都會存儲ElasticSearch中,所以當這些數據發生變更的時候我們需要去刷新ElasticSearch中的數據,這個就是我們今天文章的核心背景。

假設我們ElasticSearch中的數據是文章信息,也就是我們經常看的技術文章,這個文章中存儲了訪問量,點贊量,評論量等信息。

當這些動作發生的時候,都需要去更新ElasticSearch的數據才行,我們默認的操作都是更新數據庫中的數據,ElasticSearch是由定時任務去同步的,同步會有周期,做不到毫秒別更新。

實現方案-倔強青銅

倔強青銅就是在每個會涉及到數據變更的地方,去手動調用代碼進行數據的刷新操作,弊端在於每個地方都要去調用,這還是簡單的場景,有複雜的業務場景,一個業務操作可能會涉及到很多數據的刷新,也就是需要調用很多次,模擬代碼如下:

<code>// 瀏覽
public void visit() {
    articleIndexService.reIndex(articleId);
    XXXIndexService.reIndex(articleId);
    ........
}
    
// 評論
public void comment() {
    articleIndexService.reIndex(articleId);
}
/<code>

實現方案-秩序白銀

倔強青銅的弊端在於不解耦,而且是同步調用,如果在事務中會加長事務的時間。所以我們需要一個異步的方案來執行重建索引的邏輯。

經過大家激烈的討論,而項目也是以Spring Boot為主,所以選擇了Spring Event來作為異步方案。

定義一個重建文章索引的Event,代碼如下:

<code>public class ArticleReIndexEvent extends ApplicationEvent {
    private String id;
    
    public ArticleReIndexEvent(Object source, String id) {
        super(source);
        this.id = id;
    }
    
    public String getId() {
        return id;
    }
    
}
/<code>

然後寫一個EventListener來監聽事件,進行業務邏輯處理,代碼如下:

<code>@Component
public class MyEventListener {
    
    @EventListener
    public void onEvent(ArticleReIndexEvent event) {
        System.out.println(event.getId());
    }
}
/<code>

使用的地方只需要發佈一個Event就可以,這個動作默認是同步的,如果我們想讓這個操作不會阻塞,變成異步只需要在@EventListener上面再增加一個@Async註解。

<code>// 瀏覽
public void visit() {
    applicationContext.publishEvent(new ArticleReIndexEvent(this, articleId));
    applicationContext.publishEvent(new XXXReIndexEvent(this, articleId));
}
    
// 評論
public void comment() {
    applicationContext.publishEvent(new ArticleReIndexEvent(this, articleId));
}
/<code>

實現方案-榮耀黃金

秩序白銀的方案在代碼層面確實解耦了,但是使用者發佈事件需要關注的點太多了,也就是我改了某個表的數據,我得知道有哪些索引會用到這張表的數據,我得把這些相關的事件都發送出去,這樣數據才會異步進行刷新。

當業務複雜後或者有新來的同事,不是那麼的瞭解業務,壓根不可能知道說我改了這個數據對其他那些索引有影響,所以這個方案還是有優化的空間。

榮耀黃金的方案是將所有的事件都統一為一個,然後在事件里加屬性來區分修改的數據是哪裡的。每個數據需要同步變更的索引都有自己的監聽器,去監聽這個統一的事件,這樣對於發佈者來說我只需要發送一個事件告訴你,我這邊改數據了,你要不要消費,要不要更新索引我並不關心。

定義一個數據表發生修改的事件,代碼如下:

<code>public class TableUpdateEvent extends ApplicationEvent {
    private String table;
    private String id;
    
    public TableUpdateEvent(Object source, String id, String table) {
        super(source);
        this.id = id;
        this.table = table;
    }
    
    public String getId() {
        return id;
    }
    
    public String getTable() {
        return table;
    }
    
}
/<code>

然後每個索引都需要消費這個事件,只需要關注這個索引中數據的來源表有沒有變動,如果有變動則去刷新索引。

比如索引A的數據都是article表中過來的,所以只要是article表中的數據發生了變更,索引A都要做對應的處理,所以索引A的監聽器只需要關注article表有沒有修改即可。

<code>@Component
public class MyEventListener {
    
    private List consumerTables = Arrays.asList("article");
    
    @Async
    @EventListener
    public void onEvent(TableUpdateEvent event) {
        System.out.println(event.getId() + "\t" + event.getTable());
        if (consumerTables.contains(event.getTable())) {
            System.out.println("消費自己關注的表數據變動,然後處理。。。");
        }
    }
    
}
/<code>

比如索引B的數據是從comment和comment_reply兩個表中過來的,所以只要是comment和comment_reply兩個表的數據發生了變更,索引B都需要做對應的處理,所以索引B的監聽器只需要關注comment和comment_reply兩個表有沒有修改即可。

<code>@Component
public class MyEventListener2 {
    
    private List consumerTables = Arrays.asList("comment", "comment_replay");
    
    @Async
    @EventListener
    public void onEvent(TableUpdateEvent event) {
        System.out.println(event.getId() + "\t" + event.getTable());
        if (consumerTables.contains(event.getTable())) {
            System.out.println("消費自己關注的表數據變動,然後處理。。。");
        }
    }
    
}
/<code>

實現方案-尊貴鉑金

榮耀黃金的方案已經很完美了,代碼解耦不說,使用者關注點也少了,不容易出錯。

但還有一個致命的問題就是所有涉及到業務修改的方法中,得手動往外發送一個事件,從代碼解耦的場景來說還殘留了一點瑕疵,至少還是有那麼一行代碼來發送事件。

尊貴鉑金的方案將完全解耦,不需要寫代碼的時候手動去發送事件。我們將通過訂閱MySql的binlog來統一發送事件。

binlog是MySQL數據庫的二進制日誌,用於記錄用戶對數據庫操作的SQL語句信息,MySQL的主從同步也是基於binlog來實現的,對於我們這種數據異構的場景再合適不過了。

binlog訂閱的方式有很多種,開源的框架一般都是用canal來實現。

canal:https://github.com/alibaba/canal

如果你買的雲數據庫,像阿里雲就有dts數據訂閱服務,跟canal一樣。

之後的方案圖如下:

圍觀:基於事件機制的內部解耦之心路歷程

實現方案-永恆鑽石

沒有什麼方案和架構是永恆的,跟著業務的變更而演進,符合當前業務的需求才是王道。越後面考慮的東西越多,畢竟最後是要升級到最強王者的,哈哈。

關於作者:尹吉歡,簡單的技術愛好者,《Spring Cloud微服務-全棧技術與案例解析》, 《Spring Cloud微服務 入門 實戰與進階》作者, 公眾號

猿天地 發起人。


圍觀:基於事件機制的內部解耦之心路歷程


分享到:


相關文章: