1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
一、概念
进程: 未完成任务而执行一堆代码的过程,进程是任务,真正执行进程的是cpu
并行: 多个cpu同时运行
并发: 单个cpu分时操作,利用单cpu 的多道技术,看起来像是程序同时运行,其实是分时段运行,
只不过cpu切换速度比较快,并行也属于并发。
 
多道技术:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各
自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在 1 秒内,cpu却
可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬
件并行(多个cpu共享同一个物理内存)
同步: 同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个
进程将会一直等待下去,直到收到返回信息才继续执行下去
异步: 异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息
返回时系统会通知进程进行处理,这样可以提高执行的效率
二、创建进程模块与方法
例子:
from  multiprocessing  import  Process
import  time
import  random
 
def  piao(name):
     print ( '%s is piaoing'  % name)
     time.sleep((random.randint( 1 , 3 )))
     print ( '%s is piao end'  % name)
 
if  __name__  = =  '__main__' :
     p1  =  Process(target = piao, args = ( 'p1' ,))
     p2  =  Process(target = piao, args = ( 'p2' ,))
     p3  =  Process(target = piao, args = ( 'p3' ,))
     p4  =  Process(target = piao, args = ( 'p4' ,))
 
     p_l  =  [p1, p2, p3, p4]
     for  in  p_l:
         p.start()
     for  in  p_l:
         p.join()
     print ( '主进程' )
multiprocessing: multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如
函数),该模块与多线程模块threading的编程接口类似
Process: Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,
表示一个子进程中的任务(尚未启动)
group: 未使用,默认是 None
target: 子进程执行的任务,一般为函数
name: 子进程名字
args: target任务对象的参数,由元组组成
kwargs: target任务对象的字典参数,
 
进程方法介绍:
p.start(): 启动进程,并调用该子进程中的p.run()
p.run(): 进程启动时运行的方法,正是它去调用target指定的函数,自定义类的类中一定要实现该方法
p.terminate(): 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进
程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive(): 如果p仍然运行,返回 True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run
开启的进程
进程属性介绍:
p.daemon:默认值为 False ,如果设为 True ,代表p为后台运行的守护进程,当p的父进程终止时,
p也随之终止,并且设定为 True 后,p不能创建自己的新进程,必须在p.start()之前设置
p.name: 进程的名称
p.pid:进程的pid
创建进程的两种方式:
例子:
from  multiprocessing  import  Process
import  time
import  random
 
def  piao(name):
     print ( '%s is piaoing'  % name)
     time.sleep( 3 )
     print ( '%s is piao end'  % name)
 
if  __name__  = =  '__main__' :
     p1  =  Process(target = piao, args = ( 'hyh' ,), name = '<p1>' )
     p1.start()
     print ( 'p1 name is %s '  % p1.name)
     print ( '父进程' )
 
from  multiprocessing  import  Process
import  time
import  random
 
class  piao(Process):
     def  __init__( self , name):
         super ().__init__()
         self .name  =  name
 
     def  run( self ):
         print ( '%s is piaoing'  % self .name)
         time.sleep( 3 )
         print ( '%s is piao end'  % self .name)
 
if  __name__  = =  '__main__' :
     p1  =  piao( 'hyh' )
     p1.start()   #p1.run
     print ( '父进程' )
并发实现例子:
server:
#!/usr/bin/python
# --*-- coding: utf-8 --*--
from  multiprocessing  import  Process
from  socket  import  *
 
server  =  socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1 )
server.bind(( '127.0.0.1' 8080 ))
server.listen( 5 )
 
def  talk(conn, addr):
     while  True :
         try :
             msg  =  conn.recv( 1024 )
             if  not  msg:  break
             conn.send(msg.upper())
         except  Exception:
             break
 
if  __name__  = =  '__main__' :
     while  True :
         conn, addr  =  server.accept()
         =  Process(target = talk, args = (conn, addr))
         p.start()
         
多个client
#!/usr/bin/python
# --*-- coding: utf-8 --*--
from  socket  import  *
 
client  =  socket(AF_INET, SOCK_STREAM)
client.connect(( '127.0.0.1' 8080 ))
 
while  True :
     msg  =  input ( ">>: " ).strip()
     if  not  msg:  continue
     client.send(msg.encode( 'utf-8' ))
     msg  =  client.recv( 1024 )
     print (msg.decode( 'utf-8' ))
     
多少个client链接到服务端,就需要服务端开启多少个进程,大并发下,服务器会崩溃,怎么处理,
需要借助进程池pool(后面有pool类使用例子)
 
        
守护进程daemon,主进程执行完毕,子进程就跟着结束
例子:
from  multiprocessing  import  Process
import  time
import  random
 
class  Piao(Process):
     def  __init__( self , name):
         super ().__init__()
         self .name  =  name
 
     def  run( self ):
         print ( '%s is piaoing'  % self .name)
         time.sleep(random.randint( 1 , 3 ))
         print ( '%s is piao end' )
=  Piao( 'test' )
p.daemon  =  True
p.start()
#p.join(0.0001)
print ( '开始' )
 
三、进程间通信
进程之间是彼此隔离的,需要借助队列和管道实现通信
队列类Queue
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间
的数据传递
maxsize表示队列中存放最多进程数
 
Queue对象q的方法:
q.put(): 插入数据到队列,put方法还有两个可选参数:blocked和timeout。如果blocked为 True
(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
如果超时,会抛出Queue.Full异常。如果blocked为 False ,但该Queue已满,会立即抛出Queue.Full异常
 
q.get(): 从队列中去除数据
q.empty():调用此方法时q为空则返回 True ,该结果不可靠,比如在返回 True 的过程中,如果队列中又
加入了项目
q.full(): q已满,返回 True
q.qsize(): 返回队列中目前项目的正确数量
 
Lock锁类,通过锁定对象,使一个进程操作该对象时,其他进程等待
抢票例子:
from  multiprocessing  import  Process,Lock
import  json
import  time
import  random
 
def  work(dbfile, name, lock):
     with lock:
         with  open (dbfile, encoding = 'utf-8' ) as f:
             dic  =  json.loads(f.read())
         if  dic[ 'count' ] >  0 :
             dic[ 'count' - =  1
             time.sleep(random.randint( 1 , 3 ))
             with  open (dbfile,  'w' , encoding = 'utf-8' ) as f:
                 f.write(json.dumps(dic))
             print ( '\033[34m%s 抢票成功\033[0m'  % name)
         else :
             print ( '\033[37m%s 抢票失败\033[0m'  % name)
 
if  __name__  = =  '__main__' :
     lock  =  Lock()
     p_l  =  []
     for  in  range ( 100 ):
         =  Process(target = work, args = ( 'a.txt' '用户%s'  % i, lock))
         p_l.append(p)
         p.start()
 
     for  in  p_l:
         p.join()
     print ( '主进程' )
     
     
Queue队列存放取出对象例子:
from  multiprocessing  import  Process, Queue
import  time
=  Queue( 3 )
q.put( 3 )
q.put( 4 )
q.put( 5 )
print (q.full())
 
print (q.get())
print (q.get())
print (q.get())
print (q.empty())
 
生产者消费者模型:
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直
接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞
队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生
产者和消费者的处理能力
例子:
from  multiprocessing  import  Process, Queue
import  time, random, os
 
def  consumer(q):
     while  True :
         time.sleep(random.randint( 1 , 3 ))
         res  =  q.get()
         if  res  is  None break
         print ( '\033[34m消费者拿到了: %s\033[0m'  % res)
 
def  producer(seq, q):
     for  item  in  seq:
         time.sleep(random.randint( 1 , 3 ))
         print ( '\033[45m生产者生产了: %s\033[0m'  % item)
         q.put(item)
 
if  __name__  = =  '__main__' :
     =  Queue()
     =  Process(target = consumer, args = (q,))
     c.start()
 
     producer(( 'P-%s'  % for  in  range ( 10 )), q)
     q.put( None )
     c.join()
     print ( '主线程' )
     
JoinableQueue 的实例p除了与Queue对象相同的方法之外还具有
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止  
from  multiprocessing  import  Process, JoinableQueue
import  time, random
def  consumer(q):
     while  True :
         res  =  q.get()
         print ( '消费者拿到了 %s'  % res)
         q.task_done()
 
def  producer(seq, q):
     for  item  in  seq:
         q.put(item)
         print ( '生产者做好了 %s'  % item)
     q.join()
 
if  __name__  = =  '__main__' :
     =  JoinableQueue()
     seq  =  ( 'P-%s'  % for  in  range ( 10 ))
     =  Process(target = consumer, args = (q,))
     p.daemon = True
     p.start()
 
     producer(seq, q)
     print ( '主线程' )
     
多消费者模式
例子:
from  multiprocessing  import  Process, JoinableQueue
import  time,random
 
def  consumer(name, q):
     while  True :
         time.sleep(random.randint( 1 , 3 ))
         res  =  q.get()
         print ( '\033[34m%s拿到了 %s\033[0m'  % (name, res))
         q.task_done()
 
def  producer(seq, q):
     for  item  in  seq:
         time.sleep(random.randrange( 1 , 2 ))
         q.put(item)
         print ( '\033[42m生产者做好了 %s\033[0m'  % item)
     q.join()
 
if  __name__  = =  '__main__' :
     =  JoinableQueue()
     seq  =  ( 'P-%s'  % for  in  range ( 10 ))
     p1  =  Process(target = consumer, args = ( 'C-1' , q,))
     p2  =  Process(target = consumer, args = ( 'C-2' , q,))
     p3  =  Process(target = consumer, args = ( 'C-3' , q,))
     p1.daemon  =  True
     p2.daemon  =  True
     p3.daemon  =  True
     p1.start()
     p2.start()
     p3.start()
 
     producer(seq, q)
     print ( '主线程' )
 
Semaphore类允许一定数量的线程修改数据,,如果指定信号量为 3 ,那么来一个人获得一把锁,计数加 1
当计数等于 3 时,后面的人均需要等待。一旦释放,就有人可以获得一把锁
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
例子:
from  multiprocessing  import  Process, Semaphore
import  time, random
 
def  go_wc(sem, user):
     sem.acquire()
     print ( '%s 占用一个位置'  % user)
     time.sleep(random.randint( 0 , 3 ))
     sem.release()
 
if  __name__  = =  '__main__' :
     sem  =  Semaphore( 5 )
     p_l  =  []
     for  in  range ( 13 ):
         =  Process(target = go_wc, args = (sem, 'user%s'  % i,))
         p.start()
         p_l.append(p)
 
     for  in  p_l:
         i.join()
     print ( '=========>' )
 
     
四、进程池pool
进程开启越多占用系统资源越多,不能无限开启进程,需要用有控制进程数量的手段,pool类可以控制
创建进程的数量.
 
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就
会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,
直到池中有进程结束,就重用进程池中的进程
 
Pool([numprocess  [,initializer [, initargs]]]): 创建进程池
numprocess: 要创建的进程数,如果省略,将默认使用cpu_count()的值
initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
initargs:是要传给initializer的参数组
 
p. apply (func [, args [, kwargs]]): 在一个池工作进程中执行func( * args, * * kwargs),然后返回结果。
需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地
执行func函数,必须从不同线程调用p. apply ()函数或者使用p.apply_async()
 
p.apply_async(func [, args [, kwargs]]): 在一个池工作进程中执行func( * args, * * kwargs),然后返回
结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变
为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果
 
p.close(): 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
p.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
p.join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
 
使用进程池pool的例子:
异步例子:    
#!/usr/bin/python
# --*-- coding: utf-8 --*--
from  multiprocessing  import  Process, Pool
from  socket  import  *
import  os
 
server  =  socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1 )
server.bind(( '127.0.0.1' 8080 ))
server.listen( 5 )
 
def  talk(conn, addr):
     print (os.getpid())
     while  True :
         try :
             msg  =  conn.recv( 1024 )
             if  not  msg:  break
             conn.send(msg.upper())
         except  Exception:
             break
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
if  __name__  = =  '__main__' :
     pool  =  Pool()    #默认保留4个进程,最多同时4个client连过来
     res_l  =  []
     while  True :
         conn, addr  =  server.accept()
         res  =  pool.apply_async(talk, args = (conn, addr))      #异步方法apply_async
         res_l.append(res)
         #print(res_l)
同步例子:
#!/usr/bin/python
# --*-- coding: utf-8 --*--
from  multiprocessing  import  Process, Pool
from  socket  import  *
import  os
 
server  =  socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1 )
server.bind(( '127.0.0.1' 8080 ))
server.listen( 5 )
 
def  talk(conn, addr):
     print (os.getpid())
     while  True :
         try :
             msg  =  conn.recv( 1024 )
             if  not  msg:  break
             conn.send(msg.upper())
         except  Exception:
             break
 
if  __name__  = =  '__main__' :
     pool  =  Pool()    #默认保留4个进程
     #res_l = []
     while  True :
         conn, addr  =  server.accept()
         pool. apply (talk, args = (conn,addr))    一次只能一个进程通信,其他进程等待io
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
回掉函数
例子:
from  multiprocessing  import  Pool
import  time, random
 
def  get_page(url):
     time.sleep(random.randint( 1 , 3 ))
     print ( '下载页面: %s'  % url)
     return  { 'url' : url}
 
def  parse_page(page_content):
     time.sleep( 1 )
     print ( '解析页面: %s'  % page_content)
 
if  __name__  = =  '__main__' :
     urls  =  [
         'http://maoyan.com/board/7' ,
         'http://maoyan.com/board/1' ,
         'http://maoyan.com/board/2'
     ]
     =  Pool()
     res_l  =  []
     for  url  in  urls:
         res  =  p.apply_async(get_page, args = (url, ), callback = parse_page)     #调用parse_page
         res_l.append(res)
 
     for  in  res_l:
         i.get()