python多进程—multiprocessing

简介:

一、进程

    python中提供多进程包:multiprocessing,支持子进程,通信,共享内存,执行不同形式的同步,提供了Process、Pipi、Lock等组件

  

  多进程和多线程区别:

  多线程使用的是CPU的一个核,适合IO密集型

  多进程使用的是CPU的多个核,适合运算密集型


1)multiprocessing的方法

  cpu_count():统计cpu总数

  active_children():获取所有子进程


例子:

1
2
3
4
5
6
7
8
#!/usr/bin/env python
import  multiprocessing
 
=  multiprocessing.cpu_count()
=  multiprocessing.active_children()
 
print (p)
print (m)

运行结果:

8

[]


2)Process进程

 创建一个Process对象:p = multiprocessing.Precess(target=worker,args=(2,))


 说明:

 target = 函数名字

 args = 函数需要的的参数,以tuple形式传入


3)Process常用方法

  is_alive():判断进程是否存活

  run():启动进程

  start():启动进程,会自动调用run方法,常用

  join(timeout=):等待进程结束或者直到超时


4)Process常用属性

  name:进程名字

  pid:进程的pid


例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
import  time
import  multiprocessing
 
def  worker(interval):
     time.sleep(interval)
     print ( "hello,China" )
     
if  __name__  = =  "__main__" :
     =  multiprocessing.Process(target = worker,args = ( 5 ,))
     
     p.start()
     
     print (p.is_alive())
     p.join(timeout = 3 )   # 只等待3秒,如果进程还没结束,则向下执行print(p.name)
     
     print (p.name)
     print (p.pid)
     print ( "This is end" )

运行结果:

True

Process-1

121764

This is end

hello,China


实例: 多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import  time
import  multiprocessing
 
def  worker(name,interval):
     print ( "{0} start"  . format (name))
     time.sleep(interval)
     print ( "{0} end"  . format (name))
     
if  __name__  = =  "__main__" :
     print ( "main start" )
     print ( "The computer has {0} core"  . format (multiprocessing.cpu_count()))
     
     p1  =  multiprocessing.Process(target = worker,args = ( "worker" , 2 ))
     p2  =  multiprocessing.Process(target = worker,args = ( "worker" , 3 ))
     p3  =  multiprocessing.Process(target = worker,args = ( "worker" , 4 ))
     
     p1.start()
     p2.start()
     p3.start()
     
     for  in   multiprocessing.active_children():
         print ( "The pid of {0} is {1}"  . format (p.name,p.pid) )
     print ( "main end" )

运行结果:

main start

The computer has 4 core

The pid of Process-1 is 21112

The pid of Process-3 is 20536

The pid of Process-2 is 2116

main end

worker start

worker start

worker start

worker end

worker end

worker end


说明:启动的多个进程之间都是相互独立存在的



二、lock组件

  当我们用多进程来读写文件时,如果一个写一个读同时进行时不行的,必须一个写完之后,另一个才可以读。因此需要用到一个锁机制进行控制


实例1:多进程不加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  multiprocessing
import  time
 
def  add(number,value,lock):
     print ( "init,member = {1}" . format (value,number))
     for  in  xrange ( 1 , 6 ):
         number  + =  value
         time.sleep( 1 )
         print ( "add {0},number = {1}" . format (value,number))
         
if  __name__  = =  "__main__" :
     lock  =  multiprocessing.Lock()
     number  =  0
     
     p1  =  multiprocessing.Process(target = add,args = (number, 1 ,lock))
     p3  =  multiprocessing.Process(target = add,args = (number, 3 ,lock))
     
     p1.start()
     p3.start()
     
     print ( "main end" )

运行结果:

main end

init,member = 0

init,member = 0

add 1,number = 1

add 3,number = 3

add 1,number = 2

add 3,number = 6

add 1,number = 3

add 3,number = 9

add 1,number = 4

add 3,number = 12

add 1,number = 5

add 3,number = 15


说明:多进程互不干扰,同时进行;进程1: 0、1、2、3、4、5;进程3: 0、3、6、9、15



实例2:多进程加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import  multiprocessing
import  time
 
def  add(number,value,lock):
     lock.acquire()       # 获取锁
     try :
         print ( "init,member = {1}" . format (value,number))
         for  in  xrange ( 1 , 6 ):
             number  + =  value
             time.sleep( 1 )
             print ( "add {0},number = {1}" . format (value,number))
     except  Exception as e:
         raise  e
     finally :
         lock.release()        # 释放锁
         
if  __name__  = =  "__main__" :
     lock  =  multiprocessing.Lock()   # 定义锁
     number  =  0
     
     p1  =  multiprocessing.Process(target = add,args = (number, 1 ,lock))
     p3  =  multiprocessing.Process(target = add,args = (number, 3 ,lock))
     
     p1.start()
     p3.start()
     
     print ( "main end" )

运行结果:

main end

init,member = 0

add 1,number = 1

add 1,number = 2

add 1,number = 3

add 1,number = 4

add 1,number = 5

init,member = 0

add 3,number = 3

add 3,number = 6

add 3,number = 9

add 3,number = 12

add 3,number = 15


说明:进程1和进程3,谁先抢到锁,则另一个进程只能等待抢到者执行完之后,才能执行



三、共享内存

  两个“同时“读写的文件,其中一个作用的结果对另外一个有影响。multiprocessing提供了Value和Array模块


实例1:多进程内存共享不加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import  multiprocessing
import  time
 
def  add(number,value1):
     try :
         print ( "init,member = {1}" . format (value1,number.value))    # 值的调用方式 number.value
         for  in  xrange ( 1 , 6 ):
             number.value  + =  value1
             time.sleep( 1 )
             print ( "add {0},number = {1}" . format (value1,number.value))
     except  Exception as e:
         raise  e
         
if  __name__  = =  "__main__" :
     lock  =  multiprocessing.Lock()
     number  =  multiprocessing.Value( "i" , 0 )    # Value共享内存模块
     
     p1  =  multiprocessing.Process(target = add,args = (number, 1 ))
     p3  =  multiprocessing.Process(target = add,args = (number, 3 ))
     
     p1.start()
     p3.start()
     
     print ( "main end" )

运行结果:

main end

init,member = 0

init,member = 1

add 1,number = 4

add 3,number = 5

add 1,number = 8

add 3,number = 9

add 1,number = 12

add 3,number = 13

add 1,number = 16

add 3,number = 17

add 1,number = 20

add 3,number = 20


说明:不加锁,进程1和进程3在彼此运算完之后的结果上继续运算,同时进行



实例2: 多进程共享内存加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import  multiprocessing
import  time
 
def  add(number,value1,lock):
     lock.acquire()
     try :
         print ( "init,member = {1}" . format (value1,number.value))
         for  in  xrange ( 1 , 6 ):
             number.value  + =  value1
             time.sleep( 1 )
             print ( "add {0},number = {1}" . format (value1,number.value))
     except  Exception as e:
         raise  e
     finally :
         lock.release()
         
if  __name__  = =  "__main__" :
     lock  =  multiprocessing.Lock()
     number  =  multiprocessing.Value( "i" , 0 )   
     
     p1  =  multiprocessing.Process(target = add,args = (number, 1 ,lock))
     p3  =  multiprocessing.Process(target = add,args = (number, 3 ,lock))
     
     p1.start()
     p3.start()
     
     print ( "main end" )

运行结果:

main end

init,member = 0

add 1,number = 1

add 1,number = 2

add 1,number = 3

add 1,number = 4

add 1,number = 5

init,member = 5

add 3,number = 8

add 3,number = 11

add 3,number = 14

add 3,number = 17

add 3,number = 20


说明:加锁,进程3等待进程1执行完毕之后,在前者的结果上,继续执行



四、多进程Manager

    一般实现的数据共享的方式只有两种结构Value和Array。Python中提供了强大的Manage专门用来做数据共享的,其支持的类型非常多,包括,Value, Array,list,dict, Queue, Lock等


例:支持字典和列表类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import  multiprocessing
def  worker(d,l):
     + =  range ( 11 , 16 )    # 返回一个列表序列的特殊写法
     for  in  xrange ( 1 , 6 ):
         key  =  "key {0}" . format (i)
         value  =  "value {0}" . format (i)
         d[key]  =  value
         
if  __name__  = =  "__main__" :
     manager  =  multiprocessing.Manager()
     =  manager. list ()
     =  manager. dict ()
     =  multiprocessing.Process(target = worker,args = (d,l))
     p.start()
     p.join()
     
     print (d)
     print (l)
     print ( "main end" )


运行结果:

{'key 1': 'value 1', 'key 2': 'value 2', 'key 3': 'value 3', 'key 4': 'value 4', 'key 5': 'value 5'}

[11, 12, 13, 14, 15]

main end



五、进程池

    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程

阻塞和非阻塞

Pool.apply_async: 非阻塞,定义的进程池进程最大数可以同时执行

Pool.apply:阻塞,一个进程结束,释放回进程池,下一个进程才可以开始


例1:非阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import  multiprocessing
import  time
def  worker(msg):
     print ( "###### start {0}#######" . format (msg))
     time.sleep( 1 )
     print ( "###### end {0}#######" . format (msg))
     
if  __name__  = =  "__main__" :
     print ( "main start" )
     pool  =  multiprocessing.Pool(processes = 3 )
     
     for  in  xrange ( 1 , 10 ):
         msg  =  "hello {0}" . format (i)
         pool.apply_async(func = worker,args = (msg,))   # pool.apply_async()非阻塞型
         
     pool.close()
     pool.join()      #调用join之前,先调用close函数,否则会出错。执行完close后如果没有新的进程加入到pool,则join函数等待所有子进程结束
     print ( "main end" )

运行结果:

main start

###### start hello 1#######

###### start hello 2#######

###### start hello 3#######

###### end hello 1#######

###### start hello 4#######

###### end hello 2#######

###### start hello 5#######

###### end hello 3#######

###### start hello 6#######

###### end hello 4#######

###### start hello 7#######

###### end hello 5#######

###### start hello 8#######

###### end hello 6#######

###### start hello 9#######

###### end hello 7#######

###### end hello 8#######

###### end hello 9#######

main end

说明:一开始启动3个进程,之后先关闭一个进程,再补充另外一个进程进来,始终保持3个,直至结束


例2:阻塞型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import  multiprocessing
import  time
def  worker(msg):
     print ( "###### start {0}#######" . format (msg))
     time.sleep( 1 )
     print ( "###### end {0}#######" . format (msg))
     
if  __name__  = =  "__main__" :
     print ( "main start" )
     pool  =  multiprocessing.Pool(processes = 3 )
     
     for  in  xrange ( 1 , 10 ):
         msg  =  "hello {0}" . format (i)
         pool. apply (func = worker,args = (msg,))    # pool.apply() 阻塞型
         
     pool.close()
     pool.join()
     print ( "main end" )

运行结果:

main start

###### start hello 1#######

###### end hello 1#######

###### start hello 2#######

###### end hello 2#######

###### start hello 3#######

###### end hello 3#######

###### start hello 4#######

###### end hello 4#######

###### start hello 5#######

###### end hello 5#######

###### start hello 6#######

###### end hello 6#######

###### start hello 7#######

###### end hello 7#######

###### start hello 8#######

###### end hello 8#######

###### start hello 9#######

###### end hello 9#######

main end

说明:每次只能启动一个进程,启动新进程前,需关闭老进程










本文转自 huangzp168 51CTO博客,原文链接:http://blog.51cto.com/huangzp/2045990,如需转载请自行联系原作者
目录
相关文章
|
13天前
|
安全 Java 数据处理
Python网络编程基础(Socket编程)多线程/多进程服务器编程
【4月更文挑战第11天】在网络编程中,随着客户端数量的增加,服务器的处理能力成为了一个重要的考量因素。为了处理多个客户端的并发请求,我们通常需要采用多线程或多进程的方式。在本章中,我们将探讨多线程/多进程服务器编程的概念,并通过一个多线程服务器的示例来演示其实现。
|
1月前
|
并行计算 安全 Unix
Python教程第8章 | 线程与进程
本章主要讲解了线程与进程的概念,多线程的运用以及Python进程的相关案例学习
36 0
|
1月前
|
分布式计算 并行计算 Java
浅析Python自带的线程池和进程池
浅析Python自带的线程池和进程池
85 0
|
1月前
|
安全 Python
Python中的并发编程:多线程与多进程技术探究
本文将深入探讨Python中的并发编程技术,重点介绍多线程和多进程两种并发处理方式的原理、应用场景及优缺点,并结合实例分析如何在Python中实现并发编程,以提高程序的性能和效率。
|
7天前
|
调度 Python
Python多线程、多进程与协程面试题解析
【4月更文挑战第14天】Python并发编程涉及多线程、多进程和协程。面试中,对这些概念的理解和应用是评估候选人的重要标准。本文介绍了它们的基础知识、常见问题和应对策略。多线程在同一进程中并发执行,多进程通过进程间通信实现并发,协程则使用`asyncio`进行轻量级线程控制。面试常遇到的问题包括并发并行混淆、GIL影响多线程性能、进程间通信不当和协程异步IO理解不清。要掌握并发模型,需明确其适用场景,理解GIL、进程间通信和协程调度机制。
27 0
|
30天前
|
并行计算 Python
Python中的并发编程:多线程与多进程的比较
在Python编程中,实现并发操作是提升程序性能的重要手段之一。本文将探讨Python中的多线程与多进程两种并发编程方式的优劣及适用场景,帮助读者更好地选择合适的方法来提高程序运行效率。
|
1月前
|
消息中间件 网络协议 API
Python语言的进程通讯及网络
Python语言的进程通讯及网络
|
1月前
|
安全 程序员 数据处理
深入探索Python多进程编程:理论与实践
深入探索Python多进程编程:理论与实践
36 2
|
1月前
|
Python
Python实现多线程或多进程编程。
Python实现多线程或多进程编程。
17 0
|
2月前
|
安全 Python
Python多进程编程中的资源共享与同步问题探讨
Python多进程编程中的资源共享与同步问题探讨

热门文章

最新文章