Problem storing data to BigQuery with a multithreaded approach in Python



I'm implementing a Python script that fetches existing user records from a Google BigQuery database, performs some web scraping for each user using a multithreaded approach, and finally stores the results in another table on BigQuery. There are roughly 3.6 million existing user records, and the scraping can take up to 40 seconds per user. My goal is to process 100,000 users per day, which is why I need a concurrent approach.

I'm using ThreadPoolExecutor from the concurrent.futures module. After a given number of threads complete their work, the executor is supposed to store the corresponding batch of results back into BigQuery. I can see the threads continuing to perform their web scraping, but after a certain amount of time (or with a large number of threads) they stop storing records back into the database.

At first I thought I was dealing with a race condition around clearing the batch of results, so I've since implemented a BoundedSemaphore from the threading module as a locking mechanism, which I believe resolved that original issue. But results are still not being stored back into the database reliably. Maybe I'm missing something?
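For context, here's the locking pattern I mean, as a minimal standalone sketch (not my actual code, which is further below): threading.BoundedSemaphore() defaults to an initial value of 1, so it acts like a mutex, with the added safety that releasing it more times than it was acquired raises a ValueError.

from threading import BoundedSemaphore

lock = BoundedSemaphore() # initial value defaults to 1, so only one holder at a time
shared_batch = []

def add_to_batch(item):
    lock.acquire() # blocks while another caller holds the semaphore
    shared_batch.append(item)
    lock.release() # releasing more times than acquired would raise ValueError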

I could use some help from someone with deep experience with concurrent processing in Python. Specifically, I'm running the script on a Heroku server, so Heroku experience might also be relevant. Thanks! My code snippet is below:

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import BoundedSemaphore

service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT) # gets users from BigQuery
print("FETCHED UNIVERSE OF", len(users), "USERS")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    batch = []
    lock = BoundedSemaphore()
    futures = [executor.submit(user_with_friends, row) for row in users]
    print("FUTURE RESULTS", len(futures))
    for index, future in enumerate(as_completed(futures)):
        result = future.result()
        # OK, so this locking business:
        # ... prevents random threads from clearing the batch, which was causing results to almost never get stored, and
        # ... restricts a thread's ability to acquire access to the batch until another one has released it
        lock.acquire()
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # when batch is full or is last
            print("-------------------------")
            print(f"SAVING BATCH OF {len(batch)}...")
            print("-------------------------")
            service.append_user_friends(batch) # stores the results in another table on BigQuery
            batch = []
        lock.release()
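One failure mode worth flagging in the snippet above (my own reasoning, not a confirmed diagnosis): future.result() re-raises any exception from the worker, and service.append_user_friends(batch) can raise too; either way the exception escapes the loop before lock.release() runs, and the with ThreadPoolExecutor(...) block then just waits for the remaining futures to finish. That would look exactly like the symptom: threads keep scraping, but nothing gets stored anymore. Here's a sketch of the same loop made exception-safe (same names as above; keeping a failed batch for the next attempt is just one possible choice):

from threading import Lock

batch = []
lock = Lock() # a plain mutex; the with-statement guarantees release even on errors

for index, future in enumerate(as_completed(futures)):
    try:
        result = future.result() # re-raises any exception from the worker thread
    except Exception as err:
        print("WORKER FAILED:", err)
        continue
    with lock: # released automatically, even if the body raises
        batch.append(result)
        if (len(batch) >= BATCH_SIZE) or (index + 1 >= len(futures)): # full or last
            try:
                service.append_user_friends(batch)
            except Exception as err:
                print("SAVE FAILED, KEEPING BATCH FOR NEXT ATTEMPT:", err)
            else:
                batch = []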

See also:

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

https://docs.python.org/3.7/library/threading.html#threading.BoundedSemaphore

I ended up going with a more reliable approach (see below). Where the old approach coordinated between threads to store results, in the new approach each thread processes and stores its own batch.

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread

def split_into_batches(all_users, batch_size=BATCH_SIZE):
    """h/t: https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks"""
    for i in range(0, len(all_users), batch_size):
        yield all_users[i : i + batch_size]

def process_and_save_batch(user_rows, bq):
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSING...")
    bq.append_user_friends([user_with_friends(user_row) for user_row in user_rows])
    print(generate_timestamp(), "|", current_thread().name, "|", "PROCESSED BATCH OF", len(user_rows))
    return True

service = BigQueryService() # a custom class defined elsewhere
users = service.fetch_remaining_users(min_id=MIN_ID, max_id=MAX_ID, limit=LIMIT)
print("FETCHED UNIVERSE OF", len(users), "USERS")

batches = list(split_into_batches(users))
print(f"ASSEMBLED {len(batches)} BATCHES OF {BATCH_SIZE}")

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    for batch in batches:
        executor.submit(process_and_save_batch, batch, service)
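One caveat with this version, noted as an assumption rather than something I've hit: executor.submit returns a future, and I'm discarding it, so any exception raised inside process_and_save_batch disappears silently. A sketch that keeps the futures and surfaces failures (same names as above):

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=MAX_THREADS, thread_name_prefix="THREAD") as executor:
    futures = [executor.submit(process_and_save_batch, batch, service) for batch in batches]
    for future in as_completed(futures):
        try:
            future.result() # re-raises anything raised inside the worker
        except Exception as err:
            print("BATCH FAILED:", err) # log it instead of letting it vanish silently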

When I increase the thread count dramatically, to a number like 2500, the script all but stops storing results (a behavior I'd still like to investigate further), but I'm able to run it with a relatively low number of threads, and it's getting the job done.
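For what it's worth, a back-of-the-envelope check is consistent with that: 100,000 users per day at a worst case of 40 seconds each is 100,000 × 40 = 4,000,000 thread-seconds of work, and a day has 86,400 seconds, so roughly 4,000,000 / 86,400 ≈ 46 threads kept busy around the clock would meet the target, assuming the scraping stays I/O-bound.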
