"Can't pickle <type '_csv.reader'>" error when using multiprocessing on Windows



I am writing a multiprocessing program to process a large .csv file in parallel, running on Windows.

I found an excellent example for a closely related problem. When I run it under Windows, however, I get an error saying that the csv reader cannot be pickled.

I suppose I could open the CSV file inside the reader subprocess and just send it the file name from the parent process. However, I would like to pass an already opened CSV file (as the code is supposed to do), with a specific state, i.e. really use a shared object.

Does anyone know how to do this under Windows, or what is missing here?

Here is the code (re-posted for ease of reading):

"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""
import csv
import multiprocessing
import optparse
import sys
NUM_PROCS = multiprocessing.cpu_count()
def make_cli_parser():
    """Make the command line interface parser."""
    usage = "nn".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser
class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()
        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]
        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()
        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1
        self.pout.join()
        self.infile.close()
    def parse_input_csv(self):
        """Parses the input CSV and yields tuples with the index of the row
        as the first element, and the integers of the row as the second
        element.
        The index is zero-index based.
        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input thread sends a 'STOP' message for each
        worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [ int(entry) for entry in row ]
            self.inq.put( (i, row) )
        for i in range(self.numprocs):
            self.inq.put("STOP")
    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")
    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.
        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across threads so open/close
        # and use it all in the same thread or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)
        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    # if yes, write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1
        outfile.close()
def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    c = CSVWorker(opts.numprocs, args[0], args[1])
if __name__ == '__main__':
    main(sys.argv[1:])

The error I get when running it under Windows is:

Traceback (most recent call last):
  File "C:\Users\ron.berman\Documents\Attribution\ubr\Shapley\test.py", line 130, in <module>
    main(sys.argv[1:])
  File "C:\Users\ron.berman\Documents\Attribution\ubr\Shapley\test.py", line 127, in main
    c = CSVWorker(opts.numprocs, args[0], args[1])
  File "C:\Users\ron.berman\Documents\Attribution\ubr\Shapley\test.py", line 44, in __init__
    self.pin.start()
  File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "C:\Python27\lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "C:\Python27\lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\Python27\lib\pickle.py", line 224, in dump
    self.save(obj)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 401, in save_reduce
    save(args)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 548, in save_tuple
    save(element)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 419, in save_reduce
    save(state)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "C:\Python27\lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Python27\lib\pickle.py", line 396, in save_reduce
    save(cls)
  File "C:\Python27\lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Python27\lib\pickle.py", line 753, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <type '_csv.reader'>: it's not the same object as _csv.reader
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

The problem you are running into is caused by using methods of your CSVWorker class as process targets: that class has members which can't be pickled, and those open files are never going to work when handed to a child process.

What you want to do is split that class into two: one class that coordinates all of the worker subprocesses, and one that does the actual computational work. The worker processes take file names as arguments and open the individual files as needed, or at least wait until their work methods are called before opening the files. They can also take multiprocessing.Queue objects as arguments or as instance members; those are safe to pass around.

To a degree, you are already doing this: your write_output_csv method opens its file inside the subprocess, but your parse_input_csv method expects to find an already opened and prepared file as an attribute of self. Do it the other way consistently and you should be in good shape.
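As a rough illustration of that split (a minimal sketch, not the poster's code; the names Coordinator and infile_name are invented here), the input parser can be a plain module-level function that receives the file name and the queue as arguments, so nothing unpicklable ever has to cross the process boundary:

import csv
import multiprocessing

def parse_input_csv(infile_name, inq, numprocs):
    # Runs in the child process: the file is opened and the csv.reader is
    # built here, not in the parent, so neither ever needs to be pickled.
    infile = open(infile_name, "rb")
    try:
        for i, row in enumerate(csv.reader(infile)):
            inq.put((i, [int(entry) for entry in row]))
    finally:
        infile.close()
    for _ in range(numprocs):
        inq.put("STOP")

class Coordinator(object):
    # Holds only picklable state: a file name, a count, and queues.
    def __init__(self, numprocs, infile_name):
        self.numprocs = numprocs
        self.inq = multiprocessing.Queue()
        self.pin = multiprocessing.Process(
                target=parse_input_csv,
                args=(infile_name, self.inq, numprocs))
        self.pin.start()

The summing and writing processes would be launched the same way, each receiving the queues (and, for the writer, the output file name) through args.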

Since multiprocessing relies on serializing and de-serializing objects when passing them as parameters between processes, and your code relies on passing an instance of CSVWorker around (the instance denoted as 'self'), you get this error, because neither csv readers nor open files can be pickled.

You mention that your CSV is large, so I don't think reading all of the data into a list is a solution for you. You therefore have to think of a way to pass one line at a time from your input CSV to each worker, retrieve the processed lines from the workers, and perform all of the I/O in the main process.

It looks like multiprocessing.Pool would be a better way to write your application. Check the multiprocessing documentation at http://docs.python.org/library/multiprocessing.html and try using a process pool with Pool.map to process your CSV. It also takes care of preserving order, which would eliminate a lot of the complicated logic in your code.
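For instance, a rough sketch along those lines (the function name sum_row, the chunksize, and the file names are illustrative, not taken from the post); Pool.imap yields results in input order, so the reordering buffer from the original code is no longer needed:

import csv
import multiprocessing

def sum_row(row):
    # Runs in a worker process; receives one already-parsed row of strings.
    return sum(int(entry) for entry in row)

def main(input_csv, output_csv, numprocs):
    pool = multiprocessing.Pool(processes=numprocs)
    infile = open(input_csv, "rb")
    outfile = open(output_csv, "wb")
    try:
        writer = csv.writer(outfile)
        # The reader and both files stay in the parent; only plain lists of
        # strings and integer results cross the process boundary.
        for i, total in enumerate(pool.imap(sum_row, csv.reader(infile),
                                            chunksize=100)):
            writer.writerow([i, total])
    finally:
        pool.close()
        pool.join()
        infile.close()
        outfile.close()

if __name__ == '__main__':
    main("input.csv", "output.csv", multiprocessing.cpu_count())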

You are trying to get a speedup on a CPU-bound task, which is hard to achieve in Python because of the GIL. I think you should have a single thread read, say, 10,000 rows at a time, hand them off to workers that process them into a second list of 10,000 results, and then write those results to the output file. Your options for the CPU-bound processing (a rough sketch of the first option follows the list):

  • Process from the multiprocessing library.
  • Install the pyspark library and a JDK, turn the list of rows into an RDD, and use the RDD's map() method to get the list of output rows.
  • Port your program to C++/C#/Java.
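A minimal sketch of that batched pattern using multiprocessing.Pool (the 10,000-row batch size and the file names here are illustrative only, not from the post):

import csv
import itertools
import multiprocessing

BATCH_SIZE = 10000  # rows read by the single reader before handing off

def sum_row(row):
    # Runs in a worker process on one already-parsed row.
    return sum(int(entry) for entry in row)

def main(input_csv, output_csv):
    pool = multiprocessing.Pool()
    with open(input_csv, "rb") as infile, open(output_csv, "wb") as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)
        while True:
            batch = list(itertools.islice(reader, BATCH_SIZE))
            if not batch:
                break
            # pool.map blocks until the whole batch is done and keeps order.
            for total in pool.map(sum_row, batch):
                writer.writerow([total])
    pool.close()
    pool.join()

if __name__ == '__main__':
    main("input.csv", "output.csv")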
