java并发编程笔记3-同步容器&并发容器&闭锁&栅栏&信号量

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 一.同步容器:   1.Vector容器实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。保证了线程安全。

一.同步容器:

  1.Vector容器实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。保证了线程安全。源码如下图:

可以看到这些方法都加了synchronized。即加了同步操作。

  2.Hashtable集合。HashTable实现了Map接口,它和HashMap很相似,但是HashTable进行了同步处理,而HashMap没有源码如下:

可以看到HashTable的实现方法也用到了synchronized同步。

 

  3.Collections类中提供的静态工厂方法创建的类,在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类;源码如下图:

可以看到Collection类的静态工厂方法的内部实现也是使用了synchronized同步。

 

同步容器都是线程安全的。但是对于复合操作(迭代、缺少即加入、导航:根据一定的顺序寻找下一个元素),有时可能需要使用额外的客户端加锁进行保护。在一个同步容器中,复合操作是安全的。但是当其他线程能够并发修改容器的时候,它们就可能不会按照期望工作了。

例如:

几个单局语句是原子性的,但是一复合就不是在原子性的了。因为几条原子性的语句合起来就存在了时间差,这就出现线程安全的问题了,因为当一个线程进行删除容器中某个元素时,另外一个线程比这个线程已经先一步删除了该元素,这是导致抛出并发修改异常。

注意,并不是多线程才出现并发修改异常,单线程也会,比如,一个List,用iterator进行迭代,然后你没用iterator.remove()方法,而是直接用list.remove()方法就会发生并发修改异常。

这是就要手工

进行加synchronized了,如下:

 

  有一些原因造成我们不愿意在迭代期间对容器加锁,当其它线程需要访问容器时,必须等待,直到迭代结束,如果容器很大,或者对每一个元素执行的任务耗时比较长,它们可能需要等待很长一段时间。另外,如果对元素的操作还要持有另一个锁,这是一个产生死锁风险的因素。在迭代期间,对容器加锁的一个替代方法是复制容器,因为复制是线程限制的,没有其他的线程能够在迭代期间对其进行修改,这样就消除了ConcurrentModificationException发生的可能性。但是如果容器非常大,复制起来这就非常消耗性能。

同步容器中的方法采用了synchronized进行了同步,这必然会影响到执行性能.同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,但代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。

 

二.并发容器

java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。主要解决了两个问题:

  1).根据具体场景进行设计,尽量避免synchronized,提供并发性。

  2).定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。

1.ConcurrentHashMap并发容器,来替代同步的哈希Map实现。ConcurrentHashMap实现采用了散列机制,但是采用了分段锁(Lock Striping)机制提供了并发性能。其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成,分段锁是对Segments中每一个Segment加锁,Segment是一种可重入锁ReentrantLock,在ConcurrentHshMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment里包含一个HashEntry数组,每一个HashEntry是一个链表结构的元素,每个Segment守护着HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment的锁。

并发环境下实现更的吞吐量,而在单线程环境下只损失非常小的性能。ConcurrentHashMap结构如下图:

 

没有则增加:

 V putIfAbsent(key,value):表示如果不存在(新的entry),那么会向map中添加该键值对,并返回null。 
如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。

 

相等则移除:

boolean remove(Object key, Object value) :当key对应到指定的value时,才移除该key-value对。

 

相等则替换:

boolean replace(K key, V oldValue, V newValue) :当key对应到指定的value时,才替换key对应的value值。

 

拥有则替换:

V replace(K key,V value):只有目前将键的条目映射到某一值时,才替换该键的条目。


1.1ConcurrentHashMap的使用注意项目

ConcurrentHashMap 虽然为并发安全的组件,但是使用不当还是会导致程序错误,通过使用简单的案例来复现这些问题并给出开发时候如何进行避免的策略。

这里借用直播的一个场景,直播业务中,每个直播间对应一个 topic,每个用户进入直播间时候会把自己设备 id 绑定到这个 topic 上,也就是一个 topic 对应一堆用户设备,可知可以使用 map 来维护这些信息,key 为 topic,value 为设备的 list。下面通过代码模拟多用户同时进入直播间时候 map 信息的维护,代码如下:

public class ConcurrentHashMapTest {
    //(1)创建map,key为topic,value为设备列表
    static ConcurrentHashMap<String, List<String>> map = new ConcurrentHashMap<String, List<String>>();
    public static void main(String[] args) {
        //(2)进入直播间topic1 线程one
        Thread threadOne = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device1");
                list1.add("device2");

                map.put("topic1", list1);
                System.out.println(JSON.toJSONString(map));
            }
        });
        //(3)进入直播间topic1 线程two
        Thread threadTwo = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device11");
                list1.add("device22");

                map.put("topic1", list1);

                System.out.println(JSON.toJSONString(map));
            }
        });

        //(4)进入直播间topic2 线程three
        Thread threadThree = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device111");
                list1.add("device222");

                map.put("topic2", list1);

                System.out.println(JSON.toJSONString(map));
            }
        });

        //(5)启动线程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
    }
}

运行结果如下:

或者如下的运行结果:

可知 topic1 房间中的用户会丢失一部分,这是因为 put 方法如果发现 map 里面存在这个 key, 则使用 value 覆盖该 key 对应的老的 value 值,而 putIfAbsent 方法则如果已经存在该 key 则返回该 key 对应的 value 并不进行覆盖,如果不存在则会新增该 key,并且判断和写入是原子性操作。使用 putIfAbsent 替代 put 方法后代码如下:

public class ConcurrentHashMapTest1 {
    //(1)创建map,key为topic,value为设备列表
    static ConcurrentHashMap<String, List<String>> map = new ConcurrentHashMap<String, List<String>>();
    public static void main(String[] args) {
        //(2)进入直播间topic1 线程one
        Thread threadOne = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device1");
                list1.add("device2");
                //(2.1)
                List<String> oldList = map.putIfAbsent("topic1", list1);
                if(null != oldList){
                    oldList.addAll(list1);
                }
                System.out.println(JSON.toJSONString(map));
            }
        });
        //(3)进入直播间topic1 线程two
        Thread threadTwo = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device11");
                list1.add("device22");

                List<String> oldList = map.putIfAbsent("topic1", list1);
                if(null != oldList){
                    oldList.addAll(list1);
                }

                System.out.println(JSON.toJSONString(map));
            }
        });

        //(4)进入直播间topic2 线程three
        Thread threadThree = new Thread(new  Runnable() {
            public void run() {
                List<String> list1 = new ArrayList<String>();
                list1.add("device111");
                list1.add("device222");

                List<String> oldList = map.putIfAbsent("topic2", list1);
                if(null != oldList){
                    oldList.addAll(list1);
                }
                System.out.println(JSON.toJSONString(map));
            }
        });

        //(5)启动线程
        threadOne.start();
        threadTwo.start();
        threadThree.start();
    }
}

运行结果如下:

如上代码(2.1)使用 map.putIfAbsent 方法添加新设备列表,如果 topic1 在 map 中不存在则放入 topic1 和对应设备列表到 map,要注意的是这个判断不存在和放入是原子性操作,这时候放入后会返回 null。如果 topic1 已经在 map 里面存在,则调用 putIfAbsent 会返回 topic1 对应的设备里面,代码发现返回的设备列表不为 null 则把新的设备列表添加到返回的设备列表里面,从而问题得到解决。


总结:put(K key, V value) 方法如果 key 已经存在则使用 value 覆盖原来的值并返回原来的值,如果不存在则把 value 放入并返回 null。而 putIfAbsent(K key, V value) 方法如果 key 已经存在则直接返回原来对应的值并不使用 value 覆盖,如果 key 不存在则存入 value 并返回 null,另外要注意判断 key 不存在和存入是原子操作。 


2.CopyOnWriteArrayList/set并发容器

  CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。

这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。大部分用于读操作,写操作少,因为如果的数据很大,你每次进行写操作

都要进行拷贝,重新复制一份数组,这开销很大的。

源码如下:

 


CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题:

    1.内存问题:因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。

如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。

之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

 

    2.数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。

 

3.BlockingQueue阻塞队列并发容器:Java 5.0之后新增加了Queue(队列)和BlockingQueue(阻塞队列)。Queue的底层实现其实就是一个LinkedList。队列是典型的FIFO先进先出的实现。阻塞队列提供了很多现成的方法可以满足我们实现生产者—消费者模型。

生产者—消费者模型简单理解就是一个缓冲容器,协调生产者和消费者之间的关系。生产者生产数据扔到容器里,消费者直接从容器里消费数据,大家不需要关心彼此,只需要和容器打交道,这样就实现了生产者和消费者的解耦。

队列分为有界队列和无界队列,无界队列会因为数据的累计造成内存溢出,使用时要小心。阻塞队列有很多种实现,最常用的有ArrayBlockingQueue和LinkedBlockingQueue。

阻塞队列提供了阻塞的take和put方法,如果队列已满,那么put方法将等待队列有空间时在执行插入操作;如果队列为空,那么take方法将一直阻塞直到有元素可取。有界队列是一个强大的资源管理器,它能抑制产生过多的工作项,使程序更加健壮。

 

 三.闭锁(CountDownLatch)

  

闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭着的,没有任何线程可以通过,当到达结束状态时,这扇门才会打开并容许所有线程通过。它可以使一个或多个线程等待一组事件发生。

闭锁状态包括一个计数器,初始化为一个正式,正数表示需要等待的事件数量。countDown方法递减计数器,表示一个事件已经发生,而await方法等待计数器到达0,表示等待的事件已经发生。

CountDownLatch强调的是一个线程(或多个)需要等待另外的n个线程干完某件事情之后才能继续执行。

应用场景:

    1、确保某个计算在其所有资源都被初始化之后才继续执行。二元闭锁(只有两个状态)可以用来表示“资源R已经被初始化”,而所有需要R操作都必须先在这个闭锁上等待。

    2、确保某个服务在所有其他服务都已经启动之后才启动。这时就需要多个闭锁。让S在每个闭锁上等待,只有所有的闭锁都打开后才会继续运行。

    3、等待直到某个操作的参与者(例如,多玩家游戏中的玩家)都就绪再继续执行。在这种情况下,当所有玩家都准备就绪时,闭锁将到达结束状态。

代码例子:

public class Test2 {

    public static void main(String[] args) {
        //参数代表等待线程的数量
        final CountDownLatch latch = new CountDownLatch(2);

        new Thread(){
            @Override
            public void run() {
                try {
                    System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
                    //子线程完成
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        new Thread(){
            public void run() {
                try {
                    System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
                    Thread.sleep(3000);
                    System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
                    //子线程完成
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

        try {
            System.out.println("等待2个子线程执行完毕。。。。");
            //进行等待
            latch.await();
            System.out.println("2个子线程已经执行完毕");
            System.out.println("继续执行主线程");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

}

运行结果如下:

 

 可以看到主线程等待两个子线程执行完毕后,才能继续执行主线程。

 

四.栅栏

 栅栏(Bariier)类似于闭锁,它能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待等待时间,而栅栏用于等待线程。

        CyclicBarrier 可以使一定数量的参与方反复的在栅栏位置汇聚,它在并行迭代算法中非常有用:将一个问题拆成一系列相互独立的子问题。当线程到达栅栏位置时,调用await() 方法,这个方法是阻塞方法,

直到所有线程到达了栅栏位置,那么栅栏被打开,此时所有线程被释放,而栅栏将被重置以便下次使用。

比方说数据写入的例子,多个子线程写数据库,必须都写完了,其他任务才能继续,如下图:

public class Test3 {

    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier = new CyclicBarrier(N);
        for (int i = 0;i<N;i++){
            new Writer(barrier).start();
        }
    }

    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier){
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据。。。。。");
            try {
                Thread.sleep(5000);//以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入");
                cyclicBarrier.await(); //await()的数量都达到了指定N时,才继续放行。
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务。。。。");
        }
    }

}

结果如下:

 

另一种形式的栅栏是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。例如当一个线程想缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用 Exchanger 来汇合,并将慢的缓冲区与空的缓冲区交换。当两个线程通过 Exchanger 交换对象时,这种交换就把这两个对象安全的发布给另一方。

Exchanger 可能被视为 SynchronousQueue 的双向形式。我们也可以用两个SynchronousQueue来实现 Exchanger的功能。

 

五。信号量

信号量用于对有限数量的资源的同时并发访问数进行控制。若有m个资源,但有n条线程(n>m),因此同一时刻只能允许m条线程访问资源,此时可以使用Semaphore控制访问该资源的线程数量。

闭锁控制访问的时间,而信号量则用来控制访问某个特定资源的操作数量,控制空间。而且闭锁只能够减少,一次性使用,而信号量则申请可释放,可增可减。 计数信号量还可以用来实现某种资源池,或者对容器施加边界。

        Semaphone 管理这一组许可(permit),可通过构造函数指定。同时提供了阻塞方法acquire,用来获取许可。同时提供了release方法表示释放一个许可。

        Semaphone 可以将任何一种容器变为有界阻塞容器,如用于实现资源池。例如数据库连接池。我们可以构造一个固定长度的连接池,使用阻塞方法 acquire和release获取释放连接,而不是获取不到便失败。

(当然,一开始设计时就使用BlockingQueue来保存连接池的资源是一种更简单的方法)

 

例子如下:

public class Test3 {

    public static void main(String[] args) {
        int N = 8;//工人数
        Semaphore semaphore = new Semaphore(5);//机器数
        for (int i = 0;i<N;i++){
            new Worker(i,semaphore).start();
        }
    }

    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产。。。");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放机器");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

运行结果如下:

可以看到信号量对有限数量的资源的同时并发访问数进行控制

目录
相关文章
|
12天前
|
Java
Java基础—笔记—static篇
`static`关键字用于声明静态变量和方法,在类加载时初始化,只有一份共享内存。静态变量可通过类名或对象访问,但推荐使用类名。静态方法无`this`,不能访问实例成员,常用于工具类。静态代码块在类加载时执行一次,用于初始化静态成员。
10 0
|
12天前
|
Java API 索引
Java基础—笔记—String篇
本文介绍了Java中的`String`类、包的管理和API文档的使用。包用于分类管理Java程序,同包下类无需导包,不同包需导入。使用API时,可按类名搜索、查看包、介绍、构造器和方法。方法命名能暗示其功能,注意参数和返回值。`String`创建有两种方式:双引号创建(常量池,共享)和构造器`new`(每次新建对象)。此外,列举了`String`的常用方法,如`length()`、`charAt()`、`equals()`、`substring()`等。
14 0
|
11天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
4天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
4天前
|
安全 Java 开发者
Java并发编程:深入理解Synchronized关键字
【4月更文挑战第19天】 在Java多线程编程中,为了确保数据的一致性和线程安全,我们经常需要使用到同步机制。其中,`synchronized`关键字是最为常见的一种方式,它能够保证在同一时刻只有一个线程可以访问某个对象的特定代码段。本文将深入探讨`synchronized`关键字的原理、用法以及性能影响,并通过具体示例来展示如何在Java程序中有效地应用这一技术。
|
5天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
5天前
|
安全 Java 程序员
Java中的多线程并发编程实践
【4月更文挑战第18天】在现代软件开发中,为了提高程序性能和响应速度,经常需要利用多线程技术来实现并发执行。本文将深入探讨Java语言中的多线程机制,包括线程的创建、启动、同步以及线程池的使用等关键技术点。我们将通过具体代码实例,分析多线程编程的优势与挑战,并提出一系列优化策略来确保多线程环境下的程序稳定性和性能。
|
5天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
6天前
|
存储 安全 Java
Java中的容器,线程安全和线程不安全
Java中的容器,线程安全和线程不安全
13 1
|
6天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
6 0