线程同步机制二:CAS原理和JUC原子类

Published on 2024-09-27 10:56 in 分类: 博客 with 狂盗一枝梅
分类: 博客

一、CAS原理

CAS(Compare And Swap),也就是“比较并交换”,它是一种CPU指令级的原子操作。CAS 操作常用于解决并发环境下的竞态条件和实现线程安全的数据操作。它的基本操作流程如下:

  1. 比较阶段:CAS 首先会比较内存位置的当前值与预期值是否相等。

  2. 交换阶段:如果比较结果为真,则将新值写入内存位置;如果比较结果为假,则不做任何操作。

这个过程通俗的说就是:改一个数之前先检查下这个数有没有被人改过(比较),如果发现这个值已经被改过了(期望值和待修改的旧值不相同),那我就不改了,否则我就更新它(新值替换旧值)。

CAS 操作是原子的,即在执行过程中不会被中断,保证整个比较和交换的过程是不可分割的。

以多个线程同时操作一个变量x自增1为例,使用CAS如何保证线程安全性呢?看下面的流程图:

image-20240922125819060

肯定会有疑问,两个线程同时进入CAS操作区域,难道不会都成功了?

答案是不会,CAS是原子操作,对同一个内存地址的CAS操作在同一时刻只能执行一个。多个线程对同一个内存地址CAS的结果只有两个:成功或者失败。两个线程对同一个变量CAS出现了竞态条件,只会有一个成功,这是现代处理器硬件底层来保证的。

二、Unsafe类

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。Unsafe大量的方法都是native方法,基于C++语言实现,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。

Unsafe类提供了 CAS(Compare and Swap)原子操作,用于实现并发控制。实际上Unsafe提供的CAS方法直接通过native方式(封装C++代码)调用了底层的CPU指令cmpxchg。

1、Unsafe类的CAS操作方法

Unsafe类中有三个重要的CAS操作方法如下所示

/**
*  定义在Unsafe类中的三个“比较并交换”原子方法
*/
public final native boolean compareAndSwapObject(
       Object o, long offset,  Object expected, Object update);

public final native boolean compareAndSwapInt(
       Object o, long offset, int expected,int update);

public final native boolean compareAndSwapLong(
       Object o, long offset, long expected, long update);

说下这三个方法的每个参数代表什么意思

  1. Object o:这是要在其上执行 CAS 操作的对象。
  2. long offset:这是要执行 CAS 操作的对象中字段的偏移量,单位为字节,表示字段距离对象的起始位置偏移了多少个字节,通过这个偏移量,Unsafe 类能够准确地定位到对象中的某个字段。
  3. Object expected:这是预期值。CAS 操作将比较对象中位于给定偏移量处的字段的当前值与这个预期值进行比较。
  4. Object update:这是更新值。如果对象中位于给定偏移量处的字段的当前值与预期值匹配,CAS 操作将尝试使用这个更新值来更新字段的值。
  5. 返回值:这个方法返回一个布尔值,表示 CAS 操作是否成功。如果 CAS 操作成功(即对象中位于给定偏移量处的字段的当前值与预期值匹配),则返回 true;否则返回 false

2、使用Unsafe类实现多线程自增

为了更深入的了解CAS方法的应用,接下来使用Unsafe类提供的CAS方法实现一个小功能:十个线程对初始值为0的变量自增100万次,最后输出该变量的值。

第一步:获取Unsafe类的实例

Unsafe类的实例维护在Unsafe类内部,其get方法如下

image-20240924172250621

它会判定调用者是否是外部的,如果是自己写的代码调用该方法,就会抛出SecurityException异常;Unsafe类的构造函数是私有的,因此不能直接通过new Unsafe()来实例化一个Unsafe对象。所以要想获取该实例,需要通过反射

public static Unsafe getUnsafe() {
    try {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        return (Unsafe) theUnsafe.get(null);
    } catch (Exception e) {
        throw new AssertionError(e);
    }
}

第二步:计算字段偏移量

根据上一步,我们已经获取到了Unsafe实例,根据Unsafe类的CAS方法

public final native boolean compareAndSwapObject(
       Object o, long offset,  Object expected, Object update);

参数中有个偏移量offset,如何获取?

Unsafe类中还有一个方法

public native long objectFieldOffset(Field field);  

该方法能够通过给定的类直接计算出来该字段在对象中的偏移量,也就是相对于对象头偏移的字节数,只需要给出Field,不需要对象就能计算出来。说到这里,大家对之前说的Unsafe类可以“直接访问系统内存资源”的定位,想必要明白一些了。

举例说明下,有个类如下所示

class OptimisticLockingPlus {
 private volatile int value;   
}

通过Unsafe类计算下value字段的偏移量:

valueOffset = unsafe.objectFieldOffset(
                        OptimisticLockingPlus.class.getDeclaredField("value"));
log.info("Unsafe 计算的 valueOffset={}", valueOffset);

查看下结果:

2024-09-24 17:40:28.582 [INFO ] [main      ] - Unsafe 计算的 valueOffset=12

接下来验证下改偏移值计算的对不对,根据上一篇文章:深入理解Java对象结构 中的知识,可以使用JOL工具查看对象的内部结构

OptimisticLockingPlus cas = new OptimisticLockingPlus();
log.info("JOL解析工具输出对象结构:{}", ClassLayout.parseInstance(cas).toPrintable());

其输出结果如下所示

image-20240924174337981

可以看到,value字段距离对象头正好12个字节,这就验证了Unsafe类计算的偏移量是正确的。

第三步:代码实现

使用CAS进行无锁编程的步骤大致如下:

(1)获得字段的期望值(oldValue)。

(2)计算出需要替换的新值(newValue)。

(3)通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋

其伪代码如下所示

do{
   获得字段的期望值(oldValue);
   计算出需要替换的新值(newValue);
} while (!CAS(内存地址,oldValue,newValue))

现在就根据上述逻辑实现十个线程对一个变量自增100万次功能的最终代码:

import lombok.extern.slf4j.Slf4j;
import org.openjdk.jol.info.ClassLayout;
import sun.misc.Unsafe;

import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 基于CAS无锁实现的安全自增
 */
@Slf4j
public class OptimisticLockingPlus {

    //并发数量
    private static final int THREAD_COUNT = 10;

    //内部值,使用volatile保证线程可见性
    private volatile int value;

    //不安全类
    private static final Unsafe unsafe = getUnsafe();

    //value 的内存偏移(相对于对象头部的偏移,不是绝对偏移)
    private static final long valueOffset;

    //统计失败的次数
    private static final AtomicLong failure = new AtomicLong(0);

    static {
        try {
            //取得value属性的内存偏移
            valueOffset = unsafe.objectFieldOffset(
                    OptimisticLockingPlus.class.getDeclaredField("value"));

            log.info("Unsafe 计算的 valueOffset={}", valueOffset);
        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

    //通过CAS原子操作,进行“比较并交换”
    public final boolean unSafeCompareAndSet(int oldValue, int newValue) {
        //原子操作:使用unsafe的“比较并交换”方法进行value属性的交换
        return unsafe.compareAndSwapInt(
                this,
                valueOffset,
                oldValue,
                newValue
        );
    }

    //使用无锁编程实现安全的自增方法
    public void selfPlus() {
        int oldValue = value;
        //通过CAS原子操作,如果操作失败就自旋,一直到操作成功
        int i = 0;
        do {
            // 获取旧值
            oldValue = value;
            //统计无效的自旋次数
            if (i++ > 1) {
                //记录失败的次数
                failure.incrementAndGet();
            }

        } while (!unSafeCompareAndSet(oldValue, oldValue + 1));
    }

    //测试用例入口方法
    public static void main(String[] args) throws InterruptedException {
        final OptimisticLockingPlus cas = new OptimisticLockingPlus();
        log.info("JOL解析工具输出对象结构:{}", ClassLayout.parseInstance(cas).toPrintable());
        //倒数闩,需要倒数THREAD_COUNT次
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            // 提交10个任务
            CompletableFuture.runAsync(() -> {
                log.info("正在计数");
                //每个任务累加1000次
                for (int j = 0; j < 1000000; j++) {
                    cas.selfPlus();
                }
                latch.countDown();  // 执行完一个任务,倒数闩减少一次
            });
        }
        latch.await(); //主线程等待倒数闩倒数完毕
        log.info("累加之和:" + cas.value);
        log.info("失败次数:" + OptimisticLockingPlus.failure.get());
    }
    
    //自定义地获取Unsafe实例的辅助方法
    public static Unsafe getUnsafe() {
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            return (Unsafe) theUnsafe.get(null);
        } catch (Exception e) {
            throw new AssertionError(e);
        }
    }
}

输出结果:

image-20240925095815787

可以看到线程竞争非常激烈,一共累加了一千万次,竞争失败的就有一千九百万次。。。经过多次运行,每次失败的次数并不相同,甚至相差一个数量级,竞争失败的次数应该和当前处理器是否空闲有关系。

三、juc包中的原子类

在多线程并发执行时,诸如“++”或“--”类的运算不具备原子性,不是线程安全的操作。通常情况下,大家会使用synchronized将这些线程不安全的操作变成同步操作,但是这样会降低并发程序的性能。所以,JDK为这些类型不安全的操作提供了一些原子类,与synchronized同步机制相比,JDK原子类是基于CAS轻量级原子操作的实现,使得程序运行效率变得更高

我们口中常说的juc包指的是”java.util.concurrent“包,看名字就知道这个包下的类都是关于java并发相关的类,实际上也确实如此。juc包中的原子类都在”java.util.concurrent.atomic“子包中,atomic是”原子的“意思,该包下的类名几乎都是是以Atomic开头,所以统称这些类为”原子类“。

image-20240925152809440

JDK8中一共有上述17个类,除去最后一个Stripe64抽象类,一共16个原子类,我将其分类成了6大类

image-20240925153028558

下面看看每个类的使用方法

1、基础类型原子类

基础类型原子类是基础类型包装类的原子类,也是原子类中使用频率最高的,它包含三个原子类:AtomicBoolean、AtomicInteger、AtomicLong,分别对应着布尔类型、整型和长整型类型的原子类。下面以AtomicInteger为例讲解用法和原理。

先看下用法,AtomicInteger提供了几种方法

方法名 方法描述
public final int get() 获取当前对象的值
public final int getAndSet(int newValue) 设置新值并返回旧值
public final boolean compareAndSet(int expect, int update) CAS方式设置新值
public final int getAndIncrement() 自增并返回旧值
public final int getAndDecrement() 自减并返回旧值
public final int getAndAdd(int delta) 加法并返回旧值
public final int addAndGet(int delta) 加法并返回新值

其它还有很多方法,这些方法都有些规律可循:getAndxxx就是返回旧值的意思,xxxAndGet就是先更新,然后返回更新后的结果的意思。接下来使用AtomicInteger实现方法实现自增案例:

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author kdyzm
 * @date 2024/9/25
 */
@Slf4j
@Data
public class AtomicIntegerTest implements Runnable {

    private AtomicInteger count = new AtomicInteger(0);
    private CountDownLatch countDownLatch = new CountDownLatch(10);

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerTest atomicIntegerTest = new AtomicIntegerTest();
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(atomicIntegerTest);
        }
        atomicIntegerTest.getCountDownLatch().await();
        log.info("十个线程共自增十万次结果:{}", atomicIntegerTest.getCount().get());
    }

    @Override
    public void run() {
        log.info("正在执行任务:{}", Thread.currentThread().getName());
        for (int j = 0; j < 10000; j++) {
            count.incrementAndGet();
        }
        countDownLatch.countDown();
    }
}

运行结果:

image-20240925175517898

2、数组类型原子类

数组类型的原子类有三个

类名 类功能
AtomicIntegerArray 整型原子类
AtomicLongArray 长整型原子类
AtomicReferenceArray 布尔型原子类

还是以AtomicIntegerArray为例讲下原子数组类型的用法

先看下AtomicIntegerArray类提供的原子操作方法

方法名 方法描述
public final int get(int i) 获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue) 返回 index=i 位置当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i) 获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) 获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) 获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int expect, int update) 如果输入的数值等于预期值,就以原子方式将位置i的元素值设置为输入值(update)
public final void lazySet(int i, int newValue) 最终将位置i的元素设置为newValue,lazySet()方法可能导致其他线程在之后的一小段时间内还是可以读到旧的值

原子数组很多方法和基础类型原子类中的方法很像,只是方法中多了一个参数:int i,表示操作指定位置的元素,剩下的就和基础类型原子类一样了。

接下来对长度等于10的整型原子数组每个位置上的元素都多线程自增10万次,看实现代码:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * @author kdyzm
 * @date 2024/9/26
 */
@Slf4j
public class AtomicIntegerArrayTest {

    private static final AtomicIntegerArray ARRAY = new AtomicIntegerArray(new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0});

    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(ARRAY.length());
        //对每个元素循环自增十万次
        for (int i = 0; i < ARRAY.length(); i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                try {
                    selfPlus(finalI);
                } catch (InterruptedException e) {
                    log.error("", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        for (int i = 0; i < ARRAY.length(); i++) {
            log.info("坐标:{},数值:{}", i, ARRAY.get(i));
        }
    }

    //对offSet坐标的值多线程自增10万次
    public static void selfPlus(int offSet) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        //10个线程每个线程循环1万次自增
        for (int i = 0; i < THREAD_COUNT; i++) {
            CompletableFuture.runAsync(() -> {
                for (int j = 0; j < 10000; j++) {
                    //对offSet位置的元素自增1
                    ARRAY.incrementAndGet(offSet);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
    }

}

运行结果:

image-20240926110211825

3、引用类型原子类

基础的原子类型只能保证一个变量的原子操作,当需要对多个变量进行操作时,CAS无法保证原子性操作,这时可以用AtomicReference(原子引用类型)保证对象引用的原子性。

简单来说,如果需要同时保障对多个变量操作的原子性,就可以把多个变量放在一个对象中进行操作。

类名 类功能
AtomicReference 基础的引用原子类
AtomicMarkableReference 带印戳的引用原子类
AtomicStampedReference 带修改标志的引用原子类

下面看下引用类型原子类的用法:

AtomicReference类的方法和基础类型原子类的方法几乎一模一样,不再赘述,下面一个小案例来看看它的使用方法

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.atomic.AtomicReference;

/**
 * @author kdyzm
 * @date 2024/9/26
 */
public class AtomicReferenceTest {

    public static void main(String[] args) {

        User user = new User("张三", 12);
        AtomicReference<User> atomicReference = new AtomicReference<>(user);

        User user1 = new User("李四", 13);
        atomicReference.compareAndSet(user, user1);
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class User {

        private String name;

        private Integer age;
    }
}

案例太过于简单,不再赘述。

源码和基础类型原子类几乎一样,也不再赘述。

需要注意的是,使用原子引用类型AtomicReference包装了User对象之后,只能保障User引用的原子操作,对被包装的User对象的字段值修改时不能保证原子性,这点要切记。想要对对象属性更改操作也保持原子性,需要使用属性更新原子类。

4、属性更新原子类

属性更新原子类有三个:

类名 类作用
AtomicIntegerFieldUpdater 保障整型字段的更新操作的原子性
AtomicLongFieldUpdater 保障长整型字段的更新操作的原子性。
AtomicReferenceFieldUpdater 保障引用字段的更新操作的原子性。

其内部实现原理都一样,下面使用AtomicIntegerUpdater类来做说明。

首先得明确一点,要想保证属性安全更新,必须满足以下条件:

  1. 更新的对象属性必须使用public volatile int修饰符
  2. 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须调用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性

使用示例如下所示:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
 * @author kdyzm
 * @date 2024/9/26
 */
@Slf4j
public class AtomicIntegerFieldUpdaterTest {

    static final AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");

    static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        User user = new User("张三", 0);
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        //10条线程各自增1万次
        for (int i = 0; i < THREAD_COUNT; i++) {
            CompletableFuture.runAsync(() -> {
                for (int j = 0; j < 10000; j++) {
                    updater.incrementAndGet(user);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        log.info("多线程自增结果:{}", updater.get(user));
    }
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class User {
        private String name;
        //注意age必须是 public volatile int 修饰
        public volatile int age;
    }
}

运行结果:

image-20240926211544021

5、原子加法器

在争用激烈的场景下,会导致大量的CAS空自旋。比如,在大量线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。在高并发场景下如何提升CAS操作的性能呢?可以使用LongAdder替代AtomicInteger,DoubleAdder替代AtomicDouble。

原子加法器有两个,均是JDK8新增的,它们是以空间换时间的方式提升高并发场景下CAS操作的性能:

类名 类作用
DoubleAdder 保障整型字段的更新操作的原子性
LongAdder 保障长整型字段的更新操作的原子性。

看下它的使用方式:

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author kdyzm
 * @date 2024/9/27
 */
@Slf4j
@Data
public class LongAdderTest {

    private static LongAdder count = new LongAdder();

    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            CompletableFuture.runAsync(() -> {
                log.info("正在执行任务:{}", Thread.currentThread().getName());
                for (int j = 0; j < 10000; j++) {
                    count.add(1);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        log.info("十个线程共自增十万次结果:{}", count.sum());
    }
}

运行结果:

image-20240927102644059

它和以前的基础类型原子类用法有不同:

  1. 不能初始化指定值,构造函数无参,而且也没有set方法设置值
  2. 获取计算后的结果,需要调用sum()方法,没有别的原子类中的get方法,这和它内部的存储结构有关系。

6、原子累加器

相对于原子加法器,原子累加器的功能更加强大:

  1. 能够指定初始值,而原子加法器只能默认初始值为0
  2. 能够自定义双目运算规则,而不仅仅是加法运算
类名 类作用
DoubleAccumulator 保障整型字段的更新操作的原子性
LongAccumulator 保障长整型字段的更新操作的原子性。

以LongAccumulator为例

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAccumulator;

/**
 * @author kdyzm
 * @date 2024/9/27
 */
@Slf4j
public class LongAccumulatorTest {

    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        LongAccumulator count = new LongAccumulator(Long::sum, 0L);
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            CompletableFuture.runAsync(() -> {
                log.info("正在执行任务:{}", Thread.currentThread().getName());
                for (int j = 0; j < 10000; j++) {
                    count.accumulate(1L);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        log.info("十个线程共自增十万次结果:{}", count.get());
    }
}

运行结果:

image-20240927104048407



END.


#java #多线程编程
目录