worker
的返回值。我该怎么做呢?此值存储在哪里?示例代码:
import multiprocessing
def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print jobs
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
我似乎在
jobs
中存储的对象中找不到相关的属性。#1 楼
使用共享变量进行通信。例如这样的例子: import multiprocessing
def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum
if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_dict.values())
#2 楼
我认为@sega_sai建议的方法更好。但这确实需要一个代码示例,因此请按以下步骤操作: br />如果您熟悉map
(内置Python 2),这应该不会太有挑战性。否则,请查看sega_Sai的链接。请注意只需很少的代码。 (另请注意如何重用流程。)
评论
有什么想法为什么我的getpid()返回所有相同的值?我正在运行Python3
–zelusp
16-10-29在17:39
我不确定Pool如何将任务分配给工作人员。如果他们真的很快的话,也许他们都可以成为同一个工人?它会持续发生吗?另外,如果您添加延迟?
–马克
16-10-31在15:30
我还认为这是与速度有关的事情,但是当我使用10多个进程来给pool.map映射1,000,000的范围时,我最多看到两个不同的pid。
–zelusp
16-10-31在19:00
那我不确定。我认为对此提出一个单独的问题会很有趣。
–马克
16年11月1日在11:27
如果您想为每个进程发送不同的功能,请使用pool.apply_async:docs.python.org/3/library/…
–凯尔
19年5月5日在20:28
#3 楼
出于某种原因,我在任何地方都找不到如何使用Queue
进行操作的一般示例(即使Python的doc示例也不会生成多个进程),所以这是经过10次尝试后我才能工作的内容: /> def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
是一个线程安全的阻塞队列,可用于存储子进程的返回值。因此,您必须将队列传递给每个进程。这里不太明显的是,您必须先从队列中退出队列,否则队列将填满并阻塞所有内容。面向对象的对象的更新(在Python 3.4中进行了测试) ):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
#4 楼
此示例显示如何使用多重处理列表。管道实例从任意数量的进程返回字符串:import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
此解决方案比多处理使用的资源更少。队列使用
管道
至少一个锁
缓冲区
一个线程
或一个multiprocessing.SimpleQueue,它使用
a Pipe
至少一个锁
查看每种类型的源代码非常有启发性。
评论
在不使管道成为全局变量的情况下,做到这一点的最佳方法是什么?
– Nickpick
16-10-25在13:15
我将所有全局数据和代码放入一个主函数中,并且其工作原理相同。这是否回答你的问题?
– David Cullen
16-10-25在13:43
在可以向其添加(发送)任何新值之前,始终必须读取管道吗?
– Nickpick
16-10-25在14:56
+1,好答案。但是,要想使该解决方案更加有效,就需要权衡的是,您要为每个进程创建一个Pipe,而为所有进程创建一个Queue。我不知道这是否在所有情况下都更有效率。
– sudo
17-09-21在20:41
如果返回的对象很大,此答案将导致死锁。与首先执行proc.join()相比,我首先尝试使用recv()返回值,然后进行联接。
– L. Pes
2月12日20:13
#5 楼
对于寻求使用Process
从Queue
获取值的其他任何人:import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
请注意,在Windows或Jupyter Notebook中,使用
multithreading
必须将其保存为文件并执行文件。如果在命令提示符下执行此操作,将会看到类似以下的错误: AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
评论
当我在工作进程中将某些内容放入队列时,我的连接就永远无法达到。知道这怎么可能吗?
– Laurens Koppenol
16-10-6在12:30
@LaurensKoppenol您是说您的主要代码永久挂在p.join()上并且永远不会继续吗?您的过程是否有无限循环?
–马修·莫森(Matthew Moisen)
16-10-6在17:44
是的,它无限地悬挂在那里。我的工作人员全部完成(对于所有工作人员,工作人员功能内的循环结束,然后打印打印语句)。联接不执行任何操作。如果我从函数中删除队列,它的确让我通过了join()
– Laurens Koppenol
16-10-10在8:11
@LaurensKoppenol也许在调用p.start()之前不调用queue.put(ret)吗?在这种情况下,工作线程将永远挂在queue.get()上。您可以通过在注释queue.put(ret)时复制上面的代码段来复制此代码。
–马修·莫森(Matthew Moisen)
17年8月16日在2:47
@Bendemann有人编辑了答案,并通过将queue.get放在queue.join之前使其不正确。我现在通过在p.join之后放置queue.get来修复它。请再试一次。
–马修·莫森(Matthew Moisen)
7月28日下午16:58
#6 楼
似乎应该改用multiprocessing.Pool类,并使用.apply().apply_async(),map()http://docs.python.org/library/multiprocessing方法。 html?highlight = pool#multiprocessing.pool.AsyncResult
评论
我有用于多处理的tensorflow代码。池将挂起,但没有多处理。进程
– Le Frite
19年6月24日在23:34
#7 楼
您可以使用内置的exit
来设置进程的退出代码。可以从进程的exitcode
属性获得:import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
输出:
评论
请注意,这种方法可能会造成混淆。进程通常应以没有退出的退出代码0退出。如果您有任何监视系统进程退出代码的信息,那么您可能会看到这些报告为错误。
–铁轮
17年5月23日在21:50
如果您只想在父进程中因错误引发异常,则是完美的选择。
–crizCraig
18年7月19日在17:45
#8 楼
pebble软件包利用multiprocessing.Pipe
很好地抽象了它,这使它变得非常简单:来自:https://pythonhosted.org/Pebble/#concurrent-decorators #9 楼
以为我会简化上面复制的最简单的示例,在Py3.6上为我工作。最简单的是multiprocessing.Pool
:import multiprocessing
import time
def worker(x):
time.sleep(1)
return x
pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))
您可以使用例如
Pool(processes=5)
来设置池中的进程数。但是,它默认为CPU计数,因此对于与CPU绑定的任务,将其留空。 (I / O绑定的任务无论如何通常都适合线程,因为线程通常在等待,因此可以共享一个CPU内核。)Pool
还应用了分块优化。(请注意worker方法不能嵌套在其中我最初在该方法内部定义了我的worker方法,该方法对
pool.map
进行了调用,以使其全部独立,但随后进程无法导入它,并抛出“ AttributeError:无法腌制本地对象external_method。 “。inner_method”。更多信息。它可以在类内。) 。)Py3的
'represent!'
也是两行(time.sleep()
返回发生器,因此您需要ProcessPoolExecutor
):from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(10))))
使用普通
.map
es:import multiprocessing
import time
def worker(x, queue):
time.sleep(1)
queue.put(x)
queue = multiprocessing.SimpleQueue()
tasks = range(10)
for task in tasks:
multiprocessing.Process(target=worker, args=(task, queue,)).start()
for _ in tasks:
print(queue.get())
如果需要的只是
list()
和Process
,请使用SimpleQueue
。第一个循环开始所有进程,然后第二个循环进行阻塞的put
调用。我认为也没有必要致电get
。#10 楼
一个简单的解决方案:import multiprocessing
output=[]
data = range(0,10)
def f(x):
return x**2
def handler():
p = multiprocessing.Pool(64)
r=p.map(f, data)
return r
if __name__ == '__main__':
output.append(handler())
print(output[0])
输出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
#11 楼
如果您使用的是Python 3,则可以使用concurrent.futures.ProcessPoolExecutor
作为方便的抽象:from concurrent.futures import ProcessPoolExecutor
def worker(procnum):
'''worker function'''
print(str(procnum) + ' represent!')
return procnum
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
print(list(executor.map(worker, range(5))))
输出:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
#12 楼
由于需要从函数中获取错误代码,因此我对vartec的答案做了一些修改。 (感谢vertec !!!这是一个很棒的技巧)也可以使用
manager.list
来完成,但我认为最好将它放在字典中并在其中存储列表。这样,由于我们无法确定列表的填充顺序,因此我们保留了函数和结果。from multiprocessing import Process
import time
import datetime
import multiprocessing
def func1(fn, m_list):
print 'func1: starting'
time.sleep(1)
m_list[fn] = "this is the first function"
print 'func1: finishing'
# return "func1" # no need for return since Multiprocess doesnt return it =(
def func2(fn, m_list):
print 'func2: starting'
time.sleep(3)
m_list[fn] = "this is function 2"
print 'func2: finishing'
# return "func2"
def func3(fn, m_list):
print 'func3: starting'
time.sleep(9)
# if fail wont join the rest because it never populate the dict
# or do a try/except to get something in return.
raise ValueError("failed here")
# if we want to get the error in the manager dict we can catch the error
try:
raise ValueError("failed here")
m_list[fn] = "this is third"
except:
m_list[fn] = "this is third and it fail horrible"
# print 'func3: finishing'
# return "func3"
def runInParallel(*fns): # * is to accept any input in list
start_time = datetime.datetime.now()
proc = []
manager = multiprocessing.Manager()
m_list = manager.dict()
for fn in fns:
# print fn
# print dir(fn)
p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
p.start()
proc.append(p)
for p in proc:
p.join() # 5 is the time out
print datetime.datetime.now() - start_time
return m_list, proc
if __name__ == '__main__':
manager, proc = runInParallel(func1, func2, func3)
# print dir(proc[0])
# print proc[0]._name
# print proc[0].name
# print proc[0].exitcode
# here you can check what did fail
for i in proc:
print i.name, i.exitcode # name was set up in the Process line 53
# here will only show the function that worked and where able to populate the
# manager dict
for i, j in manager.items():
print dir(i) # things you can do to the function
print i, j
评论
我建议使用multiprocessing.Queue,而不是此处的Manager。使用管理器需要产生一个全新的过程,而当一个队列执行时,这是过大的。
–达诺
2015年4月19日,0:54
@dano:我想知道,如果我们使用Queue()对象,我们不确定每个进程返回值时的顺序。我的意思是,如果我们需要结果中的顺序,请执行下一个工作。我们如何确定哪个过程的确切输出在哪里
–猫建物
16-9-29在11:08
@Catbuilts您可以从每个流程返回一个元组,其中一个值是您关心的实际返回值,另一个是流程中的唯一标识符。但是我也想知道为什么您需要知道哪个过程返回哪个值。如果那是您实际上需要了解的过程,还是需要在输入清单和输出清单之间建立关联?在那种情况下,我建议使用multiprocessing.Pool.map处理您的工作项列表。
–达诺
16 Dec 1'在14:43
仅具有单个参数的函数的警告:应使用args =(my_function_argument,)。注意这里的逗号!否则,Python将抱怨“缺少位置参数”。花了我10分钟来弄清楚。还要检查手动用法(在“过程类”部分下)。
– yuqli
19年4月29日在15:17
@vartec使用multipriocessing.Manager()词典的一个缺点是,对它返回的对象进行腌制(序列化),因此它具有一个瓶颈,该瓶颈由最大2GiB大小的腌制库提供,以使对象可以返回。还有其他方法可以避免返回对象的序列化吗?
– hirschme
19年11月13日在21:46