Code efficiency/performance improvements in a Pushshift Reddit scraping loop



I am pulling Reddit data through the Pushshift API. More precisely, I am interested in comments and posts (submissions) in subreddit X that contain the search term Y, from now back to datetime Z (for example, all comments mentioning "GME" in the subreddit r/wallstreetbets). All of these parameters can be specified. So far I have used the following code:

import pandas as pd
import requests
from datetime import datetime
import traceback
import time
import json
import sys
import numpy as np

username = ""  # put the username you want to download in the quotes
subreddit = "gme"  # put the subreddit you want to download in the quotes
search_query = "gamestop"  # put the word you want to search for (present in comment or post) in the quotes
# leave either one blank to download an entire user's, subreddit's, or search word's history
# or fill in all to download a specific user's history from a specific subreddit mentioning a specific word

filter_string = None
if username == "" and subreddit == "" and search_query == "":
    print("Fill in a username, a subreddit, or a search query")
    sys.exit(0)
elif username == "" and subreddit != "" and search_query == "":
    filter_string = f"subreddit={subreddit}"
elif username != "" and subreddit == "" and search_query == "":
    filter_string = f"author={username}"
elif username == "" and subreddit != "" and search_query != "":
    filter_string = f"subreddit={subreddit}&q={search_query}"
elif username == "" and subreddit == "" and search_query != "":
    filter_string = f"q={search_query}"
else:
    filter_string = f"author={username}&subreddit={subreddit}&q={search_query}"

url = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before="

start_time = datetime.utcnow()

def redditAPI(object_type):
    global df_comments
    df_comments = pd.DataFrame(columns=["date", "comment", "score", "id"])
    global df_posts
    df_posts = pd.DataFrame(columns=["date", "post", "score", "id"])

    print(f"\nLooping through {object_type}s and appending to dataframe...")

    count = 0
    previous_epoch = int(start_time.timestamp())
    while True:
        # Ensures that the loop breaks at March 16 2021 for testing purposes
        if previous_epoch <= 1615849200:
            break

        new_url = url.format(object_type, filter_string) + str(previous_epoch)
        json_text = requests.get(new_url)
        time.sleep(1)  # pushshift has a rate limit; if we send requests too fast it will start returning error messages
        try:
            json_data = json.loads(json_text.text)
        except json.decoder.JSONDecodeError:
            time.sleep(1)
            continue

        if 'data' not in json_data:
            break
        objects = json_data['data']
        if len(objects) == 0:
            break

        df2 = pd.DataFrame.from_dict(objects)
        for object in objects:
            previous_epoch = object['created_utc'] - 1
            count += 1
        if object_type == "comment":
            df2.rename(columns={'created_utc': 'date', 'body': 'comment'}, inplace=True)
            df_comments = df_comments.append(df2[['date', 'comment', 'score']])
        elif object_type == "submission":
            df2.rename(columns={'created_utc': 'date', 'selftext': 'post'}, inplace=True)
            df_posts = df_posts.append(df2[['date', 'post', 'score']])

    # Convert UNIX timestamps to datetime
    df_comments["date"] = pd.to_datetime(df_comments["date"], unit='s')
    df_posts["date"] = pd.to_datetime(df_posts["date"], unit='s')

    # Drop blank rows (the case when a post only consists of an image)
    df_posts['post'].replace('', np.nan, inplace=True)
    df_posts.dropna(subset=['post'], inplace=True)

    # Drop duplicates (see last comment on https://www.reddit.com/r/pushshift/comments/b7onr6/max_number_of_results_returned_per_query/)
    df_comments = df_comments.drop_duplicates()
    df_posts = df_posts.drop_duplicates()

    print("\nDone. Saved to dataframe.")

Unfortunately, I am running into some performance issues. Because I paginate based on created_utc - 1 (and because I do not want to miss any comments/posts), the initial dataframe will contain duplicates (since there will not be 100 (= the API limit) new comments/posts in every single second). If I run the code over a long time span (e.g. from the current time back to March 1, 2021), this produces a huge dataframe that takes quite a while to process.

As the code stands now, duplicates are appended to the dataframe and only dropped after the loop. Is there a way to make this more efficient, for example by checking inside the for loop whether an object already exists in the dataframe? Would that make any difference performance-wise? Any input is greatly appreciated.
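For illustration, the in-loop check asked about above is cheapest with a plain set of ids rather than a lookup in the growing dataframe itself (scanning the dataframe on every iteration gets slow). Below is a minimal, self-contained sketch with made-up page data; it assumes each Pushshift record carries an id field, and the second page repeats one record to imitate the overlap caused by the before pagination.

import pandas as pd

# Sketch only: 'pages' stands in for successive Pushshift responses.
pages = [
    [{'id': 'a1', 'created_utc': 1615849300, 'body': 'first', 'score': 1},
     {'id': 'a2', 'created_utc': 1615849300, 'body': 'second', 'score': 3}],
    [{'id': 'a2', 'created_utc': 1615849300, 'body': 'second', 'score': 3},
     {'id': 'a3', 'created_utc': 1615849250, 'body': 'third', 'score': 2}],
]

seen_ids = set()
df_comments = pd.DataFrame(columns=['date', 'comment', 'score'])

for objects in pages:
    df2 = pd.DataFrame.from_dict(objects)
    df2 = df2[~df2['id'].isin(seen_ids)]   # drop rows whose id was already appended
    seen_ids.update(df2['id'])
    df2 = df2.rename(columns={'created_utc': 'date', 'body': 'comment'})
    df_comments = pd.concat([df_comments, df2[['date', 'comment', 'score']]])

print(df_comments)   # 'a2' appears only once, so no drop_duplicates() pass is needed at the end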

You can query the data in such a way that there are no duplicates in the first place.

You are using the API's before parameter, which only returns records strictly before the given timestamp. That means that on each iteration we can send the timestamp of the earliest record we already have as before. The response will then only contain records we have not seen yet, so there are no duplicates.

In code it looks something like this:

import pandas as pd
import requests
import urllib.parse
import time
import json

def get_data(object_type, username='', subreddit='', search_query='', max_time=None, min_time=1615849200):
    # start from the current time if not specified
    if max_time is None:
        max_time = int(time.time())
    # generate the filter string from the non-empty parameters
    filter_string = urllib.parse.urlencode(
        {k: v for k, v in zip(
            ['author', 'subreddit', 'q'],
            [username, subreddit, search_query]) if v != ""})
    url_format = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before={}"
    before = max_time
    df = pd.DataFrame()

    while before > min_time:
        url = url_format.format(object_type, filter_string, before)
        resp = requests.get(url)
        # convert the returned records to a dataframe
        dfi = pd.json_normalize(json.loads(resp.text)['data'])

        if object_type == 'comment':
            dfi = dfi.rename(columns={'created_utc': 'date', 'body': 'comment'})
            df = pd.concat([df, dfi[['id', 'date', 'comment', 'score']]])
        elif object_type == 'submission':
            dfi = dfi.rename(columns={'created_utc': 'date', 'selftext': 'post'})
            dfi = dfi[dfi['post'].ne('')]
            df = pd.concat([df, dfi[['id', 'date', 'post', 'score']]])

        # set `before` to the earliest comment/post in the results;
        # the next requests.get(...) will then only return comments/posts before
        # the earliest one we already have, so no duplicates are fetched
        before = dfi['date'].min()
        # if needed
        # time.sleep(1)

    return df

Testing it by fetching comments and checking for duplicate values (by id):

username = ""
subreddit = "gme"
search_query = "gamestop"

df_comments = get_data(
    object_type='comment',
    username=username,
    subreddit=subreddit,
    search_query=search_query)

df_comments['id'].duplicated().any()    # False
df_comments['id'].nunique()             # 2200
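If the date column should be an actual datetime again (as in your original code), the conversion can simply be applied to the returned dataframe afterwards:

# convert the UNIX timestamps returned by get_data into datetimes,
# mirroring the post-processing step from the original code
df_comments['date'] = pd.to_datetime(df_comments['date'], unit='s')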

I suggest using a bloom filter to check whether a value has already been seen.

There is a package on PyPI that makes this easy to implement. To use a bloom filter, you only need to add a "key" to the filter; this could be a combination of the username and the comment. That way you can check whether a comment has already been added to your dataframe. I suggest using the bloom filter as early as possible in your method, i.e. right after you get a response from the API.
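The answer does not name the package; as one possible concrete form, here is a sketch that assumes the pybloom-live package from PyPI and keys the filter on the comment id (a combination of author and body, as suggested above, would work the same way). Keep in mind that a bloom filter can return false positives, so a genuinely new record may occasionally be skipped.

from pybloom_live import BloomFilter   # pip install pybloom-live (assumed package choice)

# one filter for the whole run; capacity and error_rate are illustrative values
bloom = BloomFilter(capacity=1_000_000, error_rate=0.001)

def filter_new(objects):
    """Keep only records whose key has not been seen by the bloom filter yet."""
    fresh = []
    for obj in objects:
        key = obj['id']            # or e.g. f"{obj['author']}:{obj['body']}"
        if key in bloom:           # probably seen before -> skip it
            continue
        bloom.add(key)
        fresh.append(obj)
    return fresh

# apply it right after parsing the API response, before building the dataframe:
# objects = filter_new(json.loads(json_text.text)['data'])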
