Preface
This article takes a look at canal's ClientIdentity.
ClientIdentity
canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/ClientIdentity.java
<code>public class ClientIdentity implements Serializable {

    private static final long serialVersionUID = -8262100681930834834L;
    private String            destination;
    private short             clientId;
    private String            filter;

    public ClientIdentity(){
    }

    public ClientIdentity(String destination, short clientId){
        this.clientId = clientId;
        this.destination = destination;
    }

    public ClientIdentity(String destination, short clientId, String filter){
        this.clientId = clientId;
        this.destination = destination;
        this.filter = filter;
    }

    public Boolean hasFilter() {
        if (filter == null) {
            return false;
        }

        return StringUtils.isNotBlank(filter);
    }

    //......
}
</code>
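- ClientIdentity defines three properties: destination, clientId, and filter. Its hasFilter method returns true only when filter is non-null and not blank.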
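To make the role of the three properties concrete, here is a minimal sketch. It uses a stand-in class that mirrors only the fields, constructors, and hasFilter behaviour quoted above; it is not the canal class itself. The destination "example", the client id 1001, and the filter "demo_db\\..*" are illustrative values, and the blank check is a plain-Java approximation of StringUtils.isNotBlank.

```java
import java.io.Serializable;

public class ClientIdentitySketch {

    // Stand-in for the class quoted above (assumption: only the parts shown in the excerpt).
    public static class ClientIdentity implements Serializable {

        private static final long serialVersionUID = 1L;
        private final String destination;
        private final short clientId;
        private final String filter;

        public ClientIdentity(String destination, short clientId) {
            this(destination, clientId, null);
        }

        public ClientIdentity(String destination, short clientId, String filter) {
            this.destination = destination;
            this.clientId = clientId;
            this.filter = filter;
        }

        // Approximates the quoted hasFilter(): false for null, empty, or whitespace-only filters.
        public boolean hasFilter() {
            return filter != null && !filter.chars().allMatch(Character::isWhitespace);
        }
    }

    public static void main(String[] args) {
        ClientIdentity plain = new ClientIdentity("example", (short) 1001);
        ClientIdentity filtered = new ClientIdentity("example", (short) 1001, "demo_db\\..*");
        ClientIdentity blank = new ClientIdentity("example", (short) 1001, "   ");

        System.out.println(plain.hasFilter());    // false: no filter was given
        System.out.println(filtered.hasFilter()); // true: a non-blank filter was given
        System.out.println(blank.hasFilter());    // false: whitespace only counts as no filter
    }
}
```

The two-argument constructor leaves filter as null, so a client built that way reports no filter.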
CanalServerWithEmbedded
canal-1.1.4/server/src/main/java/com/alibaba/otter/canal/server/embedded/CanalServerWithEmbedded.java
<code>public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {

    private static final Logger        logger  = LoggerFactory.getLogger(CanalServerWithEmbedded.class);
    private Map<String, CanalInstance> canalInstances;
    // private Map lastRollbackPostions;
    private CanalInstanceGenerator     canalInstanceGenerator;
    private int                        metricsPort;
    private CanalMetricsService        metrics = NopCanalMetricsService.NOP;
    private String                     user;
    private String                     passwd;

    //......

    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }

        canalInstance.getMetaManager().subscribe(clientIdentity); // perform the meta subscription

        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition();// fetch the first position in the store
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
        }

        // notify that the subscription relationship has changed
        canalInstance.subscribeChange(clientIdentity);
    }

    /**
     * Unsubscribe
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity); // perform the meta subscription
        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

    /**
     * List all subscription info
     */
    public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(destination);
        return canalInstance.getMetaManager().listAllSubscribeInfo(destination);
    }

    //......
}
</code>
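- CanalServerWithEmbedded provides the subscribe, unsubscribe, and listAllSubscribe methods. subscribe takes a clientIdentity parameter, starts the meta manager if it is not yet started, and registers the client with canalInstance.getMetaManager().subscribe(clientIdentity). It then reads the position with canalInstance.getMetaManager().getCursor(clientIdentity); if that position is null, it falls back to canalInstance.getEventStore().getFirstPosition() and, when the result is non-null, saves it with canalInstance.getMetaManager().updateCursor(clientIdentity, position). Finally it calls canalInstance.subscribeChange(clientIdentity). unsubscribe calls canalInstance.getMetaManager().unsubscribe(clientIdentity), and listAllSubscribe calls canalInstance.getMetaManager().listAllSubscribeInfo(destination).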
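The cursor-selection part of subscribe can be sketched in isolation. The class below is a hypothetical in-memory model, not canal code: a HashMap stands in for the meta manager's cursor storage, a nullable Long stands in for Position, and the key format "destination:clientId" is an assumption made only for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class SubscribeFlowSketch {

    // Hypothetical stand-in for the cursors kept by the meta manager.
    private final Map<String, Long> cursors = new HashMap<>();
    // Hypothetical stand-in for getEventStore().getFirstPosition(); null models an empty store.
    private final Long firstPositionInStore;

    public SubscribeFlowSketch(Long firstPositionInStore) {
        this.firstPositionInStore = firstPositionInStore;
    }

    private static String key(String destination, short clientId) {
        return destination + ":" + clientId;
    }

    // Mirrors the branch in subscribe: prefer the saved cursor, else fall back to the store's first position.
    public Long subscribe(String destination, short clientId) {
        String key = key(destination, clientId);
        Long position = cursors.get(key);
        if (position == null) {
            position = firstPositionInStore;
            if (position != null) {
                cursors.put(key, position); // only persisted when the store actually has a first position
            }
        }
        return position;
    }

    // Stand-in for updateCursor, e.g. after the client has consumed up to a given position.
    public void updateCursor(String destination, short clientId, long position) {
        cursors.put(key(destination, clientId), position);
    }

    public static void main(String[] args) {
        SubscribeFlowSketch server = new SubscribeFlowSketch(100L);
        System.out.println(server.subscribe("example", (short) 1001)); // 100: first subscribe uses the store's first position
        server.updateCursor("example", (short) 1001, 250L);
        System.out.println(server.subscribe("example", (short) 1001)); // 250: later subscribes reuse the saved cursor
    }
}
```

When the store is empty and no cursor exists, the sketch returns null and stores nothing, which matches the guarded updateCursor call in the quoted method.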
Summary
ClientIdentity defines the destination, clientId, and filter properties. CanalServerWithEmbedded provides the subscribe, unsubscribe, and listAllSubscribe methods; subscribe and unsubscribe take a clientIdentity parameter, while listAllSubscribe returns a list of ClientIdentity.
doc
- ClientIdentity