线程同步工具类

简介:

写在前面

同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;

使用同步工具类可以协调线程的控制流;

同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;

下面将简单介绍以上同步工具类;

闭锁

可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

以CountDownLatch为例,内部包含一个计数器,一开始初始化为一个整数(事件个数),发生一个事件后,调用countDown方法,计数器减1,await用于等待计数器为0后继续执行当前线程;

举个例子如下,main线程等待其它子线程的事件发生后继续执行main线程:

复制代码
package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TaskTest implements Runnable {

    private CountDownLatch latch;
    private int sleepTime;

    /**
     * 
     */
    public TaskTest(int sleepTime, CountDownLatch latch) {
        this.sleepTime = sleepTime;
        this.latch = latch;
    }

    /**
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        try {
            CountDownLatchTest.print(" is running。");
            TimeUnit.MILLISECONDS.sleep(sleepTime);
            CountDownLatchTest.print(" finished。");
            //计数器减减
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

public class CountDownLatchTest {
    public static void main(String[] args) {
        int count = 10;
        final CountDownLatch latch = new CountDownLatch(count);
        ExecutorService es = Executors.newFixedThreadPool(count);
        for (int i = 0; i < count; i++) {
            es.execute(new TaskTest((i + 1) * 1000, latch));
        }

        try {
            CountDownLatchTest.print(" waiting...");
            //主线程等待其它事件发生
            latch.await();
            //其它事件已发生,继续执行主线程
            CountDownLatchTest.print(" continue。。。");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            es.shutdown();
        }
    }
    
    public static void print(String str){
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
    }
}
复制代码

结果打印如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[09:41:43]pool-1-thread-1  is  running。
[09:41:43]pool-1-thread-6  is  running。
[09:41:43]main waiting...
[09:41:43]pool-1-thread-10  is  running。
[09:41:43]pool-1-thread-4  is  running。
[09:41:43]pool-1-thread-5  is  running。
[09:41:43]pool-1-thread-2  is  running。
[09:41:43]pool-1-thread-3  is  running。
[09:41:43]pool-1-thread-7  is  running。
[09:41:43]pool-1-thread-8  is  running。
[09:41:43]pool-1-thread-9  is  running。
[09:41:44]pool-1-thread-1 finished。
[09:41:45]pool-1-thread-2 finished。
[09:41:46]pool-1-thread-3 finished。
[09:41:47]pool-1-thread-4 finished。
[09:41:48]pool-1-thread-5 finished。
[09:41:49]pool-1-thread-6 finished。
[09:41:50]pool-1-thread-7 finished。
[09:41:51]pool-1-thread-8 finished。
[09:41:52]pool-1-thread-9 finished。
[09:41:53]pool-1-thread-10 finished。
[09:41:53]main  continue 。。。

 此外,FutureTask也可用作闭锁,其get方法会等待任务完成后返回结果,否则一直阻塞直到任务完成;

信号量

控制同时执行某个指定操作的数量,常用于实现资源池,如数据库连接池,线程池...
以Semaphore为例,其内部维护一组资源,可以通过构造函数指定数目,其它线程在执行的时候,可以通过acquire方法获取资源,有的话,继续执行(使用结束后释放资源),没有资源的话将阻塞直到有其它线程调用release方法释放资源;

举个例子,如下代码,十个线程竞争三个资源,一开始有三个线程可以直接运行,剩下的七个线程只能阻塞等到其它线程使用资源完毕才能执行;

复制代码
package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    
    public static void print(String str){
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
    }
    
    public static void main(String[] args) {
        // 线程数目
        int threadCount = 10;
        // 资源数目
        Semaphore semaphore = new Semaphore(3);
        
        ExecutorService es = Executors.newFixedThreadPool(threadCount);

        // 启动若干线程
        for (int i = 0; i < threadCount; i++)
            es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
    }
}

class ConsumeResourceTask implements Runnable {
    private Semaphore semaphore;
    private int sleepTime;

    /**
         * 
         */
    public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
        this.sleepTime = sleepTime;
        this.semaphore = semaphore;
    }

    public void run() {
        try {
            //获取资源
            semaphore.acquire();
            SemaphoreTest.print(" 占用一个资源...");
            TimeUnit.MILLISECONDS.sleep(sleepTime);
            SemaphoreTest.print(" 资源使用结束,释放资源");
            //释放资源
            semaphore.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码
复制代码
[10:30:11]pool-1-thread-1 占用一个资源...
[10:30:11]pool-1-thread-2 占用一个资源...
[10:30:11]pool-1-thread-3 占用一个资源...
[10:30:12]pool-1-thread-1 资源使用结束,释放资源
[10:30:12]pool-1-thread-4 占用一个资源...
[10:30:13]pool-1-thread-2 资源使用结束,释放资源
[10:30:13]pool-1-thread-5 占用一个资源...
[10:30:14]pool-1-thread-3 资源使用结束,释放资源
[10:30:14]pool-1-thread-8 占用一个资源...
[10:30:16]pool-1-thread-4 资源使用结束,释放资源
[10:30:16]pool-1-thread-6 占用一个资源...
[10:30:18]pool-1-thread-5 资源使用结束,释放资源
[10:30:18]pool-1-thread-9 占用一个资源...
[10:30:22]pool-1-thread-8 资源使用结束,释放资源
[10:30:22]pool-1-thread-7 占用一个资源...
[10:30:22]pool-1-thread-6 资源使用结束,释放资源
[10:30:22]pool-1-thread-10 占用一个资源...
[10:30:27]pool-1-thread-9 资源使用结束,释放资源
[10:30:29]pool-1-thread-7 资源使用结束,释放资源
[10:30:32]pool-1-thread-10 资源使用结束,释放资源
复制代码

栅栏

栅栏用于等待其它线程,且会阻塞自己当前线程;

所有线程必须同时到达栅栏位置后,才能继续执行;

举个例子如下:

复制代码
package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CyclicBarrierTaskTest implements Runnable {
    private CyclicBarrier cyclicBarrier;

    private int timeout;

    public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
        this.cyclicBarrier = cyclicBarrier;
        this.timeout = timeout;
    }

    @Override
    public void run() {
        TestCyclicBarrier.print(" 正在running...");
        try {
            TimeUnit.MILLISECONDS.sleep(timeout);
            TestCyclicBarrier.print(" 到达栅栏处,等待其它线程到达");
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }

        TestCyclicBarrier.print(" 所有线程到达栅栏处,继续执行各自线程任务...");
    }
}

public class TestCyclicBarrier {

    public static void print(String str) {
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        System.out.println("[" + dfdate.format(new Date()) + "]"
                + Thread.currentThread().getName() + str);
    }

    public static void main(String[] args) {
        int count = 5;
        
        ExecutorService es = Executors.newFixedThreadPool(count);

        CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {

            @Override
            public void run() {
                TestCyclicBarrier.print(" 所有线程到达栅栏处,可以在此做一些处理...");
            }
        });
        for (int i = 0; i < count; i++)
            es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
    }

}
复制代码
复制代码
[11:07:00]pool-1-thread-2 正在running...
[11:07:00]pool-1-thread-1 正在running...
[11:07:00]pool-1-thread-5 正在running...
[11:07:00]pool-1-thread-3 正在running...
[11:07:00]pool-1-thread-4 正在running...
[11:07:01]pool-1-thread-1 到达栅栏处,等待其它线程到达
[11:07:02]pool-1-thread-2 到达栅栏处,等待其它线程到达
[11:07:03]pool-1-thread-3 到达栅栏处,等待其它线程到达
[11:07:04]pool-1-thread-4 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,可以在此做一些处理...
[11:07:05]pool-1-thread-1 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-2 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-3 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-4 所有线程到达栅栏处,继续执行各自线程任务...
复制代码

阻塞队列

阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;

队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;

下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...

如下代码:

复制代码
package concurrency;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

//生产者
public class Producer implements Runnable {
    private final BlockingQueue<String> fileQueue;

    public Producer(BlockingQueue<String> queue) {
        this.fileQueue = queue;

    }

    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(1000);
                String produce = this.produce();
                System.out.println(Thread.currentThread() + "生产:" + produce);
                fileQueue.put(produce);
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public String produce() {
        SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
        return dfdate.format(new Date());
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        for (int i = 0; i < 1; i++) {
            new Thread(new Producer(queue)).start();
        }
        for (int i = 0; i < 6; i++) {
            new Thread(new Consumer(queue)).start();
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                TimeUnit.MILLISECONDS.sleep(1000);
                System.out.println(Thread.currentThread() + "prepare 消费");
                System.out.println(Thread.currentThread() + "starting:"
                        + queue.take());
                System.out.println(Thread.currentThread() + "end 消费");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
复制代码
复制代码
Thread[Thread-1,5,main]prepare 消费
Thread[Thread-3,5,main]prepare 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-2,5,main]prepare 消费
Thread[Thread-6,5,main]prepare 消费
Thread[Thread-5,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:36
Thread[Thread-1,5,main]starting:11:36:36
Thread[Thread-1,5,main]end 消费
Thread[Thread-1,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:37
Thread[Thread-4,5,main]starting:11:36:37
Thread[Thread-4,5,main]end 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:38
Thread[Thread-3,5,main]starting:11:36:38
Thread[Thread-3,5,main]end 消费
...
复制代码

 参考资料:java并发编程实战

本文转自风一样的码农博客园博客,原文链接:http://www.cnblogs.com/chenpi/p/5358579.html,如需转载请自行联系原作者
相关文章
|
11月前
|
Java 大数据 开发者
大数据开发基础的编程语言的Java的并发/多线程编程的JUC并发工具类
在Java并发编程中,JUC(java.util.concurrent)包中的并发工具类提供了各种功能强大的工具来协调多线程之间的执行和通信。本文将介绍Java的JUC并发工具类以及如何使用它们进行多线程编程。
138 0
|
12月前
|
Java
浅谈Java线程池中的ThreadPoolExecutor工具类
浅谈Java线程池中的ThreadPoolExecutor工具类
327 0
|
Java Maven
java 异步线程监听、结果回调、异常捕获 | Java工具类
java 异步线程监听、结果回调、异常捕获 | Java工具类
java 异步线程监听、结果回调、异常捕获 | Java工具类
|
前端开发 Java 程序员
【线程池工具类】打卡学习Java线程池(案例详解)
本期给大家分享线程池在Java项目中的真实案例,欢迎打卡!
404 0
【线程池工具类】打卡学习Java线程池(案例详解)
|
数据库连接
防止多线程同时操作一个资源,不能不学的JUC工具类: Semaphore详解
在工作中我们经常需要考虑对资源的使用,避免资源被过度使用或者资源没有被利用到而造成的问题,那我们该如何去限制访问某些资源的线程数目,从而对完成资源的保护。
|
算法 安全 Java
阻塞队列、线程池、原子性及并发工具类
阻塞队列、线程池、原子性及并发工具类的简单示例
119 1
阻塞队列、线程池、原子性及并发工具类
|
Java
一文理解java线程间协作问题的工具类Exchanger
在很久之前我曾写过一篇一篇文章介绍线程间如何进行通信的问题,当时使用的是等待通知模型,这篇文章介绍一个java提供的用于两个线程间通信的工具类Exchanger。
158 0
一文理解java线程间协作问题的工具类Exchanger
|
Java Maven
java 多线程分段等待执行完成状况,循环屏障CyclicBarrier | Java工具类
java 多线程分段等待执行完成状况,循环屏障CyclicBarrier | Java工具类
|
Java Maven
java 优雅的实现多线程等待,可重用的同步屏障Phaser | Java工具类
java 优雅的实现多线程等待,可重用的同步屏障Phaser | Java工具类
java 优雅的实现多线程等待,可重用的同步屏障Phaser | Java工具类
|
Java Maven
java多线程提交,如何按照时间顺序获取线程结果,看完你就懂了 | Java工具类
java多线程提交,如何按照时间顺序获取线程结果,看完你就懂了 | Java工具类
java多线程提交,如何按照时间顺序获取线程结果,看完你就懂了 | Java工具类

热门文章

最新文章