我有这段代码:
import urllib2, json, csv
import requests
import itertools
import multiprocessing
import numpy
from datetime import datetime, date, timedelta
def getTaxiTrips(date):
"""
Gets the taxi trips occurred in NY from a starting date.
:param date: (Y-m-d).
:return: list of tuples (long, lat, drop off date).
"""
today = str(datetime.date(datetime.now())).split('-')
today_y = today[0]
today_m = today[1]
start = date.split('-')
start_y = start[0]
start_m = start[1]
print start_m + "-" + start_y + " / " + today_m + "-" + today_y
data = []
y = int(start_y)
m = int(start_m)
while int(start_y) <= int(today_y):
# Month transformation
if m > 12:
m %= 12
y += 1
mt = str(m) if m > 9 else '0' + str(m)
# Green cabs
if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
"/green_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
"/green_tripdata_" + str(y) + "-" + mt + ".csv")
# Yellow cabs
if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
"/yellow_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
"/yellow_tripdata_" + str(y) + "-" + mt + ".csv")
if m == int(today_m):
break
m += 1
pool = multiprocessing.Pool(mps-1)
result = pool.map(consumeTaxiData, data)
pool.close()
pool.join()
return list(itertools.chain(*result))
def consumeTaxiData(url):
"""
Given a url, reads its content and process its data.
:param url: the url to be readen.
:return: a list of tuples in the form (long, lat, hour).
"""
print "Processing", url
points = []
data = readCSV(url)
for line in data:
latitude = line.get('dropoff_latitude', None)
if latitude is None:
latitude = line.get('Dropoff_latitude', None)
longitude = line.get('dropoff_longitude', None)
if longitude is None:
longitude = line.get('Dropoff_longitude', None)
time = line.get('tpep_dropoff_datetime', None)
if time is None:
time = line.get('Lpep_dropoff_datetime', None)
if time is not None:
time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S')
if latitude is not None and longitude is not None and time >= datetime.strptime(date, '%Y-%m-%d') and
time.weekday():
time = roundTime(time, roundTo=60 * 60).hour
points.append((float(longitude), float(latitude), time))
return points
def readCSV(url):
"""
Read a csv file.
:param url: url to be read.
:return: an array of dictionaries.
"""
try:
response = urllib2.urlopen(url)
return csv.DictReader(response, delimiter=',')
except urllib2.HTTPError as e:
return None
def roundTime(dt=None, roundTo=60):
"""
Round a datetime object to any time laps in seconds
:param dt: datetime.datetime object, default now.
:param roundTo: closest number of seconds to round to, default 1 minute.
:return: the rounded time.
"""
if dt == None : dt = datetime.now()
seconds = (dt - dt.min).seconds
rounding = (seconds+roundTo/2) // roundTo * roundTo
return dt + timedelta(0, rounding-seconds, -dt.microsecond)
if __name__ == '__main__':
mps = multiprocessing.cpu_count()
date = str(datetime.date(datetime.now()) - timedelta(31*8))
print "-----> Inital date:", date
print "-----> Getting taxi data..."
taxi_dropoffs = getTaxiTrips(date)
print len(taxi_dropoffs), "taxi trips"
这对于这些数据来说效果很好:
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
现在我正在尝试处理更多数据:
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-08.csv
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-08.csv
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-07.csv
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-09.csv
我不断收到此消息:
Traceback (most recent call last):
File "noiseInference.py", line 489, in <module>
taxi_dropoffs = getTaxiTrips(date)
File "noiseInference.py", line 300, in getTaxiTrips
result = pool.map(consumeTaxiData, data)
File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
socket.error: [Errno 60] Operation timed out
因为每个.csv
文件都很大,而且我正在处理很多文件,所以我预计处理方法需要一些时间。但是,处理正在终止。如何解决此问题?
数据来自这里: http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml
multiprocessing.Pool
的输出通常具有很大的误导性。
回溯:
return self.map_async(func, iterable, chunksize).get()
File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
socket.error: [Errno 60] Operation timed out
显示引发存储在其_value
属性中的错误的map_async.get
。此属性仅包含在此过程中引发的错误。错误很明显:套接字操作超时。
为了更好地了解问题所在,我建议您在multiprocessing.Pool.map_async
之外重现它。