初識CAS的實現原理

在Java併發領域,總會提到原子操作,而Java作為一門高級語言,為了實現原子操作,提供了兩種解決方案:1)加鎖;2)通過CAS來實現,同時JDK在1.5開始,在JUC包下,併發編程大神Doug Lea提供了很多原子類,這些原子類都是通過CAS來實現的。接下來本文將主要介紹CAS相關的知識。

1. CAS簡介

  • CAS的全稱是Compare And Swap,翻譯過來就是比較並交換。假設內存中數據的值為V,舊的預期值為A,新的修改值為B。那麼CAS操作可以分為三個步驟:1)將舊的預期值A與內存中的值V比較;2)如果A與V的值相等,那麼就將V的值設置為B;3)返回操作是否成功。
  • 在多處理器的機器上,當有多個線程對內存中的數據進行CAS操作時,處理器能保證只會有一個線程能修改成功。

2. CAS的實現原理

在Java中可以通過Unsafe類實現CAS操作,而Unsafe類最終調用的是native方法,即具體實現是由JVM中的方法實現的。而JVM中通過C++調用處理器的指令cmpxchg來實現的。

2.1 JAVA中的CAS實現

  • 在Java的java.util.concurrent.atomic包中提供了很多原子類,這些類的操作均是原子操作,它們都是根據Unsafe類的CAS方法來實現原子操作的,如:compareAndSwapInt()、compareAndSwapLong()、compareAndSwapObject()這三個方法均是CAS方法。
  • 例如如下示例,通過AtomicInteger類的CAS操作實現了count線程安全的累加操作。
 1public class Demo {
2
3 // 創建一個Integer類型的原子類
4 private static AtomicInteger count = new AtomicInteger(0);
5
6 public static void main(String[] args) {
7 List<thread> threadList = new ArrayList<>(10);
8 // 創建10個線程,每個線程中的run()方法將count累加10000次
9 for (int i = 0; i < 10; i++) {
10 threadList.add(new Thread(()-> {
11 // 每個線程累加10000次
12 for (int j = 0; j < 10000; j++) {
13 // 實現遞增,incrementAndGet()方法等價於非原子操作裡面的 count++
14 count.incrementAndGet();
15 }
16 }));
17 }
18 // 將10個線程啟動
19 for (int i = 0; i < 10; i++) {
20 threadList.get(i).start();
21 }
22
23 // 讓主線程休眠10秒鐘,休眠10秒鐘是為了讓前面的10個線程全部執行完成,如果10秒鐘不夠,可以設置得更加長一點
24 try {
25 Thread.sleep(10000);
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29
30 // 打印count的值
31 // 最終發現從控制檯打印出來的結果為100000

32 System.out.println(count.get());
33
34 }
35}
/<thread>
  • 當上面10個線程同時併發的對count進行累加時,使用了AtomicInteger.incrementAndGet()方法來累加,這個方法保證了原子性。所以10個線程,每個線程累加10000次,最終count的結果為100000。接下來我們通過源碼來看看AtomicInteger類是如何來實現原子操作的。
  • AtomicInteger類的部分源碼如下。從源碼中我們發現,AtomicInteger調用了Unsafe類的getAndAddInt()方法。
1public class AtomicInteger extends Number implements java.io.Serializable {
2
3 private static final Unsafe unsafe = Unsafe.getUnsafe();
4
5 public final int incrementAndGet() {
6 return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
7 }
8}
  • Unsafe類的部分源碼如下。從源碼中可以發現,getAndAddInt()方法會循環調用compareAndSwapInt()方法(即CAS方法),如果CAS失敗,則會通過調用getIntVolatile()方法從內存中獲取新值,然後再進行CAS操作,直到成功。compareAndSwapInt()方法是一個native方法,具體的實現是由JVM來完成的。
 1public final class Unsafe {
2
3 private static final Unsafe theUnsafe = new Unsafe();
4
5 public final int getAndAddInt(Object o, long offset, int delta) {

6 int v;
7 do {
8 v = getIntVolatile(o, offset);
9 // 循環調用compareAndSwapInt(),直到設置成功
10 // 如果設置失敗,則通過getIntVolatile()方法從內存中獲取新的值,然後再進行CAS操作
11 } while (!compareAndSwapInt(o, offset, v, v + delta));
12 return v;
13 }
14
15 // native方法由JVM實現
16 public final native boolean compareAndSwapInt(Object o, long offset,
17 int expected,
18 int x);
19}
  • 要想看JVM的源碼,就需要下載OpenJDK的源碼,可以從github下載(https://github.com/openjdk/jdk),進入github後,可以選擇下載tag分支下不同版本的OpenJDK,這裡筆者下載的是jdk8-b120版本的OpenJDK。
  • Java中的native方法是通過JNI的方式調用到JVM中的,對於Unsafe類,在JVM的源碼中會對應到unsafe.cpp文件,該文件的路徑是:jdk-jdk8-b120/hotspot/src/share/vm/prims/unsafe.cpp。在unsafe.cpp文件中接近末尾的地方,可以看到如下代碼(只截取了部分源碼)。
 1static JNINativeMethod methods_18[] = {
2 {CC"getObject", CC"("OBJ"J)"OBJ"", FN_PTR(Unsafe_GetObject)},
3 {CC"putObject", CC"("OBJ"J"OBJ")V", FN_PTR(Unsafe_SetObject)},
4 {CC"getObjectVolatile",CC"("OBJ"J)"OBJ"", FN_PTR(Unsafe_GetObjectVolatile)},
5 {CC"putObjectVolatile",CC"("OBJ"J"OBJ")V", FN_PTR(Unsafe_SetObjectVolatile)},
6
7 {CC"getAddress", CC"("ADR")"ADR, FN_PTR(Unsafe_GetNativeAddress)},
8 {CC"putAddress", CC"("ADR""ADR")V", FN_PTR(Unsafe_SetNativeAddress)},
9
10 {CC"allocateMemory", CC"(J)"ADR, FN_PTR(Unsafe_AllocateMemory)},
11 {CC"reallocateMemory", CC"("ADR"J)"ADR, FN_PTR(Unsafe_ReallocateMemory)},

12 {CC"freeMemory", CC"("ADR")V", FN_PTR(Unsafe_FreeMemory)},
13
14 {CC"objectFieldOffset", CC"("FLD")J", FN_PTR(Unsafe_ObjectFieldOffset)},
15 {CC"staticFieldOffset", CC"("FLD")J", FN_PTR(Unsafe_StaticFieldOffset)},
16 {CC"staticFieldBase", CC"("FLD")"OBJ, FN_PTR(Unsafe_StaticFieldBaseFromField)},
17 {CC"ensureClassInitialized",CC"("CLS")V", FN_PTR(Unsafe_EnsureClassInitialized)},
18 {CC"arrayBaseOffset", CC"("CLS")I", FN_PTR(Unsafe_ArrayBaseOffset)},
19 {CC"arrayIndexScale", CC"("CLS")I", FN_PTR(Unsafe_ArrayIndexScale)},
20 {CC"addressSize", CC"()I", FN_PTR(Unsafe_AddressSize)},
21 {CC"pageSize", CC"()I", FN_PTR(Unsafe_PageSize)},
22
23 {CC"defineClass", CC"("DC_Args")"CLS, FN_PTR(Unsafe_DefineClass)},
24 {CC"allocateInstance", CC"("CLS")"OBJ, FN_PTR(Unsafe_AllocateInstance)},
25 {CC"monitorEnter", CC"("OBJ")V", FN_PTR(Unsafe_MonitorEnter)},
26 {CC"monitorExit", CC"("OBJ")V", FN_PTR(Unsafe_MonitorExit)},
27 {CC"tryMonitorEnter", CC"("OBJ")Z", FN_PTR(Unsafe_TryMonitorEnter)},
28 {CC"throwException", CC"("THR")V", FN_PTR(Unsafe_ThrowException)},
29 {CC"compareAndSwapObject", CC"("OBJ"J"OBJ""OBJ")Z", FN_PTR(Unsafe_CompareAndSwapObject)},
30 // 對應Unsafe.java文件中的compareAndSwapInt()方法,Unsafe_CompareAndSwapInt()方法時對應的C++方法
31 {CC"compareAndSwapInt", CC"("OBJ"J""I""I"")Z", FN_PTR(Unsafe_CompareAndSwapInt)},
32 {CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z", FN_PTR(Unsafe_CompareAndSwapLong)},
33 {CC"putOrderedObject", CC"("OBJ"J"OBJ")V", FN_PTR(Unsafe_SetOrderedObject)},
34 {CC"putOrderedInt", CC"("OBJ"JI)V", FN_PTR(Unsafe_SetOrderedInt)},
35 {CC"putOrderedLong", CC"("OBJ"JJ)V", FN_PTR(Unsafe_SetOrderedLong)},
36 {CC"park", CC"(ZJ)V", FN_PTR(Unsafe_Park)},
37 {CC"unpark", CC"("OBJ")V", FN_PTR(Unsafe_Unpark)}
38}
  • 從上面的代碼中可以發現,Unsafe類中的native方法,都會在這兒找到對應的c++方法。如上面我們提到的compareAndSwapInt()、compareAndSwapLong()、compareAndSwapObject()這三個方法,分別對應unsafe.cpp文件中的Unsafe_CompareAndSwapInt()、Unsafe_CompareAndSwapLong()、Unsafe_SetOrderedObject()。所以當我們調用Unsafe.compareAndSwapInt()方法時,最終會調用到unsafe.cpp文件中的Unsafe_CompareAndSwapInt()方法。Unsafe_CompareAndSwapInt()源碼如下:
1UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
2 UnsafeWrapper("Unsafe_CompareAndSwapInt");
3 oop p = JNIHandles::resolve(obj);
4 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
5 // 核心代碼是Atomic::cmpxchg()
6 return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
7UNSAFE_END

  • Unsafe_CompareAndSwapInt()的核心代碼是Atomic::cmpxchg(x, addr, e),它的作用最終是調用處理器的CMPXCHG指令。針對不同的操作系統,Atomic::cmpxchg()有不同的實現。例如針對64位linux的系統,它最終調用的是jdk-jdk8-b120/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp文件中的Atomic::cmpxchg()方法;針對windows操作系統,它調用的是jdk-jdk8-b120/hotspot/src/os_cpu/windows_x86/vm/atomic_windows_x86.inline.hpp目錄下的Atomic::cmpxchg()方法。
  • 通常我們的服務器都是64位linux系統,所以下面以64位的linux系統為例,Atomic::cmpxchg()的源碼如下。
1inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
2 int mp = os::is_MP();
3 __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
4 : "=a" (exchange_value)
5 : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
6 : "cc", "memory");
7 return exchange_value;
8}
9
  • __asm__表示的是嵌入一段彙編代碼,LOCK_IF_MP(%4)表示的是向緩存或者總線加鎖,cmpxchgl是彙編語言中比較並交換,能保證原子操作。(關於硬件層面的指令,這裡就不深入研究了,有興趣的朋友可以去了解下)
  • 所以最終在Java層面,是通過Unsafe類中提供的幾個native修飾的CAS方法來實現CAS操作,然後這幾個native方法會調用到JVM中的方法,最終通過處理器中的CMPXCHG指令來實現原子操作。

2.2 處理器中的原子操作的實現

CAS操作的目的是為了實現原子操作,在Java中通過CAS實現的原子操作,實際上最終還是落地在處理器上,由處理器來實現原子操作。那麼處理器是如何實現原子操作的呢?

  • 不同型號的處理器,針對原子操作有不同的實現方案。下面以x86架構的處理器舉例。
  • 對於單處理器的單核系統而言,要實現原子操作除了保證內存的讀寫操作需要地址對齊以外,還需要保證操作指令序列不被打斷。對於簡單的原子操作,CPU會提供單條指令如INC等指令,但是對於複雜的原子操作,需要執行多條指令,這個時候如果出現上下文切換,如:任務切換等行為,會影響到原子操作。因此此時需要採用自旋鎖來保證操作指令不被中斷。
  • 對於多處理器而言,除了要保證單處理的原子操作以外,還要保證不受其他處理器的影響。由於每個CPU均有自己的L1、L2、L3緩存,所以當多個CPU同時向內存中寫同一個數據時,最終會造成數據的不準確性。


初識CAS的實現原理


  • 如上圖,對於i++的場景,進行兩次i++操作,當2個CPU同時將i寫入到內存時,最終內存中的結果是2,而我們期望的應該是3。顯然這種操作結果是不正確的。
  • 那麼多處理器是如何實現原子操作的呢?對於x86架構的處理器,提供了總線加鎖或者緩存加鎖來實現原子操作。
  • (1)總線加鎖。對於上面i++操作的例子,最終導致結果不正確的原因是因為同時有多個CPU向內存中寫共享變量。那麼如果我們保證當一個CPU在向內存中改寫共享變量時,其他的CPU不會向內存中改寫共享變量,就能保證結果的正確性。眾所周知,CPU和內存之間交換數據是通過總線(bus)進行的。當CPU1向內存中改寫共享變量時,會執行一個Lock前綴的指令,這個指令會發出一個LOCK#信號,這個信號會鎖住總線,總線鎖住期間,其他的CPU不能訪問總線,不能訪問總線,就意味著其他的CPU就不能通過總線向內存中讀寫數據,所以此時總線鎖住期間,發出LOCK#信號的CPU能獨佔共享內存,這樣就能保證原子操作了。
初識CAS的實現原理


  • (2)使用緩存鎖來保證原子性。總線加鎖雖然能保證原子操作,但是它的效率太低了,因為總線鎖住期間,其他的CPU都不能訪問總線,這大大降低了多處理器的性能。實際上我們在保證原子性操作時,只需要保證的是某一個共享變量內存地址的原子操作即可,所以使用緩存鎖定來實現原子操作。緩存鎖定是指它會鎖住這塊內存區域的緩存並回寫到內存,並使用緩存一致性機制來確保修改的原子性,緩存一致性機制會阻止同時修改由兩個以上處理器緩存的內存區域數據。(這一句話引用的是《Java併發編程的藝術》一書彙總的第二章第2.1節第10頁)這個時候由於不會鎖住總線,即保證了原子操作,也保證了性能。目前處理器在某些場景下會使用緩存鎖定來代替總線鎖定。

3. CAS產生的問題

CAS雖然解決了原子性,但同時它也存在著三大問題。

  • (1)ABA問題。在CAS操作時會先檢查值有沒有變化,如果沒有變化則執行更新操作,如果有變化則不執行更新操作。假設原來的值為A,後來更新成了B,然後又更新成了A,這個時候去執行CAS的檢查操作時,內存中的值還是A,就會誤以為內存中的值沒有變化,然後執行更新操作,實際上,這個時候內存中的值發生過變化。那麼怎麼解決ABA的問題呢?可以在每次更新值的時候添加一個版本號,那麼A->B->A就變為了1A->2B->3A,這個時候就不會出現ABA的問題了。在JDK1.5開始,JUC包下提供了AtomicStampedReference類來解決ABA的問題。這個類的compareAndSet()方法會首先檢查當前引用是否等於預期的引用,然後檢查當前標誌是都等於預期標誌,如果都相等,才會調用casPair()方法執行更新操作。casPair()方法最終也是調用了Unsafe類中的CAS方法。AtomicStampedReference類的compareAndSet()方法的源碼如下。
 1public boolean compareAndSet(V expectedReference,
2 V newReference,
3 int expectedStamp,
4 int newStamp) {
5 Pair current = pair;
6 // 先檢查期望的引用是否等於當前引用,期望的標識是否等於當前的標識
7 // 然後才執行CAS操作(casPair()也是調用了CAS方法)
8 return
9 expectedReference == current.reference &&
10 expectedStamp == current.stamp &&
11 ((newReference == current.reference &&
12 newStamp == current.stamp) ||
13 casPair(current, Pair.of(newReference, newStamp)));
14}
1private boolean casPair(Pair cmp, Pair val) {
2 return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
3}
  • (2)性能問題。CAS會採用循環的方式來實現原子操作,如果長時間的循環設置不成功,就會一直佔用CPU,給CPU帶來很大的執行開銷,降低應用程序的性能。
  • (3)只能保證一個共享變量的原子操作。對一個共享共享變量執行CAS操作時,能保證原子操作,但是如果同時對多個共享變量執行操作時,CAS就無法同時保證這多個共享變量的原子性。這個時候可以使用將多個共享變量封裝到一個對象中,然後使用JUC包下提供的AtomicReference類來實現原子操作。另外一種方案就是使用鎖。

4. 總結

  • 本文以一個Demo開始,結合AtomicInteger類,逐漸深入源碼,介紹了Java中CAS操作的實現;然後又介紹了處理器中原子操作的實現。
  • 最後本文總結了CAS操作帶來的3大問題,以及目前針對這些問題的解決方案。
  • 在Java中要實現CAS操作,就脫離不開Unsafe類的使用,本文只是簡單介紹了Unsafe類的使用,下一篇文章將詳細分析Unsafe類的源碼以及使用場景。
  1. 《Java併發編程的藝術》


分享到:


相關文章: