我正在使用API调用请求数据,然后将返回的数据存储到SQL Server中。我不确定如何共享从API调用的数据与函数返回的数据以将数据写入SQL Server
def req_Data(row,q):
"""
function to request data from the API
"""
for record in ds.request_realtime():
if record.RP_ENTITY_ID in RIC.keys():
row = [record.data['TIMESTAMP_TZ'],record.RP_STORY_ID,record.RP_ENTITY_ID,record.entity_name,RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score,2),(record.relevance/100)]
q.put(row)
def write_data(q):
row1 = q.get()
cursor.execute('''INSERT INTO DB()
VALUES (?,?,?,?,?,?,?,?,?,?,?)''',row1)
cnxn.commit()
if __name__ == "__main__":
# creating thread
row = []
q = queue.Queue
t1 = threading.Thread(target=req_Data, name = 'Thread1', args=(row,q))
t2 = threading.Thread(target=write_data,name = 'Thread2', args=(q))
# starting thread 1
t1.start()
# starting thread 2
t2.start()
# wait until thread 1 is completely executed
t1.join()
# wait until thread 2 is completely executed
t2.join()
这不是一个很大的MCVE,但是我会尽力解决它(看到我无法自己测试(。要注意的事情:
- 您需要带有括号的
q = Queue()
来创建队列对象。 - 不需要
row = []
,您可以使用本地row
变量,如图所示 - 使用
q.task_done()
从Queue()
中删除项目,您可以在没有更多的项目中使用q.join()
来继续(而不是加入线程,但是如果您想要的话,也可以这样做(
(
在这些考虑方面,它看起来更像是:
import threading
from queue import Queue
import time
def req_Data(q):
""" function to request data from the API """
for record in ds.request_realtime():
if record.RP_ENTITY_ID in RIC.keys():
row = [record.data['TIMESTAMP_TZ'], record.RP_STORY_ID, record.RP_ENTITY_ID, record.entity_name, RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score, 2), (record.relevance/100)]
q.put(row)
def write_data(q):
while True:
row = q.get()
cursor.execute('''INSERT INTO DB()
VALUES (?,?,?,?,?,?,?,?,?,?,?)''', row)
cnxn.commit()
q.task_done()
if __name__ == "__main__":
# creating thread
q = Queue() # you were missing the ()
t1 = threading.Thread(target=req_Data, name='Thread1', args=[q])
t2 = threading.Thread(target=write_data, name='Thread2', args=[q])
t1.start()
time.sleep(10) # give our queue some time to fill
t2.start()
q.join()
但是,如果我要使用多线程,我可能希望多个线程加载/卸载数据。由于要对您的脚本进行更深入的了解来加载数据,因此我将展示卸载数据的示例。它看起来像:
import threading
from queue import Queue
def req_Data(q):
""" function to request data from the API """
for record in ds.request_realtime():
if record.RP_ENTITY_ID in RIC.keys():
row = [record.data['TIMESTAMP_TZ'], record.RP_STORY_ID, record.RP_ENTITY_ID, record.entity_name, RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score, 2), (record.relevance/100)]
q.put(row)
def write_data(q):
while True:
row = q.get()
cursor.execute('''INSERT INTO DB()
VALUES (?,?,?,?,?,?,?,?,?,?,?)''', row)
cnxn.commit()
q.task_done()
if __name__ == "__main__":
# creating thread
q = Queue() # you were missing the ()
req_Data(q)
# q is now full
workers = 10
thread_list = []
for i in range(workers):
t = threading.Thread(target=write_data, args=[q])
t.start()
thread_list.append(t)
q.join()
for thread in thread_list:
thread.join()
这是理想的,因为现在扩展工作线的#只不过是调整线workers = 10
线。您可能会使用此脚本运行10,000个线程(尽管可能不应该!您仍然在计算完成后仍在制作线程,这将是浪费CPU时间并减慢程序的速度(
希望这会有所帮助!