很抱歉,我无法使用更简单的示例重现该错误,并且我的代码太复杂而无法发布。如果我在IPython Shell中而不是在常规Python中运行该程序,那么效果会很好。

我查阅了有关此问题的以前的笔记。它们都是由使用池调用在类函数中定义的函数引起的。但这不是我的情况。

 Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
 


我将不胜感激。

更新:我腌制的功能定义在模块的顶层。虽然它调用包含嵌套函数的函数。即f()调用g()调用h(),它具有嵌套函数i(),而我正在调用pool.apply_async(f)f()g()h()都在顶层定义。我用这种模式尝试了更简单的示例,尽管它可以工作。

评论

顶级/可接受的答案是好的,但这可能意味着您需要重新构建代码,这可能会很痛苦。我建议有此问题的任何人也阅读利用莳萝和悲伤的其他答案。但是,使用vtkobjects时,我对任何解决方案都不满意:(是否有人设法在并行处理vtkPolyData中运行python代码?

#1 楼

这是可以腌制的食物的清单。特别是,只有在模块的顶级定义了函数时,这些函数才是可拾取的。

这段代码:产生的错误几乎与您发布的错误相同:通过pool进行的所有操作都必须是可拾取的,而不能通过mp.SimpleQueue进行拾取,因为它不是在模块的顶层定义的。

可以通过在顶层定义一个函数来对其进行固定,调用mp.SimpleQueue

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()


注意foo.work是可选择的,因为foo.work()是在顶层定义的,而foo是可选择的。

评论


感谢您的回复。我更新了我的问题。我不认为这是原因

–仇杀队
2012年1月10日15:04

要获得PicklingError,必须在队列中放置一些不可腌制的东西。它可能是函数或其参数。要查找有关该问题的更多信息,我建议您制作一个程序副本,并开始对其进行精简,使其变得越来越简单,每次重新运行该程序以查看问题是否依然存在。当事情变得非常简单时,您将自己发现问题,或者将其发布在此处。

–unutbu
2012年1月10日15:56

另外:如果您在模块的顶层定义了一个函数,但是该函数已经过修饰,则引用将指向该修饰符的输出,无论如何您都会遇到此错误。

– bobpoekert
13年4月17日在0:35

仅晚了5年,但我刚刚遇到了这个问题。事实证明,“顶级”必须比平时更实际地使用:在我看来,函数定义必须在初始化池之前(即此处的pool = Pool()行)。我没想到,这可能是OP问题持续存在的原因。

–安德拉斯·迪克(Andras Deak)
17 Mar 1 '17 at 0:15

特别是,只有在模块的顶层定义了功能时,这些功能才是可挑选的。似乎将functool.partial应用于顶层函数的结果也是可腌的,即使它是在另一个函数中定义的也是如此。

–user1071847
19年2月21日在22:07

#2 楼

我会使用pathos.multiprocesssing而不是multiprocessingpathos.multiprocessing是使用multiprocessingdill的前叉。 dill几乎可以在python中序列化任何东西,因此您可以并行发送更多信息。 pathos分支还具有直接使用多个参数函数的能力,这是您需要类方法所必需的。


https://github.com/uqfoundation

评论


工作了。对于其他任何人,我都通过以下方式安装了这两个库:sudo pip install git + https://github.com/uqfoundation/dill.git@master和sudo pip install git + https://github.com/uqfoundation/pathos.git@主

–亚历山大·麦克法兰
2015年3月29日在21:20



@AlexanderMcFarlane我不会用sudo安装python软件包(尤其是从github等外部资源)。相反,我建议运行:pip install --user git + ...

–克里斯
15年8月12日在10:00

仅使用pip install pathos不能令人遗憾地工作,并显示以下消息:找不到满足pp == 1.5.7-pathos要求的版本(来自pathos)

– xApple
16年5月18日在15:00

pip install pathos现在可以使用,并且pathos与python 3兼容。

–迈克·麦克肯斯(Mike McKerns)
16年7月21日在16:16

@DanielGoldfarb:多进程是多处理的一个分支,莳萝在代码中的多个地方都代替了泡菜……但实际上就是这样。 pathos在多进程上提供了一些其他API层,并且还具有其他后端。但是,这就是要点。

–迈克·麦克肯斯(Mike McKerns)
19-09-25在13:05

#3 楼

正如其他人所说,multiprocessing只能将Python对象传输到可以腌制的工作进程。如果您无法按照unutbu的描述重新组织代码,则可以使用dill的扩展的酸洗/酸洗功能来传输数据(尤其是代码数据),如下所示。并且没有其他像dill一样的库:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()


评论


我是莳萝和悲痛的作者……虽然您是对的,但也可以像我的回答中那样使用悲痛吗?也许我有点偏见...

–迈克·麦克肯斯(Mike McKerns)
2014年10月2日,下午16:38

在撰写本文时,我还不了解悲痛的状况,因此想提出一个非常接近答案的解决方案。现在,我已经看到了您的解决方案,我同意这是要走的路。

– Rocksportrocker
2014年10月6日在7:31

我读了您的解决方案,然后,道,……我什至没有想到那样做。因此,这有点酷。

–迈克·麦克肯斯(Mike McKerns)
2014年10月6日上午11:36

感谢您的发帖,我使用了这种方法来处理无法腌制的参数,使它们变得愚蠢:stackoverflow.com/questions/27883574/…

–爵士蓝
2015年1月11日在21:37

@rocksportrocker。我正在阅读此示例,无法理解为什么存在显式的for循环。我通常会看到并行例程获取一个列表并返回一个没有循环的列表。

–user1700890
16年4月13日在19:57

#4 楼

我发现通过尝试在其上使用探查器,我也可以在完美工作的代码段上准确生成该错误输出。

请注意,这是在Windows上进行的(分叉少了一点)优雅)。

我正在运行:

python -m profile -o output.pstats <script> 


,发现删除概要文件可以消除错误,并放置概要文件可以恢复错误。我也很生气,因为我知道以前的代码可以工作。我正在检查是否有更新过的pool.py ...然后有下沉的感觉并消除了配置文件,仅此而已。

评论


哇,谢谢你的提及!在最后一个小时左右,它使我发疯。我通过一个非常简单的示例尝试了所有操作-似乎没有任何效果。但是我也通过我的批处理文件运行了探查器:(

–蒂姆
16-10-14在11:37

哦,谢谢你这听起来确实很愚蠢,因为太出乎意料了。我认为应该在文档中提及。我所拥有的只是一个导入pdb语句,并且仅带有一个传递的简单顶级功能就无法“刺穿”。

– 0xc0de
5月21日18:50

#5 楼

multiprocessing出现此问题时,一个简单的解决方案是将Pool切换到ThreadPool。除了import-

from multiprocessing.pool import ThreadPool as Pool


之外,无需更改任何代码即可完成此操作,因为ThreadPool与主线程共享内存,而不是创建新进程,因此-意味着不需要酸洗。

这种方法的缺点是python并不是处理线程的最佳语言-它使用一种称为Global Interpreter Lock的方法来保持线程安全,这会减慢速度这里有一些用例。但是,如果您主要是与其他系统进行交互(运行HTTP命令,与数据库进行交谈,写入文件系统),则您的代码很可能不受CPU的束缚,因此不会受到太大的影响。实际上,在编写HTTP / HTTPS基准测试时,我发现此处使用的线程模型具有较少的开销和延迟,因为创建新进程的开销比创建新线程的开销高得多。

因此,如果要在python用户空间中处理大量内容,这可能不是最佳方法。

评论


但是,此时您仅使用一个CPU(至少与使用GIL的常规Python版本一起使用),这无法达到目的。

– Endre两者
19/12/27在19:35

这实际上取决于目的是什么。全局解释器锁定确实意味着一次只能运行一个实例,但是对于严重阻塞的操作(文件系统访问,下载大文件或多个文件,运行外部代码),GIL最终不是问题。在某些情况下,打开新进程(而不是线程)的开销超过了GIL的开销。

– tedivm
19/12/30在5:38

是的,谢谢。您仍然可能需要在答案中包括一个警告。如今,处理能力的提高主要以功能更多而不是功能更强大的CPU内核的形式出现,从多核执行切换到单核执行是一个相当大的副作用。

– Endre两者
19/12/30在8:27

好点-我已经用更多详细信息更新了答案。我确实要指出,尽管切换到线程多处理并不能使python仅在单个内核上起作用。

– tedivm
1月2日19:02

#6 楼


此解决方案仅需要安装dill,而无需安装其他库作为pathos


 def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items
 
/>
也适用于numpy数组。

#7 楼


Can't pickle <type 'function'>: attribute lookup __builtin__.function failed



如果传递给异步作业的模型对象内部有任何内置函数,也会发生此错误。

因此,请确保检查传递的模型对象没有内置函数。 (在我们的例子中,我们在模型内部使用django-model-utils的FieldTracker()函数来跟踪特定字段)。这是相关的GitHub问题的链接。

#8 楼

以@rocksportrocker解决方案为基础,
发送和接收结果时,请假意莳萝。