I am trying to parallelise a scraper. Unfortunately, when I execute this code it runs for an unusually long time, until I stop it, and no output is produced. Is there something I have missed here? Is the problem how I use os?
First the functions are defined, then the data is loaded, and then it is fed into the multiprocessing pool.
In summary, I want to do this:
import multiprocessing as mp

def cube(x):
    return x**3

pool = mp.Pool(processes=2)
results = pool.map(cube, range(1,7))
print(results)
But this small calculation has now been running for more than 5 minutes. So I think the error is not in the code itself, but rather in how I understand multiprocessing.
from multiprocessing import Pool
import os
import json
import datetime
from dateutil.relativedelta import relativedelta
import re

os.chdir(r'C:\Users\final_tweets_de')

p = Pool(5)

import time

def get_id(data_tweets):
    for i in range(len(data_tweets)):
        account = data_tweets[i]['user_screen_name']
        created = datetime.datetime.strptime(data_tweets[i]['date'], '%Y-%m-%d').date()
        until = created + relativedelta(days=10)
        id = data_tweets[i]['id']
        filename = re.search(r'(.*).json', file).group(1) + '_' + 'tweet_id_' + str(id) + '_' + 'user_id_' + str(data_tweets[i]['user_id'])
        os.system('snscrape twitter-search "(to:'+account+') since:'+created.strftime("%Y-%m-%d")+' until:'+until.strftime("%Y-%m-%d")+' filter:replies" >C:\Users\test_'+filename)

directory = r'C:\Users\final_tweets_de'
path = r'C:\Users\final_tweets_de'

for file in os.listdir(directory):
    fh = open(os.path.join(path, file), 'r')
    print(file)
    with open(file, 'r', encoding='utf-8') as json_file:
        data_tweets = json.load(json_file)
        data_tweets = data_tweets[0:5]

    start = time.time()
    print("start")

    p.map(get_id, data_tweets)
    p.terminate()
    p.join()

    end = time.time()
    print(end - start)
Update
The first reason the code did not run was the issue that @Booboo addressed. The other is that under Windows, a script that uses multiprocessing has to be started from cmd,
like here: Python multiprocessing example not working.
Now I get KeyError: 0 when I run the code.
import multiprocessing as mp
import os
import json
import datetime
from dateutil.relativedelta import relativedelta
import re

os.chdir(r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de')

import time

def get_id(data_tweets):
    for i in range(len(data_tweets)):
        print(i)
        account = data_tweets[i]['user_screen_name']
        created = datetime.datetime.strptime(data_tweets[i]['date'], '%Y-%m-%d').date()
        until = created + relativedelta(days=10)
        id = data_tweets[i]['id']
        filename = re.search(r'(.*).json', file).group(1) + '_' + 'tweet_id_' + str(id) + '_' + 'user_id_' + str(data_tweets[i]['user_id'])
        try:
            os.system('snscrape twitter-search "(to:'+account+') since:'+created.strftime("%Y-%m-%d")+' until:'+until.strftime("%Y-%m-%d")+' filter:replies" >C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\Tweets_antworten\Antworten\test_'+filename)
        except:
            continue

directory = r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de'
path = r'C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\final_tweets_de'

for file in os.listdir(directory):
    fh = open(os.path.join(path, file), 'r')
    print(file)
    with open(file, 'r', encoding='utf-8') as json_file:
        data_tweets = json.load(json_file)
        data_tweets = data_tweets[0:2]

    start = time.time()
    print("start")

    if __name__ == '__main__':
        pool = mp.Pool(processes=2)
        pool.map(get_id, data_tweets)

    end = time.time()
    print(end - start)

    del(data_tweets)
Output:
(NLP 2) C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\Tweets_antworten>python scrape_id_antworten_parallel.py
corona.json
start
corona.json
corona.json
start
0.0009980201721191406
coronavirus.json
start
0.0
coronavirus.json
start
0.0
covid.json
start
0.0
SARS_CoV.json
start
0.0
0
0
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 44, in mapstar
    return list(map(*args))
  File "C:\Users\Paul\Documents\Uni\Masterarbeit\Datengewinnung\Tweets_antworten\scrape_id_antworten_parallel.py", line 25, in get_id
    account = data_tweets[i]['user_screen_name']
KeyError: 0
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "scrape_id_antworten_parallel.py", line 60, in <module>
    pool.map(get_id, data_tweets)
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Paul\Anaconda3\envs\NLP 2\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
KeyError: 0
I can see from path = r'C:\Users\final_tweets_de' that your platform is Windows. When you use multiprocessing under Windows, the code that creates the child processes must execute within a block like this:
import multiprocessing as mp

def cube(x):
    return x**3

if __name__ == '__main__':
    pool = mp.Pool(processes=2)
    results = pool.map(cube, range(1,7))
    print(results)
Otherwise you get into a recursive loop in which the child processes endlessly try to create a new pool and new child processes. Fix this problem and retest. The simplest way is to wrap your code in a function (called, for example, main) and then add:

if __name__ == '__main__':
    main()
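For instance, a minimal sketch of that restructuring, reusing the cube example above rather than your scraper code:

import multiprocessing as mp

def cube(x):
    return x**3

def main():
    # The pool is only created here, in the parent process; the spawned
    # children re-import the module but never enter main().
    pool = mp.Pool(processes=2)
    print(pool.map(cube, range(1, 7)))

if __name__ == '__main__':
    main()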
Also, why do you use only 2 or 5 processes in your actual example? By not specifying an argument to the Pool constructor, you create a pool whose size equals the number of processors actually available on your machine. That is a good default.
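To illustrate (a small sketch, not part of your script): an unparameterised Pool starts os.cpu_count() worker processes.

import os
from multiprocessing import Pool

def cube(x):
    return x**3

if __name__ == '__main__':
    print(os.cpu_count())        # the default pool size
    with Pool() as pool:         # Pool() with no argument uses that many workers
        print(pool.map(cube, range(1, 7)))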
Let me share a working example that retrieves the tweets of a few futurists. Note that for I/O-bound work like this we would rather parallelise with multithreading than with multiprocessing.
# install the social media scraper: !pip3 install snscrape
import snscrape.modules.twitter as sntwitter
import itertools
import multiprocessing as mp
import datetime
import pandas as pd

start_date = datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)  # from when
attributes = ('date', 'url', 'rawContent')  # which attributes to keep

def get_tweets(username, n_tweets=None, attributes=attributes):
    tweets = sntwitter.TwitterSearchScraper(f'from:{username}').get_items()  # invoke the scraper
    tweets = itertools.islice(tweets, n_tweets)  # stop when the count is reached
    tweets = itertools.takewhile(lambda t: t.date >= start_date, tweets)  # stop when the date is passed
    tweets = map(lambda t: (username,) + tuple(getattr(t, a) for a in attributes), tweets)  # keep only the attributes needed
    tweets = list(tweets)  # the result has to be pickle'able
    return tweets

# prepare a list of accounts to scrape
user_names = ['kevin2kelly', 'briansolis', 'PeterDiamandis', 'Richard_Florida']

# parallelise the queries for speed!
with mp.Pool(4) as p:
    results = p.map(get_tweets, user_names)

# combine the results
results = list(itertools.chain(*results))
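Since the snscrape calls are I/O-bound, the same map can also be run on threads; multiprocessing.pool.ThreadPool offers the same interface without starting new processes (and therefore without the Windows spawn issue). A minimal sketch, assuming get_tweets and user_names are defined as in the example above:

from multiprocessing.pool import ThreadPool
import itertools

# get_tweets and user_names as defined in the example above
with ThreadPool(4) as tp:                      # 4 threads instead of 4 worker processes
    results = tp.map(get_tweets, user_names)   # same map() interface as mp.Pool

results = list(itertools.chain(*results))      # flatten the per-user lists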