Java并发基础构建模块简介

简介:

在实际并发编程中,可以利用synchronized来同步线程对于共享对象的访问,用户需要显示的定义synchronized代码块或者方法。为了加快开发,可以使用Java平台一些并发基础模块来开发。

  注:关于容器类中的常见并发容器和同步容器的类图,详见另一篇文章Java集合框架图

一 同步容器类

   同步容器类主要包括Vector和Hashtable,都是通过Collections.synchronizedXxx工厂方法创建的。这些类都是线程安全的。一些修改删除添加方法都利用了synchronized来进行同步。将这些容器类的状态封装,然后对于其所有的public方法都进行同步,这样就保证了每次只能允许一个线程访问这些容器类。对于每个public方法都保证了原子性操作。

当对于同步容器类进行多个synchronized方法组成的复合操作,为了保证正确性,必须要确保这些复合操作为一个原子操作。

对于这些同步容器里进行迭代处理中,需要注意多个线程访问的问题。当利用for来迭代处理,容器的大小可能会在迭代中修改,则就会抛出ArrayIndexOutOfBoundsException异常。而在利用迭代器foreach运用interator过程中,可能会抛出ConcurrentModificationException。所以在对于这种同步容器类迭代过程中一定要进行加锁处理。

在对同步容器类调用toString,hashCode,equals,containsAll,removeAll,retainAll以及容器作为参数传递都会对容器进行迭代操作。

二 并发容器类

   由于同步容器类,只允许同时一个外部线程访问该集合,则降低了它的并发能力。就引出了并发容器类。并发容器类主要就是针对多线程访问设计的。

   常见的有ConcurrentHashMap以代替同步且基于散列的Map。CopyOnWriteArrayList用于以迭代为主要操作来代替List。Queue和BlockingQueue,其中BlockingQueue尤其在生产者与消费者模式中作为缓冲得到了很大的运用。

 ConcurrentHashMap,这是一种并发性容器类,这里并没有针对每一个方法使用同一个锁进行同步,而是在内部用一种分段锁来实现并发性操作。可以允许同时多个读线程操作,允许同时多个写线程操作,多个读线程与写线程同时操作。迭代过程不需要加锁,但是在迭代过程可能容量大小会发生变化。这种最重要的是用于针对get,put,containsKey和remove等操作频繁的多线程中。

它增加了几个复合型的原子性操作,从而可以直接使用不用加锁。

   CopyOnWriteArrayList,这是一种并发性容器类,迭代过程不需要加锁。它是“写入时复制”的容器,当在每次修改时候,都会复制底层数组,创建并发布一个新的容器副本。这种最重要用于需要频繁的进行迭代操作,且迭代操作远远大于修改操作的时候,多个线程可以同时对这个容器进行迭代操作,而不会彼此干扰且与修改容器的线程不相干。

Queue,是一种用来保存临时的数据,包括ConcurrentLinkedQueue,Queue上的操作不会阻塞,如果队列为空,会立即返回。

   BlockingQueue,它主要用于并发操作方面。这是一种基于阻塞的队列,当从队列中获取元素时候,如果队列为空则等待,当向队列插入元素时候,如果队列满了则等待。它利用put和take来获取对象,这种操作都是在内部加锁机制实现的。常见的几个插入与移除对象操作:

  put,将object放入队列中,如果无空间,则一直等待到有空间,会阻塞调用该方法的线程。

  offer,如果可以放入object,则返回true,否则false。一会阻塞调用该方法的线程。

  poll,获取首位对象,若立即得不到,可以等待一定时间后再返回值或者null。

  take,获取首位对象,若队列为空,则一直等待有元素添加,会阻塞调用该方法的线程。

  常见的子类包括ArrayBlockingQueueLinkedBlockingQueue,分别用于替代LinkedList和ArrayList,提高并发性能。SynchronousQueue,这是一种同步移除与添加的队列,每个插入操作必须等待另一个线程的对应移除操作。同步队列没有任何内部容量,甚至连一个队列的容量都没有。

  这种最重要用于常见的生产者与消费者模式中,作为缓冲用。在我的一篇Java多线程设计模式(2)生产者与消费者模式》利用自定义数组大小来实现生产者与消费者模式,在ArrayBlockingQueue中其内部大体的实现机制就是和那一样的。

   利用ArrayBlockingQueue可以很容易实现生产者与消费者模式,不用进行额外的同步处理,因为它内部都已经实现了同步处理,并且进行了并发性能的提高。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package  whut.producer;
import  java.util.Random;
import  java.util.concurrent.ArrayBlockingQueue;
import  java.util.concurrent.BlockingQueue;
//利用BlockingQueue实现消费者与生产者
class  Producer  implements  Runnable {
     private  final  BlockingQueue queue;
     public  Producer(BlockingQueue q) {
         queue = q;
     }
     public  void  run() {
         try  {
             int  i= 0 ;
             while  (i< 10 )
             {
                 queue.put(i);
                 i++;
             }
         catch  (InterruptedException ex) {
         }
     }
     private  Object produce()
     {
         Random rd= new  Random();
         int  res=rd.nextInt( 10 );
         return  res;
     }
}
class  Consumer  extends  Thread  {
     private  final  BlockingQueue queue;
     public  Consumer(String name,BlockingQueue q) {
         super (name);
         queue = q;
     }
     public  void  run() {
         try  {
             while  ( true ) {
                 consume(queue.take());
             }
         catch  (InterruptedException ex) {
         }
     }
     private  void  consume(Object x) {
         System.out.println(Thread.currentThread().getName()+ " = " +x);
     }
}
class  BlockingQueueDemo {
     public  static  void  main(String[] args) {
         BlockingQueue<Integer> q =  new  ArrayBlockingQueue<Integer>( 10 );
         Producer p =  new  Producer(q);
         Consumer c1 =  new  Consumer( "Apple" ,q);
         Consumer c2 =  new  Consumer( "Hawk" ,q);
         new  Thread(p).start();
         c1.start();
         c2.start();
     }
}


三 同步工具类

常见使用的同步工具类有信号量(Semaphore),栅栏(Barrier),闭锁(Latch).

  闭锁(Latch),可以延迟线程进度直到闭锁到达最终状态。闭锁主要用来确保某些活动直到其他活动都执行完毕后才能继续执行这里执行的形如递减操作,初始化等待活动的数目,当递减到0,则该活动才得以继续执行。

   CountDownLatch是一种灵活的闭锁实现,有个状态为计数器,利用构造器设置。通过countDown方法递减计数器,表示一个活动已经执行完毕,而await方法等待计数器为0,如果不为0则阻塞等待,或者线程被中断,或者等待超时。这两个方法必须成对使用。

CountDownLatch的简单实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package  whut.concurrentmodel;
import  java.util.concurrent.CountDownLatch;
//利用闭锁来实现,闭锁可以用于线程之间的协作,
//即一个线程必须等待其余所有活动完后执行
public  class  CountDownLatchClient {
     public  void  timeTasks( int  nThreads,  final  Runnable task)
             throws  InterruptedException {
         // 工作线程等待其他活动执行完毕的{闭锁}
         final  CountDownLatch startGate =  new  CountDownLatch( 1 );
         // 主线程等待所有工作线程执行完毕的{闭锁}
         final  CountDownLatch endGate =  new  CountDownLatch(nThreads);
         for  ( int  i =  0 ; i < nThreads; i++) {
             Thread t =  new  Thread() {
                 public  void  run() {
                     try  {
                         // 工作线程先等待其他活动执行完毕
                         startGate.await();
                         try  {
                             task.run();
                         finally  {
                             System.out.println(Thread.currentThread().getName()
                                     " work finished..." );
                             // 工作线程执行完毕后,递减闭锁值
                             endGate.countDown();
                         }
                     catch  (InterruptedException ie) {
                     }
                 }
             };
             t.start();
         }
         // 这里具体是任务,不过直接模拟了活动执行完毕了
         startGate.countDown();
         // 主线程先等待工作线程执行到0
         endGate.await();
         System.out.println( "All workthread have finished..." );
     }
     // 主线程
     public  static  void  main(String[] args) {
         // TODO Auto-generated method stub
         Runnable task =  new  Runnable() {
             public  void  run() {
                 int  i =  0 ;
                 while  (i <  100 ) {
                     i++;
                 }
             }
         };
         CountDownLatchClient cdl =  new  CountDownLatchClient();
         try  {
             cdl.timeTasks( 10 , task);
         catch  (InterruptedException e) {
         }
         System.out.println( "do another thing ...." );
     }
}

  栅栏(Barrier),可以阻塞一组线程直到某个事件发生。它和闭锁一样。闭锁用于等待其他活动,栅栏用于等待其他线程。这里执行的形如递增操作,初始化等待线程的数目,当递增到目标线程数目时候,则该线程才得以继续执行。

   CyclicBarrier,是一种常用的栅栏类,可以使得一定数量的参与者反复在栅栏处汇集。通常用于并行迭代计算中。这种将一个复杂的大问题,分解成多个子问题,为每一个子问题创建线程来处理。当线程到达栅栏位置时候将调用await,这个将一直阻塞,直到所有的线程都到达了栅栏位置。在使用它的时候,可以传递一个Runnable,用于当成功通过栅栏后执行的操作或任务。一般每个线程执行的时候先利用barrier.hasCoverged判断,然后执行任务,最后利用barrier.await(),来阻塞所有都到达栅栏位置。

  一般使用代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package  whut.concurrentmodel;
import  java.util.concurrent.BrokenBarrierException;
import  java.util.concurrent.CyclicBarrier;
//栅栏实例
public  class  BarrierClient {
     public  static  void  main(String[] args) {
         // TODO Auto-generated method stub
        BarrierClient bc= new  BarrierClient();
        //获取可以同时并行处理的数目
        int  count=Runtime.getRuntime().availableProcessors();
        CyclicBarrier barrier= new  CyclicBarrier(count);
        for ( int  i= 0 ;i<count;i++)
        {
            Worker work=bc. new  Worker(barrier);
            new  Thread(work).start();
        }
     }
                                                                                                                                                                                                                                                                                                      
     private  class  Worker  implements  Runnable
     {
         private  final  CyclicBarrier bar;
         public  Worker(CyclicBarrier bar)
         {
             this .bar=bar;
         }
         public  void  run()
         {
             //dosome work
             //...........
             try {
                 bar.await();
             } catch (InterruptedException e)
             {
             } catch (BrokenBarrierException e)
             {
                                                                                                                                                                                                                                                                                                                  
             }
         }
     }
}


栅栏与闭锁,可以使得任务线程同时开始同时结束,利用栅栏也可以实现与闭锁一样的效果。在并发测试中很有用

信号量(Semaphore),主要是用来控制同时访问某个特定资源的操作数目,或者执行某个指定操作的数目。可以用来实现资源池,对容器施加边界。Semaphore管理者一组虚拟许可,许可的数目由构造器指定。执行操作必须先获取许可,使用完毕后释放许可。如果没有许可,则acquire一直阻塞直到有许可。release方法将返回一个许可信号量。当初始值为1,则可以用作互斥体。

  Semaphore一般用于实现资源池以及设置任何容器为有界阻塞容器。注意这种方式与另一篇Java多线程设计模式(3)读写锁模式》的比较

  代码示例如下:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package  whut.concurrentmodel;
import  java.util.Collections;
import  java.util.HashSet;
import  java.util.Set;
import  java.util.concurrent.Semaphore;
//利用Semaphore来实现集合的边界处理
public  class  SemaphoreTest<T> {
     private  final  Set<T> set;
     private  final  Semaphore sem;
                                                                                                                                                                                                                    
     public  SemaphoreTest( int  bound)
     {
         this .set=Collections.synchronizedSet( new  HashSet<T>()); //同步处理
         //设置Semaphore的大小,用于设置set的边界,控制同时多少个访问
         sem= new  Semaphore(bound);
     }
     //add操作成功则会返回true,否则返回false
     public  boolean  add(T o) throws  InterruptedException
     {
         sem.acquire(); //获取信号量
         boolean  wasAdded= false ;
         try {
             wasAdded=set.add(o); //同步访问这些方法
             return  wasAdded;
         } finally {
             if (!wasAdded)
                 sem.release(); //释放信号量,如果没有添加成功
         }
     }
                                                                                                                                                                                                                    
     public  boolean  remove(Object o)
     {
         boolean  wasRemoved=set.remove(o); //成功移除返回true
         if (wasRemoved)
             sem.release(); //释放信号量
         return  wasRemoved;
     }
}

  FutureTask,这是一种可以获取长时间执行任务的快照,可以执行任何返回该对象,从而继续做其他工作,当需要获取任务的执行结果的时候,再利用Future.get获取任务的处理结果。如果任务已经完成,则get会立即返回,否则则会阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到了获取这个结果的线程。

   FutureTask可以包装Callable和Runnable作为其构造器参数,同时FutureTask是实现了Runnable接口,故可以作为Executor.execute的参数传递。

   使用FutureTask的方式有两种,一种是将其作为Thread的构造器参数或者execute()的参数在新的线程中执行。一种是直接运行其run方法,在主线程中串行运行。

   利用FutureTask其实就是和另一篇《Java多线程设计模式(5)Future模式》的机制一样的,只不过在内部已经封装好了。

   代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package  whut.future;
import  java.util.Random;
import  java.util.concurrent.Callable;
import  java.util.concurrent.ExecutionException;
import  java.util.concurrent.FutureTask;
//利用FutureTask来实现future设计模式
public  class  FutureTaskDemo {
     public  static  void  main(String[] args) {
         // TODO Auto-generated method stub
         MyCallale mc =  new  MyCallale();
         FutureTask<String> myfuture =  new  FutureTask<String>(mc);
         new  Thread(myfuture).start();
         System.out.println( "operate other thing" );
         try  {
             System.out.println( "data1="  + myfuture.get());
         catch  (InterruptedException e) {
         catch  (ExecutionException e) {
         }
     }
}
class  MyCallale  implements  Callable<String> {
     @Override
     public  String call()  throws  Exception {
         // TODO Auto-generated method stub
         int  i =  0 ;
         Random r =  new  Random();
         StringBuilder sb =  new  StringBuilder();
         int  res =  0 ;
         while  (i <  100000000 ) {
             i++;
             res = r.nextInt(i);
         }
         sb.append(res);
         return  sb.toString();
     }
}

四 阻塞对象池的几种方式

在实现阻塞对象池中,可以自定义同步,有数组方式和LinkedList,由于这些都不是线程安全的,所以需要显示的进行synchronized同步处理。但是都是串行化访问的,不利于并行处理。

还可以利用基础构建模块,利用BlockingQueue和Semaphore来实现,这些内部都实现了加锁机制,更便于并发与同步的效率。




本文转自 zhao_xiao_long 51CTO博客,原文链接:http://blog.51cto.com/computerdragon/1211712
相关文章
|
7天前
|
安全 Java Go
Java vs. Go:并发之争
【4月更文挑战第20天】
14 1
|
7天前
|
数据采集 存储 Java
高德地图爬虫实践:Java多线程并发处理策略
高德地图爬虫实践:Java多线程并发处理策略
|
22小时前
|
设计模式 存储 前端开发
18:JavaBean简介及其在表单处理与DAO设计模式中的应用-Java Web
18:JavaBean简介及其在表单处理与DAO设计模式中的应用-Java Web
11 4
|
22小时前
|
XML 前端开发 Oracle
16:JSP简介、注释与Scriptlet、Page指令元素、Include操作、内置对象、四种属性-Java Web
16:JSP简介、注释与Scriptlet、Page指令元素、Include操作、内置对象、四种属性-Java Web
8 2
|
1天前
|
Java 编译器 Android开发
构建高效Android应用:探究Kotlin与Java的性能差异
【5月更文挑战第1天】 在移动开发的世界中,性能优化始终是开发者关注的焦点。随着Kotlin的兴起,许多团队和开发者面临着一个选择:是坚持传统的Java语言,还是转向现代化、更加简洁的Kotlin?本文通过深入分析和对比Kotlin与Java在Android应用开发中的性能表现,揭示两者在编译效率、运行速度和内存消耗等方面的差异。我们将探讨如何根据项目需求和团队熟悉度,选择最适合的语言,以确保应用的高性能和流畅体验。
|
2天前
|
Java 编译器 Android开发
构建高效Android应用:探究Kotlin与Java的性能差异
【4月更文挑战第30天】在Android开发领域,Kotlin作为一种现代化的编程语言,因其简洁性和功能性受到了开发者的广泛欢迎。尽管与传统的Java相比,Kotlin提供了诸多便利,但关于其性能表现的讨论始终未息。本文将深入分析Kotlin和Java在Android平台上的性能差异,通过实际测试数据揭示两种语言在编译效率、运行速度以及内存占用方面的具体表现,并探讨如何利用Kotlin的优势来提升Android应用的整体性能。
|
3天前
|
Java
JavaFX库用于在Java中绘制K线图,适合构建富客户端应用。
JavaFX库用于在Java中绘制K线图,适合构建富客户端应用。以下是一个简单的K线图绘制示例:创建OHLCChart,设置标题和坐标轴,创建数据集并添加数据点,最后显示在Scene中。要定制图表外观,可利用JavaFX的丰富参数和方法。查阅JavaFX文档以获取更多细节。
14 3
|
3天前
|
存储 Java 开发工具
【Java探索之旅】用面向对象的思维构建程序世界
【Java探索之旅】用面向对象的思维构建程序世界
9 0
|
3天前
|
安全 Java 开发者
构建高效微服务架构:后端开发的新范式Java中的多线程并发编程实践
【4月更文挑战第29天】在数字化转型的浪潮中,微服务架构已成为软件开发的一大趋势。它通过解耦复杂系统、提升可伸缩性和促进敏捷开发来满足现代企业不断变化的业务需求。本文将深入探讨微服务的核心概念、设计原则以及如何利用最新的后端技术栈构建和部署高效的微服务架构。我们将分析微服务带来的挑战,包括服务治理、数据一致性和网络延迟问题,并讨论相应的解决方案。通过实际案例分析和最佳实践的分享,旨在为后端开发者提供一套实施微服务的全面指导。 【4月更文挑战第29天】在现代软件开发中,多线程技术是提高程序性能和响应能力的重要手段。本文通过介绍Java语言的多线程机制,探讨了如何有效地实现线程同步和通信,以及如
|
6天前
|
Java 测试技术 Android开发
构建高效Android应用:探究Kotlin与Java的性能对比
【4月更文挑战第26天】 在移动开发领域,性能优化一直是开发者追求的重要目标。随着Kotlin的兴起,其在Android平台上的应用逐渐增多,但关于Kotlin与Java在性能方面的对比,社区中仍存在诸多讨论。本文通过实际的性能测试,分析比较了使用Kotlin和Java编写的Android应用在多个维度上的运行效率,旨在为开发者提供一个明确的性能参考,帮助他们在选择编程语言时做出更加明智的决策。