Featured image of post Java并发编程

Java并发编程

Java线程池统一管理ThreadPoolConfig

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * 通用线程池
 */
@Configuration
@EnableAsync
public class ThreadPoolExecutorConfig {

    @Bean
    public Executor threadPoolExecutor() {
        ThreadPoolTaskExecutor threadPoolExecutor = new ThreadPoolTaskExecutor();
        int processNum = Runtime.getRuntime().availableProcessors(); // 返回可用处理器的Java虚拟机的数量
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        threadPoolExecutor.setCorePoolSize(corePoolSize); // 核心池大小
        threadPoolExecutor.setMaxPoolSize(maxPoolSize); // 最大线程数
        threadPoolExecutor.setQueueCapacity(maxPoolSize * 1000); // 队列程度
        threadPoolExecutor.setThreadPriority(Thread.MAX_PRIORITY); // 线程可以具有的最大优先级。
        threadPoolExecutor.setDaemon(false);
        threadPoolExecutor.setKeepAliveSeconds(300); // 超过核心线程数的空闲线程,等待新任务的最大时间,超时将被终止
        threadPoolExecutor.setThreadNamePrefix("业务名-"); // 线程名字前缀
        return threadPoolExecutor;
    }

}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * 线程池统一管理
 */
public class ThreadPoolConfig {

    /**
     * gps消费线程池
     */
    private static ThreadPoolExecutor gpsConsumerThreadPool = null;

    public static ThreadPoolExecutor gpsConsumerThreadPool() {
        if (gpsConsumerThreadPool == null) {
            gpsConsumerThreadPool = new ThreadPoolExecutor(
                    4,
                    50,
                    10,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(100),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
        }
        return gpsConsumerThreadPool;
    }

    /**
     * sql查询线程池
     */
    private static ThreadPoolExecutor sqlQueryThreadPool = null;

    public static ThreadPoolExecutor sqlQueryThreadPool() {
        if (sqlQueryThreadPool == null) {
            sqlQueryThreadPool = new ThreadPoolExecutor(
                    2,
                    50,
                    10,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(100),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
        }
        return sqlQueryThreadPool;
    }

    /**
     * 预约配置生成线程池
     */
    private static ThreadPoolExecutor bookScheduleThreadPool = null;

    public static ThreadPoolExecutor bookScheduleThreadPool() {
        if (bookScheduleThreadPool == null) {
            bookScheduleThreadPool = new ThreadPoolExecutor(
                    2,
                    10,
                    30,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
        }
        return bookScheduleThreadPool;
    }
}

Java线程池基本使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * 线程池使用注意(最佳实践):https://javaguide.cn/java/concurrent/java-thread-pool-best-practices.html
 * 两个开源线程池框架:
 * Hippo4jopen in new window:异步线程池框架,支持线程池动态变更&监控&报警,无需修改代码轻松引入。支持多种使用模式,轻松引入,致力于提高系统运行保障能力。
 * Dynamic TPopen in new window:轻量级动态线程池,内置监控告警功能,集成三方中间件线程池管理,基于主流配置中心(已支持 Nacos、Apollo,Zookeeper、Consul、Etcd,可通过 SPI 自定义实现)。
 */
@Slf4j
@RequiredArgsConstructor
class 线程池创建 {
    // 方式一(推荐) 全局配置spring线程池 com.zx.alice.config.ThreadPoolExecutorConfig.threadPoolExecutor
    private final ThreadPoolExecutor threadPoolExecutor;
    public static void main(String[] args) {

        // 方式二(推荐) 全参构造器
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,
                49,
                2,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(51)
        );
        for (int i = 0; i < 100; i++) {
            threadPool.execute(new MyRunnable("" + i));
        }

        // 开启一个线程,实时监控线程池的状态
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(2);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("核心线程数: [{}]", threadPool.getPoolSize());
            log.info("活跃线程数: {}", threadPool.getActiveCount());
            log.info("已完成任务数: {}", threadPool.getCompletedTaskCount());
            log.info("等待队列数: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);

        // shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
        threadPool.shutdown();
        // shutdownNow() :关闭线程池,线程池的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。isTerminated() VS isShutdown()
        threadPool.shutdownNow();

        // isShutDown 当调用 shutdown() 方法后返回为 true。
        threadPool.isShutdown();
        // isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true几种常见的内置线程池
        threadPool.isTerminated();

        // 方式二(不推荐)
        // 1.固定线程数    使用无界队列,容易OOM
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
        ExecutorService fixedThreadPool2 = Executors.newFixedThreadPool(10);
        // 2.只有一个线程      使用无界队列,容易OOM
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        ExecutorService singleThreadExecutor2 = Executors.newSingleThreadExecutor();
        // 3.会根据需要创建新线程的线程池       使用同步队列,允许创建最大线程数线程池,当提交任务速度 高于 线程处理任务速度 时,会不断创建线程,耗尽CPU和OOM
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        ExecutorService cachedThreadPool2 = Executors.newCachedThreadPool();
        // 4.延迟或定期执行    使用 延迟阻塞队列
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        ScheduledExecutorService scheduledThreadPool2 = Executors.newScheduledThreadPool(5);
    }
}

Java线程池submit使用

示例 1:使用 get()方法获取返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
ExecutorService executorService = Executors.newFixedThreadPool(3);
Future<String> submit = executorService.submit(() -> {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "abc";
});
System.out.println(submit.get());
executorService.shutdown();
// abc

示例 2:使用 get(long timeout,TimeUnit unit)方法获取返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ExecutorService executorService2 = Executors.newFixedThreadPool(3);
Future<String> submit2 = executorService2.submit(() -> {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "abc";
});
System.out.println(submit2.get(3, TimeUnit.SECONDS));
executorService2.shutdown();
// Exception in thread "main" java.util.concurrent.TimeoutException
//  at java.util.concurrent.FutureTask.get(FutureTask.java:205)

原子类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final AtomicInteger ai = new AtomicInteger(0);
ai.getAndIncrement();
ai.getAndSet(3);
ai.getAndAdd(3);
int i = ai.compareAndExchange(0, 1);
boolean b = ai.compareAndSet(0, 1);

final AtomicIntegerArray aia = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5, 6});
int e = aia.get(0); // 1
int e1 = aia.getAndSet(0, 2); // 1
int e2 = aia.getAndIncrement(0); // 2
int e3 = aia.getAndAdd(0, 5); // 3
aia.compareAndSet(0, 8, 99); // [99, 2, 3, 4, 5, 6]

AtomicReference<User> ar = new AtomicReference<>();
User user = new User();
User user2 = new User();
ar.set(user);
ar.compareAndSet(user, user2);

AtomicIntegerFieldUpdater<User> aifu = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user1 = new User();
user1.setAge(22);
System.out.println(aifu.getAndIncrement(user)); // 22
System.out.println(aifu.get(user)); // 23

final String ref = "A";
final boolean mark = false;
final AtomicMarkableReference<String> amr = new AtomicMarkableReference<>(ref, mark);
amr.set("B", true); // 重新设置当前值和 mark 值
amr.compareAndSet(ref, "B", mark, true);
// 获取当前的ref和mark, 当前ref直接返回,当前mark存入mark[0]
boolean[] marks = new boolean[1];
String ref1 = amr.get(marks);
boolean mark1 = marks[0];
amr.attemptMark(ref, true); // 尝试设置mark值

final int stamp = 0;
final AtomicStampedReference<String> asr = new AtomicStampedReference<>(ref, stamp);
asr.set("B", 1);  // 重新设置当前值和 stamp 值
asr.compareAndSet(ref, "B", 0, 1);
// 获取当前的ref和stamp, 当前ref直接返回,当前stamp存入stamp[0]
int[] stamps = new int[1];
String ref2 = asr.get(stamps);
int stamp2 = stamps[0];
asr.attemptStamp(ref, 88); // 尝试设置stamp值

Java并发容器的创建

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
ConcurrentHashMap<String, String> concurrentHashMap = new ConcurrentHashMap<>();

LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();

CopyOnWriteArrayList<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(50);

PriorityQueue<String> priorityQueue = new PriorityQueue<>();

DelayQueue<Delayed> delayQueue = new DelayQueue<>();

ConcurrentSkipListMap<String, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();

CompletableFuture

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private final DemoRepository demoRepository;

CompletableFuture<List<Demo>> demo1Get = CompletableFuture
        .supplyAsync(() -> demoRepository.findAll(), ThreadPoolConfig.sqlQueryThreadPool());
CompletableFuture<List<Demo>> demo2Get = CompletableFuture
        .supplyAsync(() -> demoRepository.findAll(), ThreadPoolConfig.sqlQueryThreadPool());

CompletableFuture.allOf(demo1Get, demo2Get);

List<Demo> demos1 = demo1Get.get(5, TimeUnit.SECONDS);
List<Demo> demos2 = demo2Get.get(5, TimeUnit.SECONDS);

CountDownLatch

倒计时器,可以将多个线程阻塞在同一个地方

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ExecutorService threadPool = Executors.newFixedThreadPool(300); // 线程数量给太少,执行会很慢
int threadCount = 550;
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;
    threadPool.execute(() -> {
        try {
            test(threadNum);
        } catch (InterruptedException e) {
            log.error("执行异常!!!");
            throw new RuntimeException(e);
        } finally {
            countDownLatch.countDown(); // 表示一个请求已经被完成
        }
    });
}
countDownLatch.await();
threadPool.shutdown();
        
public static void test(int threadNum) throws InterruptedException {
    Thread.sleep(1000);
    System.out.println("threadNum:" + threadNum);
    Thread.sleep(1000);
}

CyclicBarrier

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,

但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

private final CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
        () -> System.out.println("------当线程数达到之后,优先执行------"));

ExecutorService threadPool = Executors.newFixedThreadPool(10);
int threadCount = 10;
for (int i = 0; i < threadCount; i++) {
    final int threadNum = i;
    Thread.sleep(1000);
    threadPool.execute(() -> {
        try {
            System.out.println("线程" + threadNum + "准备好了...");
            // 阻塞后面的代码,等待所有线程都到达屏障点
            cyclicBarrier.await(); // 一直阻塞
            // cyclicBarrier.await(60, TimeUnit.SECONDS); // 阻塞一段时间
            System.out.println("线程" + threadNum + "完成了.");
        } catch (Exception e) {
            log.error("执行异常!!!");
            throw new RuntimeException(e);
        }
    });
}
threadPool.shutdown();

Semaphore

控制同时访问特定资源的线程数量。

应用场景:资源池管理、限流器实现、互斥锁模拟、共享资源保护、负载均衡、流量控制、并发任务调度、公平性访问

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ExecutorService threadPool = Executors.newFixedThreadPool(300);
    final Semaphore semaphore = new Semaphore(20);
    for (int i = 0; i < 500; i++) {
        final int threadNum = i;
        threadPool.execute(() -> {
            try {
                semaphore.acquire(); // 获取一个许可,所以可运行线程数量为20/1=20
                test(threadNum);
                semaphore.release(); // 释放一个许可
            } catch (InterruptedException e) {
                log.error("执行异常!!!");
                throw new RuntimeException(e);
            }
        });
    }
    threadPool.shutdown();
}

public static void test(int threadNum) throws InterruptedException {
    Thread.sleep(1000);
    System.out.println("threadNum:" + threadNum);
    Thread.sleep(1000);
}
Licensed under CC BY-NC-SA 4.0
皖ICP备2024056275号-1
发表了78篇文章 · 总计149.56k字
本站已稳定运行