我正在尝试了解多处理和池来处理我在MySQL数据库中的一些推文。下面是代码和错误消息。
import multiprocessing
import sqlalchemy
import pandas as pd
import config
from nltk import tokenize as token
q = multiprocessing.Queue()
engine = sqlalchemy.create_engine(config.sqlConnectionString)
def getRow(pandasSeries):
df = pd.DataFrame()
tweetTokenizer = token.TweetTokenizer()
print(pandasSeries.loc['BODY'], "n", type(pandasSeries.loc['BODY']))
for tokens in tweetTokenizer.tokenize(pandasSeries.loc['BODY']):
df = df.append(pd.Series(data=[pandasSeries.loc['ID'], tokens, pandasSeries.loc['AUTHOR'],
pandasSeries.loc['RETWEET_COUNT'], pandasSeries.loc['FAVORITE_COUNT'],
pandasSeries.loc['FOLLOWERS_COUNT'], pandasSeries.loc['FRIENDS_COUNT'],
pandasSeries.loc['PUBLISHED_AT']],
index=['id', 'tweet', 'author', 'retweet', 'fav', 'followers', 'friends',
'published_at']), ignore_index=True)
df.to_sql(name="tweet_tokens", con=engine, if_exists='append')
if __name__ == '__main__':
##LOADING SQL INTO DATAFRAME##
databaseData = pd.read_sql_table(config.tweetTableName, engine)
pool = multiprocessing.Pool(6)
for row in databaseData.iterrows():
print(row)
pool.map(getRow, row)
pool.close()
q.close()
q.join_thread()
"""
OUPUT
C:UsersDefAnaconda3python.exe C:/Users/Def/Dropbox/Dissertation/testThreadCopy.py
(0, ID 3247
AUTHOR b'Elon Musk News'
RETWEET_COUNT 0
FAVORITE_COUNT 0
FOLLOWERS_COUNT 20467
FRIENDS_COUNT 14313
BODY Elon Musk Takes an Adorable 5th Grader's Idea ...
PUBLISHED_AT 2017-03-03 00:00:01
Name: 0, dtype: object)
Elon Musk Takes an Adorable 5th Grader's
<class 'str'>
multiprocessing.pool.RemoteTraceback:
Traceback (most recent call last):
File "C:UsersDefAnaconda3libmultiprocessingpool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "C:UsersDefAnaconda3libmultiprocessingpool.py", line 44, in mapstar
return list(map(*args))
File "C:UsersDefDropboxDissertationtestThreadCopy.py", line 16, in getRow
print(pandasSeries.loc['BODY'], "n", type(pandasSeries.loc['BODY']))
AttributeError: 'numpy.int64' object has no attribute 'loc'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:/Users/Def/Dropbox/Dissertation/testThreadCopy.py", line 34, in <module>
pool.map(getRow, row)
File "C:UsersDefAnaconda3libmultiprocessingpool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "C:UsersDefAnaconda3libmultiprocessingpool.py", line 608, in get
raise self._value
AttributeError: 'numpy.int64' object has no attribute 'loc'
Process finished with exit code 1
"""
我不明白的是为什么它会打印出第一个系列然后崩溃?为什么它说pandasSeries.loc['BODY']是numpy.int64类型,而打印出来说它是字符串类型?我敢肯定,如果您在其他一些地方出错了,如果您能看到在哪里,请您指出来。谢谢。
当我构造一个简单的数据帧时:
frame
0 1 2 3
0 0 1 2 3
1 4 5 6 7
2 8 9 10 11
并迭代两次,我得到:
for row in databaseData.iterrows():
for i in row:
print(i, type(i))
该内部循环生成 2 个项目、一个行索引/标签和一个包含值的系列。
0 <class 'numpy.int64'>
0 0
1 1
2 2
3 3
Name: 0, dtype: int32 <class 'pandas.core.series.Series'>
您的map
执行相同的操作,将数字索引发送到一个进程(产生错误(,并将序列发送到另一个进程。
如果我使用没有for row
的pool.map
:
pool.map(getRow, databaseData.iterrows())
然后getRow
接收一个 2 元素元组。
def getRow(aTuple):
rowlbl, rowSeries = aTuple
print(rowSeries)
...
您的print(row)
显示此元组;它更难看到,因为系列部分是多行的。 如果我添加一个,可能会更清楚
(0, # row label
ID 3247 # multiline Series
AUTHOR b'Elon Musk News'
RETWEET_COUNT 0
....
Name: 0, dtype: object)