聊聊canal的BooleanMutex

本文主要研究一下canal的BooleanMutex


聊聊canal的BooleanMutex


BooleanMutex

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

<code>public class BooleanMutex {

  private Sync sync;

  public BooleanMutex(){
      sync = new Sync();
      set(false);
  }

  public BooleanMutex(Boolean mutex){
      sync = new Sync();
      set(mutex);
  }

  /**
    * 阻塞等待Boolean為true
    *
    * @throws InterruptedException
    */
  public void get() throws InterruptedException {
      sync.innerGet();
  }

  /**
    * 阻塞等待Boolean為true,允許設置超時時間
    *
    * @param timeout
    * @param unit
    * @throws InterruptedException
    * @throws TimeoutException
    */
  public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
      sync.innerGet(unit.toNanos(timeout));
  }

  /**
    * 重新設置對應的Boolean mutex
    *
    * @param mutex

    */
  public void set(Boolean mutex) {
      if (mutex) {
          sync.innerSetTrue();
      } else {
          sync.innerSetFalse();
      }
  }

  public boolean state() {
      return sync.innerState();
  }

  //......
}/<code>
  • BooleanMutex定義了sync屬性,其get方法執行的是sync.innerGet(),其set方法執行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

Sync

canal-1.1.4/common/src/main/java/com/alibaba/otter/canal/common/utils/BooleanMutex.java

<code>    private final class Sync extends AbstractQueuedSynchronizer {

      private static final long serialVersionUID = 2559471934544126329L;
      /** State value representing that TRUE */
      private static final int TRUE             = 1;
      /** State value representing that FALSE */
      private static final int FALSE           = 2;

      private boolean isTrue(int state) {
          return (state & TRUE) != 0;
      }

      /**
        * 實現AQS的接口,獲取共享鎖的判斷
        */
      protected int tryAcquireShared(int state) {
          // 如果為true,直接允許獲取鎖對象
          // 如果為false,進入阻塞隊列,等待被喚醒
          return isTrue(getState()) ? 1 : -1;

      }

      /**
        * 實現AQS的接口,釋放共享鎖的判斷
        */
      protected boolean tryReleaseShared(int ignore) {
          // 始終返回true,代表可以release
          return true;
      }

      boolean innerState() {
          return isTrue(getState());
      }

      void innerGet() throws InterruptedException {
          acquireSharedInterruptibly(0);
      }

      void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
          if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException();
      }

      void innerSetTrue() {
          for (;;) {
              int s = getState();
              if (s == TRUE) {
                  return; // 直接退出
              }
              if (compareAndSetState(s, TRUE)) {// cas更新狀態,避免併發更新true操作
                  releaseShared(0);// 釋放一下鎖對象,喚醒一下阻塞的Thread
                  return;
              }
          }
      }

      void innerSetFalse() {
          for (;;) {
              int s = getState();
              if (s == FALSE) {
                  return; // 直接退出
              }
              if (compareAndSetState(s, FALSE)) {// cas更新狀態,避免併發更新false操作
                  return;

              }
          }
      }

  }/<code>
  • Sync繼承了AbstractQueuedSynchronizer,其tryReleaseShared方法始終返回true,其innerGet方法執行的是acquireSharedInterruptibly方法,其innerSetTrue方法執行compareAndSetState(s, TRUE),其innerSetFalse執行compareAndSetState(s, FALSE)

小結

BooleanMutex定義了sync屬性,其get方法執行的是sync.innerGet(),其set方法執行的是sync.innerSetTrue()或者sync.innerSetFalse(),其state方法返回的是sync.innerState()

doc

  • BooleanMutex


分享到:


相關文章: