久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

并發計數類LongAdder怎么用

186次閱讀
沒有評論

共計 6365 個字符,預計需要花費 16 分鐘才能閱讀完成。

本篇內容介紹了“并發計數類 LongAdder 怎么用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓丸趣 TV 小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

AtomicLong 是通過 CAS(即 Compare And Swap) 原理來完成原子遞增遞減操作,在并發情況下不會出現線程不安全結果。AtomicLong 中的 value 是使用 volatile 修飾,并發下各個線程對 value 最新值均可見。我們以 incrementAndGet() 方法來深入。

 
public final long incrementAndGet() {

        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;

    }

這里是調用了 unsafe 的方法

    public final long getAndAddLong(Object var1, long var2, long var4) {

        long var6;

        do {

            var6 = this.getLongVolatile(var1, var2);

        } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

        return var6;

    }

方法中 this.compareAndSwapLong() 有 4 個參數,var1 是需要修改的類對象,var2 是需要修改的字段的內存地址,var6 是修改前字段的值,var6+var4 是修改后字段的值。compareAndSwapLong 只有該字段實際值和 var6 值相當的時候,才可以成功設置其為 var6+var4。

再繼續往深一層去看

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

這里 Unsafe.compareAndSwapLong 是 native 方法,底層通過 JNI(Java Native Interface) 來完成調用,實際就是借助 C 來調用 CPU 指令來完成。

實現中使用了 do-while 循環,如果 CAS 失敗,則會繼續重試,直到成功為止。并發特別高的時候,雖然這里可能會有很多次循環,但是還是可以保證線程安全的。不過如果自旋 CAS 操作長時間不成功,競爭較大,會帶 CPU 帶來極大的開銷,占用更多的執行資源,可能會影響其他主業務的計算等。

LongAdder 怎么優化 AtomicLong

Doug Lea 在 jdk1.5 的時候就針對 HashMap 進行了線程安全和并發性能的優化,推出了分段鎖實現的 ConcurrentHashMap。一般 Java 面試,基本上離不開 ConcurrentHashMap 這個網紅問題。另外在 ForkJoinPool 中,Doug Lea 在其工作竊取算法上對 WorkQueue 使用了細粒度鎖來較少并發的競爭, 更多細節可參考我的原創文章 ForkJoin 使用和原理剖析。如果已經對 ConcurrentHashMap 有了較為深刻的理解,那么現在來看 LongAdder 的實現就會相對簡單了。

來看 LongAdder 的 increase() 方法實現,

    public void add(long x) {

        Cell[] as; long b, v; int m; Cell a;

        // 第一個 if 進行了兩個判斷,(1) 如果 cells 不為空,則直接進入第二個 if 語句中。(2) 同樣會先使用 cas 指令來嘗試 add,如果成功則直接返回。如果失敗則說明存在競爭,需要重新 add

        if ((as = cells) != null || !casBase(b = base, b + x)) {

            boolean uncontended = true;

            if (as == null || (m = as.length – 1) 0 ||

                (a = as[getProbe() m]) == null ||

                !(uncontended = a.cas(v = a.value, v + x)))

                longAccumulate(x, null, uncontended);

        }

    }

這里用到了 Cell 類對象,Cell 對象是 LongAdder 高并發實現的關鍵。在 casBase 沖突嚴重的時候,就會去創建 Cell 對象并添加到 cells 中,下面會詳細分析。

    @sun.misc.Contended static final class Cell {

        volatile long value;

        Cell(long x) {value = x;}

        // 提供 CAS 方法修改當前 Cell 對象上的 value

        final boolean cas(long cmp, long val) {

            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);

        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;

        private static final long valueOffset;

        static {

            try {

                UNSAFE = sun.misc.Unsafe.getUnsafe();

                Class ? ak = Cell.class;

                valueOffset = UNSAFE.objectFieldOffset

                    (ak.getDeclaredField( value));

            } catch (Exception e) {

                throw new Error(e);

            }

        }

    }

而這一句 a = as[getProbe() m] 其實就是通過 getProbe() 拿到當前 Thread 的 threadLocalRandomProbe 的 probe Hash 值。這個值其實是一個隨機值,這個隨機值由當前線程 ThreadLocalRandom.current() 產生。不用 Rondom 的原因是因為這里已經是高并發了,多線程情況下 Rondom 會極大可能得到同一個隨機值。因此這里使用 threadLocalRandomProbe 在高并發時會更加隨機,減少沖突。更多 ThreadLocalRandom 信息想要深入了解可關注這篇文章并發包中 ThreadLocalRandom 類原理淺嘗。拿到 as 數組中當前線程的 Cell 對象,然后再進行 CAS 的更新操作, 我們在源碼上進行分析。longAccumulate() 是在父類 Striped64.java 中。

    final void longAccumulate(long x, LongBinaryOperator fn,

                              boolean wasUncontended) {

        int h;

        if ((h = getProbe()) == 0) {

       
  // 如果當前線程的隨機數為 0,則初始化隨機數

            ThreadLocalRandom.current(); // force initialization

            h = getProbe();

            wasUncontended = true;

        }

        boolean collide = false;                // True if last slot nonempty

        for (;;) {

            Cell[] as; Cell a; int n; long v;

            // 如果當前 cells 數組不為空

            if ((as = cells) != null (n = as.length) 0) {

           
// 如果線程隨機數對應的 cells 對應數組下標的 Cell 元素不為空,

                if ((a = as[(n – 1) h]) == null) {

               
// 當使用到 LongAdder 的 Cell 數組相關的操作時,需要先獲取全局的 cellsBusy 的鎖,才可以進行相關操作。如果當前有其他線程的使用,則放棄這一步,繼續 for 循環重試。

                    if (cellsBusy == 0) {       // Try to attach new Cell

                   
//Cell 的初始值是 x,創建完畢則說明已經加上

                        Cell r = new Cell(x);   // Optimistically create

                        //casCellsBusy 獲取鎖,cellsBusy 通過 CAS 方式獲取鎖,當成功設置 cellsBusy 為 1 時,則獲取到鎖。

                        if (cellsBusy == 0 casCellsBusy()) {

                            boolean created = false;

                            try {               // Recheck under lock

                                Cell[] rs; int m, j;

                                if ((rs = cells) != null

                                    (m = rs.length) 0

                                    rs[j = (m – 1) h] == null) {

                                    rs[j] = r;

                                    created = true;

                                }

                            } finally {

                           
  //finally 里面釋放鎖

                                cellsBusy = 0;

                            }

                            if (created)

                                break;

                            continue;           // Slot is now non-empty

                        }

                    }

                    collide = false;

                }

                else if (!wasUncontended)       // CAS already known to fail

                    wasUncontended = true;      // Continue after rehash

                // 如果 a 不為空,則對 a 進行 cas 增 x 操作,成功則返回    

                else if (a.cas(v = a.value, ((fn == null) ? v + x :

                                             fn.applyAsLong(v, x))))

                    break;

                //cells 的長度 n 已經大于 CPU 數量,則繼續擴容沒有意義,因此直接標記為不沖突

                else if (n = NCPU || cells != as)

                    collide = false;            // At max size or stale

                else if (!collide)

                    collide = true;

                // 到這一步則說明 a 不為空但是 a 上進行 CAS 操作也有多個線程在競爭,因此需要擴容 cells 數組,其長度為原長度的 2 倍

                else if (cellsBusy == 0 casCellsBusy()) {

                    try {

                        if (cells == as) {      // Expand table unless stale

                            Cell[] rs = new Cell[n 1];

                            for (int i = 0; i ++i)

                                rs[i] = as[i];

                            cells = rs;

                        }

                    } finally {

                        cellsBusy = 0;

                    }

                    collide = false;

                    continue;                   // Retry with expanded table

                }

                // 繼續使用新的隨機數,避免在同一個 Cell 上競爭

                h = advanceProbe(h);

            }

            // 如果 cells 為空,則需要先創建 Cell 數組。初始長度為 2.(個人理解這個 if 放在前面會比較好一點,哈哈)

            else if (cellsBusy == 0 cells == as casCellsBusy()) {

                boolean init = false;

                try {                           // Initialize table

                    if (cells == as) {

                        Cell[] rs = new Cell[2];

                        rs[h 1] = new Cell(x);

                        cells = rs;

                        init = true;

                    }

                } finally {

                    cellsBusy = 0;

                }

                if (init)

                    break;

            }

            // 如果在 a 上競爭失敗,且擴容競爭也失敗了,則在 casBase 上嘗試增加數量

            else if (casBase(v = base, ((fn == null) ? v + x :

                                        fn.applyAsLong(v, x))))

                break;                          // Fall back on using base

        }

    }

最后是求 LongAdder 的總數,這一步就非常簡單了,把 base 的值和所有 cells 上的 value 值加起來就是總數了。

    public long sum() {

        Cell[] as = cells; Cell a;

        long sum = base;

        if (as != null) {

            for (int i = 0; i as.length; ++i) {

                if ((a = as[i]) != null)

                    sum += a.value;

            }

        }

        return sum;

    }

思考

源碼中 Cell 數組的會控制在不超過 NCPU 的兩倍,原因是 LongAdder 其實在底層是依賴 CPU 的 CAS 指令來操作,如果多出太多,即使在代碼層面沒有競爭,在底層 CPU 的競爭會更多,所以這里會有一個數量的限制。所以在 LongAdder 的設計中,由于使用到 CAS 指令操作,瓶頸在于 CPU 上。

YY 一下,那么有沒有方式可以突破這個瓶頸呢?我個人覺得是有的,但是有前提條件,應用場景極其有限。基于 ThreadLocal 的設計,假設統計只在一個固定的線程池中進行,假設線程池中的線程不會銷毀 (異常補充線程的就暫時不管了),則可以認為線程數量是固定且不變的,那么統計則可以依賴于只在當前線程中進行,那么即使是高并發,就轉化為 ThreadLocal 這種單線程操作了,完全可以擺脫 CAS 的 CPU 指令操作的限制,那么性能將極大提升。

“并發計數類 LongAdder 怎么用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注丸趣 TV 網站,丸趣 TV 小編將為大家輸出更多高質量的實用文章!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-04發表,共計6365字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 富宁县| 桐梓县| 乌苏市| 滨州市| 禹城市| 延津县| 绥棱县| 平潭县| 稷山县| 岑巩县| 彭山县| 鸡泽县| 富源县| 大兴区| 广德县| 航空| 芜湖市| 清原| 榆社县| 枣阳市| 旬阳县| 布尔津县| 根河市| 高唐县| 巢湖市| 泰顺县| 万年县| 荆州市| 靖安县| 五莲县| 施甸县| 仙桃市| 安吉县| 阿图什市| 遂宁市| 洪泽县| 宁强县| 文登市| 宜兰县| 沙洋县| 青龙|