在学习Python多进程的过程中,虽然Process和Pool都能实现多进程的功能,但是侧重点各有不同:

  • Process需要自己管理进程,起一个Process就是起一个新进程

  • Pool是进程池,它可以开启固定数量的进程,然后将任务放到一个池子里,系统来调度多进程执行池子里的任务

所以从直观感受上更倾向于使用Pool,但是使用过程中却发现Pool存在一些问题(或者说与Process的差异),所以记录下来给大家分享一下

关于Process和Pool的基本使用可以参考廖雪峰的Python教程,非常简单直观,易于上手。这个博客介绍的也不错

Process和Pool的使用差异

1.Pool使用全局变量的问题

这个问题我不知道该如何定义,只是在开发过程中发现了并探索到相应的解决办法。问题简单描述就是无法使用可变的全局变量(比如for循环),可见如下代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Pool

def multi_task():
print i, '|', global_var

if __name__ == '__main__':
print 'start'
global_var = "I'm a global variable"
p_pool = Pool(5)
for i in range(5):
p_pool.apply_async(func=multi_task)
p_pool.close()
p_pool.join()
print 'end'

# output
"""
start
end
"""

# def multi_task():
# print global_var
"""
start
I'm a global variable
I'm a global variable
I'm a global variable
I'm a global variable
I'm a global variable
end
"""

执行后会发现输出内容只有start和end,从0-4的数字并没有打印出来,并且连不变的全局变量global_var都没有打印,看起来是没有执行multi_task(实际上是执行了multi_task。如果multi_task有返回值,并且在main中用Pool的get方法获取返回值时会报错NameError: global name 'i' is not defined)。但是如果在multi_task里面删掉可变全局变量i,那么全局变量global_var还是能打印出来的

以上代码换用Process实现却不存在问题,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process

def multi_task():
print i, '|', global_var

if __name__ == '__main__':
print 'start'
global_var = "I'm a global variable"
p_list = []
for i in range(5):
p = Process(target=multi_task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print 'end'

# output
"""
start
0 | I'm a global variable
1 | I'm a global variable
2 | I'm a global variable
3 | I'm a global variable
4 | I'm a global variable
end
"""

不变的全局变量global_var和变化的全局变量i都能正确的打印出来

至于原因,因为能力有限没有找到相关解释。但是解决办法有两种:其一就是如上换用Process,缺点是失去了进程池的功能(不过放心,后文会有Process实现进程池功能);其二是在使用Pool的apply_async方法时将i作为参数传递进去,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Pool

def multi_task(var):
print var, '|', global_var

if __name__ == '__main__':
print 'start'
global_var = "I'm a global variable"
p_pool = Pool(5)
for i in range(5):
p_pool.apply_async(func=multi_task, args=(i,))
p_pool.close()
p_pool.join()
print 'end'

# output
"""
start
0 | I'm a global variable
1 | I'm a global variable
3 | I'm a global variable
2 | I'm a global variable
4 | I'm a global variable
end
"""

可见结果与使用Process一致,两种变量都能正确输出

注:因是多进程,所以输出顺序并不一定是01234;以及apply_async方法的args参数形式是tuple,所以如果只有一个入参的话要加个逗号

2.类内使用Pool的问题

如果在类内直接使用Pool来使用多进程的话,一般会出现问题。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import Pool

global_var = "I'm a global variable"

class SomeClass(object):

def some_method(self):
print 'start'
p_pool = Pool(5)
for i in range(5):
p_pool.apply_async(func=self.multi_task, args=(i,))
p_pool.close()
p_pool.join()
print 'end'

def multi_task(self, var):
print var, '|', global_var

cls = SomeClass()
cls.some_method()

# output
"""
start
end
"""

虽然参数传入了,但从结果上来看很像第一个问题(不传入也是这个结果),看起来是没有执行multi_task(实际上是执行了multi_task。如果multi_task有返回值,并且在main中用Pool的get方法获取返回值时会报错cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed,这也是解释该问题的原因,实例无法序列化)

通过在网上搜索答案,发现这是因为Pool中使用Queue来进行通信,所有进入队列的数据必须可序列化,包括实例方法。而在python2.7中,实例方法并不能被序列化,从而出现该问题(python3中实例方法可以被序列化了,所以这么使用就没问题了)

解决方法有多种,最简单的就是把multi_task放到类外,问题迎刃而解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Pool

global_var = "I'm a global variable"

class SomeClass(object):

def some_method(self):
print 'start'
p_pool = Pool(5)
for i in range(5):
p_pool.apply_async(func=multi_task, args=(i,))
p_pool.close()
p_pool.join()
print 'end'

def multi_task(var):
print var, '|', global_var

cls = SomeClass()
cls.some_method()

# output
"""
start
0 | I'm a global variable
1 | I'm a global variable
3 | I'm a global variable
2 | I'm a global variable
4 | I'm a global variable
end
"""

其他的解决办法以及更为细致解释和实验,可以参见以下两篇文章:

其他事项

1.多进程之间的数据共享

多进程之间不能使用普通的Python数据类型,比如平常使用的list或者dict由父进程传递给子进程后,子进程只可读,写无效。但是Python在multiprocessing模块中给我们提供了一些用于多进程的数据类型:

  • multiprocessing.Queue,是多进程安全的队列(FIFO),其主要有put和get两个方法,分别是存储数据和取出数据。使用方法很简单。更多信息可自行搜索
  • multiprocessing.Manager,封装了可用于多进程的常用数据类型,例如list和dict。使用Manager创建一个list以后,该list的使用和普通list一致。更多信息可自行搜索
  • 还有一些其他的,例如multiprocessing.Array,multiprocessing.Pipe等,因为没有使用过所以在此不再赘述,感兴趣可自行搜索

在实践中发现,使用Manager的list时,虽然使用方法上和普通list一样,但可能因为多进程之间通信的缘故,list中每个元素大小存在限制。因为将之前单进程的代码修改为多进程后,出现报错OverflowError: cannot serialize a string larger than 2 GiB multiprocessing,经查看发现这是在某处给Manager的list添加元素时发生的,并且该元素的确很大,而原来单进程的时候却没有出现过该问题,所以遂产生刚才的猜想(因为没有找到相关解释)。我的解决办法就是把该超大元素切分后再分别添加到Manager的list中

2.Pool子进程不显示报错信息

在最一开始使用Pool的时候,发现子进程中如果有错误的话并不会抛出异常,而是该子进程直接死掉然后继续去取任务执行。诸如本文上述的NameErrorcPickle.PicklingErrorOverflowError都没有抛出,这在查找问题的时候颇费了一番功夫,而使用Process的时候这些异常均能正常抛出。经实践发现,只有多进程执行的任务有返回值,并且在主进程用get方法来获取返回值时,子进程中的异常才会正常抛出,原因是:apply_async返回的是AsyncResult,其中出现的异常只有在调用AsyncResult.get()的时候才会被重新引发(来源于今天遇到的Python多线程、多进程中的几个坑

3.使用Process实现进程池功能

用Queue和while True来模拟进程池,具体可见代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import time
import random
from multiprocessing import Process, Queue, Manager

def multi_task(param, r):
"""多进程需要执行的任务"""
t = random.random() * 5
pid = os.getpid()
print 'Task %s(pid is %s) will run %s seconds' % (param, pid, t)
time.sleep(t)
res = param * param
r.append(res)
print 'Task %s\' result is %s' % (param, res)

def process_pool(q, r):
"""模拟进程池"""
while True:
try:
param = q.get(False)
multi_task(param, r)
except Exception:
if q.empty():
break

def your_code():
p_list = []
# 先将多进程所要执行的任务的所有参数放入队列中
all_task = Queue()
for task_param in range(7):
all_task.put(task_param)
# 结果存储
result = Manager().list()
# 启动多进程
for i in range(3):
p = Process(target=process_pool, args=(all_task, result))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print result

if __name__ == '__main__':
print 'start'
your_code()
print 'end'

# output
"""
start
Task 0(pid is 14642) will run 3.78 seconds
Task 1(pid is 14643) will run 4.08 seconds
Task 2(pid is 14644) will run 4.62 seconds
Task 0' result is 0
Task 3(pid is 14642) will run 1.58 seconds
Task 1' result is 1
Task 4(pid is 14643) will run 1.20 seconds
Task 2' result is 4
Task 5(pid is 14644) will run 2.74 seconds
Task 4' result is 16
Task 6(pid is 14643) will run 2.07 seconds
Task 3' result is 9
Task 6' result is 36
Task 5' result is 25
[0, 1, 4, 16, 9, 36, 25]
end
"""