Trying to use multithreading to request data from an API and store it in a database



I'm requesting data with API calls and then storing the returned data in SQL Server. I'm not sure how to share the data returned by the API-calling function with the function that writes the data to SQL Server.

def req_Data(row,q): 
    """ 
    function to request data from the API
    """
    for record in ds.request_realtime():
        if record.RP_ENTITY_ID in RIC.keys():
            row = [record.data['TIMESTAMP_TZ'],record.RP_STORY_ID,record.RP_ENTITY_ID,record.entity_name,RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score,2),(record.relevance/100)]
        q.put(row)
def write_data(q): 
      row1 = q.get()
      cursor.execute('''INSERT INTO DB()
                       VALUES (?,?,?,?,?,?,?,?,?,?,?)''',row1)
      cnxn.commit()
if __name__ == "__main__": 
    # creating thread 
    row = []
    q = queue.Queue
    t1 = threading.Thread(target=req_Data, name = 'Thread1', args=(row,q)) 
    t2 = threading.Thread(target=write_data,name = 'Thread2', args=(q)) 
    # starting thread 1 
    t1.start() 
    # starting thread 2 
    t2.start() 
    # wait until thread 1 is completely executed 
    t1.join() 
    # wait until thread 2 is completely executed 
    t2.join() 

This isn't much of an MCVE, but I'll do my best with it (seeing as I can't test it myself). Things to note:

  1. You need q = Queue() with the parentheses to create the queue object.
  2. row = [] isn't needed; you can use a local row variable, as shown.
  3. Use q.task_done() to mark items as removed from the Queue(); you can then use q.join() to block until there are no more items (rather than joining the threads, though you can do that too if you want).

With those considerations in mind, it looks more like:

import threading
from queue import Queue
import time

def req_Data(q):
    """ function to request data from the API """
    for record in ds.request_realtime():
        if record.RP_ENTITY_ID in RIC.keys():
            row = [record.data['TIMESTAMP_TZ'], record.RP_STORY_ID, record.RP_ENTITY_ID, record.entity_name, RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score, 2), (record.relevance/100)]
            q.put(row)

def write_data(q):
    while True:
        row = q.get()
        cursor.execute('''INSERT INTO DB
                       VALUES (?,?,?,?,?,?,?)''', row)  # seven placeholders to match the seven-element row
        cnxn.commit()
        q.task_done()

if __name__ == "__main__":
    # creating thread
    q = Queue() # you were missing the ()
    t1 = threading.Thread(target=req_Data, name='Thread1', args=[q])
    # daemon, so the endless while True loop doesn't keep the process alive after q.join()
    t2 = threading.Thread(target=write_data, name='Thread2', args=[q], daemon=True)
    t1.start()
    time.sleep(10)  # give our queue some time to fill
    t2.start()
    q.join()

However, if I were going to use multithreading, I'd probably want multiple threads loading/unloading the data. Since loading the data would take a deeper dive into your script, I'll show an example of unloading (writing) it instead. It would look like:

import threading
from queue import Queue

def req_Data(q):
    """ function to request data from the API """
    for record in ds.request_realtime():
        if record.RP_ENTITY_ID in RIC.keys():
            row = [record.data['TIMESTAMP_TZ'], record.RP_STORY_ID, record.RP_ENTITY_ID, record.entity_name, RIC[record.RP_ENTITY_ID], round(record.event_sentiment_score, 2), (record.relevance/100)]
            q.put(row)

def write_data(q):
    while True:
        row = q.get()
        cursor.execute('''INSERT INTO DB
                       VALUES (?,?,?,?,?,?,?)''', row)  # seven placeholders to match the seven-element row
        cnxn.commit()
        q.task_done()

if __name__ == "__main__":
    # creating thread
    q = Queue() # you were missing the ()
    req_Data(q)
    # q is now full
    workers = 10
    for i in range(workers):
        # daemon threads, so the endless while True loops don't block exit;
        # joining them would hang forever, since write_data never returns
        t = threading.Thread(target=write_data, args=[q], daemon=True)
        t.start()
    q.join()  # returns once every queued row has been marked task_done()

This is ideal because scaling the number of worker threads is now just a matter of adjusting the workers = 10 line. You could run 10,000 threads with this script (though you probably shouldn't! Any threads still being created after the work is done just waste CPU time and slow the program down).


Hope this helps!
