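I. Processes
Python ships a multi-process package, multiprocessing, which supports child processes, inter-process communication, shared memory, and several forms of synchronization. It provides components such as Process, Pipe, and Lock.
Differences between multiprocessing and multithreading:
In CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so multithreading effectively uses a single CPU core and suits I/O-bound work.
Multiprocessing can use multiple CPU cores and suits CPU-bound work.
1) Functions in multiprocessing
cpu_count(): returns the number of CPUs in the system
active_children(): returns a list of all live child processes of the current process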
Example:
```python
#!/usr/bin/env python
import multiprocessing

p = multiprocessing.cpu_count()        # number of CPUs
m = multiprocessing.active_children()  # live child processes (none have been started yet)
print(p)
print(m)
```
Output:
8
[]
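2) The Process class
Create a Process object: p = multiprocessing.Process(target=worker, args=(2,))
Notes:
target = the function to run in the child process
args = the arguments for that function, passed as a tuple
3) Common Process methods
is_alive(): returns whether the process is still running
run(): the method that holds the process's work; calling it directly runs the target in the current process and does not create a child process
start(): starts the child process, which then calls run() automatically; this is the one normally used
join(timeout=): waits until the process ends or the timeout expires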
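The difference between run() and start() can be seen by comparing process IDs. The following is a minimal sketch, assuming Python 3; the helper names `report_pid` and `compare_run_and_start` are made up for illustration and are not part of multiprocessing.

```python
import multiprocessing
import os


def report_pid(queue):
    # Put the PID of whichever process executes this function on the queue.
    queue.put(os.getpid())


def compare_run_and_start():
    queue = multiprocessing.Queue()

    # run() executes the target directly in the calling process.
    direct = multiprocessing.Process(target=report_pid, args=(queue,))
    direct.run()
    pid_from_run = queue.get()

    # start() launches a child process, which then calls run() itself.
    child = multiprocessing.Process(target=report_pid, args=(queue,))
    child.start()
    pid_from_start = queue.get()
    child.join()

    return pid_from_run, pid_from_start


if __name__ == "__main__":
    pid_from_run, pid_from_start = compare_run_and_start()
    print("parent pid:", os.getpid())
    print("pid seen via run():", pid_from_run)
    print("pid seen via start():", pid_from_start)
```

The PID reported via run() should match the parent's own PID, while the one reported via start() should differ.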
4) Common Process attributes
name: the process name
pid: the process ID
Example:
```python
#!/usr/bin/env python
import time
import multiprocessing


def worker(interval):
    time.sleep(interval)
    print("hello,China")


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(5,))
    p.start()
    print(p.is_alive())
    # Wait at most 3 seconds; if the process has not finished by then,
    # move on to print(p.name).
    p.join(timeout=3)
    print(p.name)
    print(p.pid)
    print("This is end")
```
Output:
True
Process-1
121764
This is end
hello,China
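In the output above, join(timeout=3) returned while the worker was still sleeping, which is why "This is end" appears before "hello,China". A timed join does not report whether the process finished, so it is worth checking afterwards. The sketch below assumes Python 3, uses shorter delays than the example above, and the helper name `wait_with_timeout` is hypothetical.

```python
import multiprocessing
import time


def wait_with_timeout():
    # The child simply sleeps for half a second.
    p = multiprocessing.Process(target=time.sleep, args=(0.5,))
    p.start()

    p.join(timeout=0.1)            # gives up waiting after 0.1 seconds
    alive_after_timeout = p.is_alive()

    p.join()                       # no timeout: wait until the child exits
    alive_after_join = p.is_alive()

    # exitcode is 0 when the child finished normally.
    return alive_after_timeout, alive_after_join, p.exitcode


if __name__ == "__main__":
    print(wait_with_timeout())
```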
Example: multiple processes
```python
import time
import multiprocessing


def worker(name, interval):
    print("{0} start".format(name))
    time.sleep(interval)
    print("{0} end".format(name))


if __name__ == "__main__":
    print("main start")
    print("The computer has {0} core".format(multiprocessing.cpu_count()))
    p1 = multiprocessing.Process(target=worker, args=("worker", 2))
    p2 = multiprocessing.Process(target=worker, args=("worker", 3))
    p3 = multiprocessing.Process(target=worker, args=("worker", 4))
    p1.start()
    p2.start()
    p3.start()
    # List every child process that is still alive.
    for p in multiprocessing.active_children():
        print("The pid of {0} is {1}".format(p.name, p.pid))
    print("main end")
```
Output:
main start
The computer has 4 core
The pid of Process-1 is 21112
The pid of Process-3 is 20536
The pid of Process-2 is 2116
main end
worker start
worker start
worker start
worker end
worker end
worker end
Note: the processes that were started all run independently of one another.
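Independence also applies to memory: a child process works on its own copy of the program's variables. The sketch below illustrates this under the assumption of Python 3; `counter`, `increment`, and `run_isolation_demo` are hypothetical names chosen for the illustration.

```python
import multiprocessing

counter = 0  # module-level variable; every process has its own copy


def increment(queue):
    global counter
    counter += 100
    queue.put(counter)  # report the child's value back to the parent


def run_isolation_demo():
    queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=increment, args=(queue,))
    child.start()
    value_in_child = queue.get()
    child.join()
    # The parent's counter is untouched by the child's change.
    return value_in_child, counter


if __name__ == "__main__":
    value_in_child, value_in_parent = run_isolation_demo()
    print("child sees counter =", value_in_child)
    print("parent sees counter =", value_in_parent)
```

The child changes only its own copy, so the parent still sees the initial value after the child exits.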
II. The Lock component
When multiple processes read and write a file, one process must not read while another is writing; the reader has to wait until the writer has finished. A lock is used to enforce this.
Example 1: multiple processes without a lock
```python
import multiprocessing
import time


def add(number, value, lock):
    # The lock is passed in but not used in this version.
    print("init,member = {1}".format(value, number))
    for i in range(1, 6):
        number += value
        time.sleep(1)
        print("add {0},number = {1}".format(value, number))


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    number = 0
    p1 = multiprocessing.Process(target=add, args=(number, 1, lock))
    p3 = multiprocessing.Process(target=add, args=(number, 3, lock))
    p1.start()
    p3.start()
    print("main end")
```
Output:
main end
init,member = 0
init,member = 0
add 1,number = 1
add 3,number = 3
add 1,number = 2
add 3,number = 6
add 1,number = 3
add 3,number = 9
add 1,number = 4
add 3,number = 12
add 1,number = 5
add 3,number = 15
Note: the processes do not interfere with each other and run at the same time, because each one works on its own copy of number. Process 1: 0, 1, 2, 3, 4, 5; process 3: 0, 3, 6, 9, 12, 15
Example 2: multiple processes with a lock
```python
import multiprocessing
import time


def add(number, value, lock):
    lock.acquire()  # acquire the lock
    try:
        print("init,member = {1}".format(value, number))
        for i in range(1, 6):
            number += value
            time.sleep(1)
            print("add {0},number = {1}".format(value, number))
    except Exception as e:
        raise e
    finally:
        lock.release()  # release the lock


if __name__ == "__main__":
    lock = multiprocessing.Lock()  # create the lock
    number = 0
    p1 = multiprocessing.Process(target=add, args=(number, 1, lock))
    p3 = multiprocessing.Process(target=add, args=(number, 3, lock))
    p1.start()
    p3.start()
    print("main end")
```
Output:
main end
init,member = 0
add 1,number = 1
add 1,number = 2
add 1,number = 3
add 1,number = 4
add 1,number = 5
init,member = 0
add 3,number = 3
add 3,number = 6
add 3,number = 9
add 3,number = 12
add 3,number = 15
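Note: whichever of process 1 and process 3 acquires the lock first runs to completion; the other has to wait until the lock is released before it can run.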
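A multiprocessing.Lock can also be used as a context manager, which replaces the explicit acquire()/release() pair. The sketch below assumes Python 3 and records events on a Queue instead of printing them; the names `record` and `run_locked` are hypothetical.

```python
import multiprocessing
import time


def record(name, lock, queue):
    # "with lock" acquires the lock on entry and releases it on exit,
    # even if the body raises an exception.
    with lock:
        for step in range(3):
            queue.put((name, step))
            time.sleep(0.05)


def run_locked():
    lock = multiprocessing.Lock()
    queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=record, args=(name, lock, queue))
        for name in ("A", "B")
    ]
    for w in workers:
        w.start()
    # Read all six events before joining so the queue is drained.
    events = [queue.get() for _ in range(6)]
    for w in workers:
        w.join()
    return events


if __name__ == "__main__":
    for name, step in run_locked():
        print(name, step)
```

Because each worker holds the lock for its whole loop, the events of one worker are expected to appear as a block before those of the other; which worker goes first is not fixed.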
III. Shared memory
When two processes read and write the same data "at the same time", the result of one affects the other. multiprocessing provides Value and Array for sharing data in memory between processes.
Example 1: shared memory across processes without a lock
```python
import multiprocessing
import time


def add(number, value1):
    try:
        # The shared value is accessed through number.value.
        print("init,member = {1}".format(value1, number.value))
        for i in range(1, 6):
            number.value += value1
            time.sleep(1)
            print("add {0},number = {1}".format(value1, number.value))
    except Exception as e:
        raise e


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    number = multiprocessing.Value("i", 0)  # Value: a shared-memory integer
    p1 = multiprocessing.Process(target=add, args=(number, 1))
    p3 = multiprocessing.Process(target=add, args=(number, 3))
    p1.start()
    p3.start()
    print("main end")
```
Output:
main end
init,member = 0
init,member = 1
add 1,number = 4
add 3,number = 5
add 1,number = 8
add 3,number = 9
add 1,number = 12
add 3,number = 13
add 1,number = 16
add 3,number = 17
add 1,number = 20
add 3,number = 20
Note: without a lock, process 1 and process 3 run at the same time, each continuing from the value the other has just written.
Example 2: shared memory across processes with a lock
```python
import multiprocessing
import time


def add(number, value1, lock):
    lock.acquire()
    try:
        print("init,member = {1}".format(value1, number.value))
        for i in range(1, 6):
            number.value += value1
            time.sleep(1)
            print("add {0},number = {1}".format(value1, number.value))
    except Exception as e:
        raise e
    finally:
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    number = multiprocessing.Value("i", 0)
    p1 = multiprocessing.Process(target=add, args=(number, 1, lock))
    p3 = multiprocessing.Process(target=add, args=(number, 3, lock))
    p1.start()
    p3.start()
    print("main end")
```
Output:
main end
init,member = 0
add 1,number = 1
add 1,number = 2
add 1,number = 3
add 1,number = 4
add 1,number = 5
init,member = 5
add 3,number = 8
add 3,number = 11
add 3,number = 14
add 3,number = 17
add 3,number = 20
Note: with the lock, process 3 waits until process 1 has finished and then continues from process 1's result.
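The examples above only use Value. As a rough sketch of the other shared-memory type, the code below uses an Array together with a Value, and relies on the lock that a Value carries by default (get_lock()) rather than a separate Lock object. It assumes Python 3; `fill_squares` and `run_shared_demo` are hypothetical names.

```python
import multiprocessing


def fill_squares(total, squares, start, stop):
    for i in range(start, stop):
        squares[i] = i * i        # each worker writes only its own slots
        with total.get_lock():    # a Value carries its own lock by default
            total.value += i * i  # read-modify-write, so guard it


def run_shared_demo():
    total = multiprocessing.Value("i", 0)
    squares = multiprocessing.Array("i", 6)  # six C ints, initialised to 0
    workers = [
        multiprocessing.Process(target=fill_squares, args=(total, squares, 0, 3)),
        multiprocessing.Process(target=fill_squares, args=(total, squares, 3, 6)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # Slicing a shared Array returns an ordinary list.
    return total.value, squares[:]


if __name__ == "__main__":
    print(run_shared_demo())
```

Here the lock is held only around the single update instead of the whole loop, so the two workers can still overlap most of their work.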
IV. Manager
Shared memory itself offers only two structures, Value and Array. Python also provides the more powerful Manager for sharing data between processes; it supports many more types, including Value, Array, list, dict, Queue, and Lock.
Example: sharing a dict and a list
```python
import multiprocessing


def worker(d, l):
    l += range(11, 16)  # shorthand for extending the shared list with 11..15
    for i in range(1, 6):
        key = "key {0}".format(i)
        value = "value {0}".format(i)
        d[key] = value


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    l = manager.list()
    d = manager.dict()
    p = multiprocessing.Process(target=worker, args=(d, l))
    p.start()
    p.join()
    print(d)
    print(l)
    print("main end")
```
Output:
{'key 1': 'value 1', 'key 2': 'value 2', 'key 3': 'value 3', 'key 4': 'value 4', 'key 5': 'value 5'}
[11, 12, 13, 14, 15]
main end
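The example above uses a single worker. When several processes update the same managed dict, a read-modify-write such as "get, add one, store" is not atomic, so it is reasonable to guard it with a lock from the same Manager. The following sketch assumes Python 3; `count_words` and `run_manager_demo` are hypothetical names and the input strings are made up.

```python
import multiprocessing


def count_words(text, counts, lock):
    for word in text.split():
        with lock:  # guard the read-modify-write on the shared dict
            counts[word] = counts.get(word, 0) + 1


def run_manager_demo():
    # Used as a context manager, the Manager shuts down its server
    # process when the block exits.
    with multiprocessing.Manager() as manager:
        counts = manager.dict()
        lock = manager.Lock()
        texts = ["a b a", "b c", "a c c"]
        workers = [
            multiprocessing.Process(target=count_words, args=(text, counts, lock))
            for text in texts
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        # Copy into a plain dict before the manager goes away.
        return counts.copy()


if __name__ == "__main__":
    print(run_manager_demo())
```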
V. Process pools
A Pool provides a fixed number of worker processes for the caller to use. When a new task is submitted and a worker is idle, the task runs right away; if all workers are busy, the task waits until one of them becomes free.
Blocking and non-blocking
Pool.apply_async: non-blocking; up to the pool's maximum number of processes can run tasks at the same time
Pool.apply: blocking; the call returns only when the task has finished, so the next task cannot start until then
Example 1: non-blocking
```python
import multiprocessing
import time


def worker(msg):
    print("###### start {0}#######".format(msg))
    time.sleep(1)
    print("###### end {0}#######".format(msg))


if __name__ == "__main__":
    print("main start")
    pool = multiprocessing.Pool(processes=3)
    for i in range(1, 10):
        msg = "hello {0}".format(i)
        pool.apply_async(func=worker, args=(msg,))  # pool.apply_async() is non-blocking
    # close() must be called before join(), otherwise join() raises an error.
    # After close() no new tasks can be submitted, and join() waits for all
    # worker processes to exit.
    pool.close()
    pool.join()
    print("main end")
```
Output:
main start
###### start hello 1#######
###### start hello 2#######
###### start hello 3#######
###### end hello 1#######
###### start hello 4#######
###### end hello 2#######
###### start hello 5#######
###### end hello 3#######
###### start hello 6#######
###### end hello 4#######
###### start hello 7#######
###### end hello 5#######
###### start hello 8#######
###### end hello 6#######
###### start hello 9#######
###### end hello 7#######
###### end hello 8#######
###### end hello 9#######
main end
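Note: three tasks start at first; each time one finishes, the freed worker picks up the next task, so three tasks are running at any time until the work is done.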
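The worker above only prints. When a task returns a value, apply_async hands back an AsyncResult object whose get() method blocks until that task's result is available. A minimal sketch, assuming Python 3 (where a Pool can be used as a context manager); `square` and `squares_async` are hypothetical names.

```python
import multiprocessing


def square(x):
    return x * x


def squares_async(numbers):
    with multiprocessing.Pool(processes=3) as pool:
        # apply_async returns immediately with an AsyncResult handle.
        handles = [pool.apply_async(square, (n,)) for n in numbers]
        # get() blocks until that particular task has finished.
        return [handle.get(timeout=10) for handle in handles]


if __name__ == "__main__":
    print(squares_async([1, 2, 3, 4]))
```

All results are collected inside the with block, because leaving the block terminates the pool.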
Example 2: blocking
```python
import multiprocessing
import time


def worker(msg):
    print("###### start {0}#######".format(msg))
    time.sleep(1)
    print("###### end {0}#######".format(msg))


if __name__ == "__main__":
    print("main start")
    pool = multiprocessing.Pool(processes=3)
    for i in range(1, 10):
        msg = "hello {0}".format(i)
        pool.apply(func=worker, args=(msg,))  # pool.apply() is blocking
    pool.close()
    pool.join()
    print("main end")
```
Output:
main start
###### start hello 1#######
###### end hello 1#######
###### start hello 2#######
###### end hello 2#######
###### start hello 3#######
###### end hello 3#######
###### start hello 4#######
###### end hello 4#######
###### start hello 5#######
###### end hello 5#######
###### start hello 6#######
###### end hello 6#######
###### start hello 7#######
###### end hello 7#######
###### start hello 8#######
###### end hello 8#######
###### start hello 9#######
###### end hello 9#######
main end
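Note: only one task runs at a time; apply() returns only after the current task has finished, so the next task cannot be submitted until then.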
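For the common case of applying one function to many inputs, Pool.map is another option not covered above: it blocks until every result is ready, but the tasks themselves are spread across the workers. A small sketch assuming Python 3; `cube` and `cubes` are hypothetical names.

```python
import multiprocessing


def cube(x):
    return x ** 3


def cubes(numbers):
    with multiprocessing.Pool(processes=3) as pool:
        # map() distributes the inputs over the workers and returns
        # the results in the same order as the inputs.
        return pool.map(cube, numbers)


if __name__ == "__main__":
    print(cubes(range(5)))
```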