在 Python 中使用进程时为空的列表



我正在尝试使用多处理库来加快从文件中读取CSV的速度。我已经使用 Pool 这样做了,现在我正在尝试使用 Process(( 来做到这一点。但是,在连接列表以创建数据帧时,它给了我以下错误:

值错误:没有要连接的对象

对我来说,这些进程似乎正在覆盖uber_data列表。我在这里错过了什么?

import glob
import pandas as pd
from multiprocessing import Process
import matplotlib.pyplot as plt
import os
location = "/home/data/csv/"
uber_data = []
def read_csv(filename):
return uber_data.append(pd.read_csv(filename))
def data_wrangling(uber_data):
uber_data['Date/Time'] = pd.to_datetime(uber_data['Date/Time'], format="%m/%d/%Y %H:%M:%S")
uber_data['Dia Setmana'] = uber_data['Date/Time'].dt.weekday_name
uber_data['Num dia'] = uber_data['Date/Time'].dt.dayofweek
return uber_data
def plotting(uber_data):
weekdays = uber_data.pivot_table(index=['Num dia','Dia Setmana'], values='Base', aggfunc='count')
weekdays.plot(kind='bar', figsize=(8,6))
plt.ylabel('Total Journeys')
plt.title('Journey on Week Day')
def main():
processes = []
files = list(glob.glob(os.path.join(location,'*.csv*')))
for file in files:
print(file)
p = Process(target=read_csv, args=[file])
processes.append(p)
p.start()
for i, process in enumerate(processes):
process.join()
print(uber_data)
combined_df = pd.concat(uber_data, ignore_index=True)
dades_mod = data_wrangling(combined_df)
Plotting(dades_mod)
main()

回溯是:

Process Process-223:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
Process Process-224:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
Process Process-221:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
Process Process-222:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
Process Process-225:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
Process Process-220:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<timed exec>", line 17, in read_csv
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 255, in concat
sort=sort,
File "/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py", line 301, in __init__
objs = list(objs)
TypeError: 'NoneType' object is not iterable
[]
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<timed eval> in <module>
<timed exec> in main()
/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py in concat(objs, axis, join, join_axes, ignore_index, keys, levels, names, verify_integrity, sort, copy)
253         verify_integrity=verify_integrity,
254         copy=copy,
--> 255         sort=sort,
256     )
257 
/usr/local/lib/python3.6/dist-packages/pandas/core/reshape/concat.py in __init__(self, objs, axis, join, join_axes, keys, levels, names, ignore_index, verify_integrity, copy, sort)
302 
303         if len(objs) == 0:
--> 304             raise ValueError("No objects to concatenate")
305 
306         if keys is None:
ValueError: No objects to concatenate

谢谢

每个进程中的uber_data与主进程中的uber_data不是同一个对象。 您实际上无法在进程之间共享数据。

from multiprocessing import Process
def read_csv(filename=r'c:pyProjectsdata.csv'):
print(id(uber_data))
uber_data.append(pd.read_csv(filename))
def main():
processes = []
for file in range(4):
p = Process(target=read_csv)
processes.append(p)
p.start()
for i, process in enumerate(processes):
process.join()
return processes
if __name__ == '__main__':
uber_data = []
print(id(uber_data))
ps = main()

指纹

PS C:pyProjects> py -m tmp
2632505050432
1932359777344
2230288136512
2039196563648
2479121315968

您可以使用队列将数据发送回主进程。

from multiprocessing import Process, Queue
def read_csv(filename=r'c:pyProjectsdata.csv', q=None):
q.put(pd.read_csv(filename))
def main(q):
processes = []
for file in range(4):
p = Process(target=read_csv, kwargs={'q':q})
processes.append(p)
p.start()
for i, process in enumerate(processes):
process.join()
while not q.empty():
print('.')
uber_data.append(q.get(block=True))
return processes

if __name__ == '__main__':
uber_data = []
q = Queue()
ps = main(q)
for thing in uber_data:
print(thing.head().to_string())
print('**')

或者你可以使用线程。

from threading import Thread
def g(filename):
uber_data.append(pd.read_csv(filename))
if __name__ == '__main__':
uber_data = []
threads = []
for _ in range(4):
threads.append(Thread(target=g, args=(r'c:pyProjectsdata.csv',)))
for t in threads:
t.start()
while any(t.is_alive() for t in threads):
pass
for thing in uber_data:
print(thing.head().to_string())
print('**')

最新更新