How do I use a MySQL SELECT query with threads without every thread getting the same result?



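Short context: I select a value from a MySQL table, call an API with that value to get a result, and save the result back into the same table.

Problem: how can I process several rows at the same time? Whenever I start the function in threads, every thread selects the same value (i.e. the cursor returns the same row to each thread). I need each thread to process a different value so that the total run time goes down.

My program is: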

```python
import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb', charset='utf8mb4', autocommit=True)
url = "http://www.someapi.com/somelink/"
cur = conn.cursor()

def main():
    # Every thread runs this same query, so they all receive the first 'uploaded' row
    cur.execute("select asset_id from getprocessid where status = %s LIMIT 1", ("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url + str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s, status = %s where asset_id = %s",
                (str(actual), "completed", str(idofassets)))

while True:
    # Added for threading
    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)
    thread1.start()
    thread2.start()
    thread3.start()
    thread1.join()
    thread2.join()
    thread3.join()
```
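Why all three threads get the same row: each thread runs `SELECT ... LIMIT 1` before any of them has run its `UPDATE`, so they all still see the same 'uploaded' row. The sketch below models that interleaving in pure Python; the list of dicts is a hypothetical stand-in for the `getprocessid` table (not real data), and a `threading.Barrier` forces every "SELECT" to happen before any "UPDATE" so the outcome is deterministic.

```python
import threading

# Hypothetical in-memory stand-in for the getprocessid table
table = [
    {"asset_id": 101, "status": "uploaded"},
    {"asset_id": 102, "status": "uploaded"},
    {"asset_id": 103, "status": "uploaded"},
]
picked = []
picked_lock = threading.Lock()
barrier = threading.Barrier(3)  # all three "SELECT"s finish before any "UPDATE" starts


def unsafe_worker():
    # Models: SELECT asset_id FROM getprocessid WHERE status='uploaded' LIMIT 1
    row = next(r for r in table if r["status"] == "uploaded")
    with picked_lock:
        picked.append(row["asset_id"])
    barrier.wait()
    # Models: UPDATE getprocessid SET status='completed' WHERE asset_id = ...
    row["status"] = "completed"


threads = [threading.Thread(target=unsafe_worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(picked)  # [101, 101, 101]
```

Selecting and then updating in two separate statements is not atomic, which is why both answers below change how rows are handed out to the threads.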

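Your problem splits into two distinct tasks:

1 - Fetch the results from the `getprocessid` MySQL table

2 - Process the results and update the same table (but different fields)

So one way to optimize the code is to let a single thread (for example the main thread) do step 1, and then distribute the work of step 2 across 3 threads: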
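The distribution step can be isolated in a small helper that deals the fetched ids round-robin into one list per thread. This is a minimal sketch; `split_into_batches` is a hypothetical name, not part of any library.

```python
def split_into_batches(asset_ids, n_batches):
    """Deal ids round-robin into n_batches lists (id 0 -> batch 0, id 1 -> batch 1, ...)."""
    if n_batches < 1:
        raise ValueError("n_batches must be at least 1")
    batches = [[] for _ in range(n_batches)]  # independent lists, not [[]] * n
    for i, asset_id in enumerate(asset_ids):
        batches[i % n_batches].append(asset_id)
    return batches


print(split_into_batches([1, 2, 3, 4, 5, 6, 7], 3))
# [[1, 4, 7], [2, 5], [3, 6]]
```

For a plain list the same result can be written as `[asset_ids[i::n] for i in range(n)]`; the loop form also works for any iterable, such as the tuples returned by `fetchall()` after extracting `row[0]`.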

```python
import threading

import pymysql
import requests

N_THREADS = 3  # change this if you want more (or fewer) threads
url = "http://www.someapi.com/somelink/"


def connect():
    # PyMySQL connections must not be shared between threads, so each thread opens its own.
    # USER and PASSWORD are assumed to be defined elsewhere, as in the question.
    return pymysql.connect(host='localhost', user=USER, passwd=PASSWORD,
                           db='sampledb', charset='utf8mb4', autocommit=True)


main_conn = connect()
cur = main_conn.cursor()


def fetch_and_split():
    # Fetch ALL pending ids (no LIMIT 1) and deal them round-robin into one list per thread
    cur.execute("select asset_id from getprocessid where status = %s", ("uploaded",))
    results = cur.fetchall()
    batches = [[] for _ in range(N_THREADS)]
    for count, row in enumerate(results):
        batches[count % N_THREADS].append(row[0])
    return batches


def process_and_update(batch):
    # Each thread receives its own list and uses its own connection and HTTP session
    thread_conn = connect()
    thread_cur = thread_conn.cursor()
    session = requests.Session()
    try:
        for idofassets in batch:
            resp = session.get(url + str(idofassets))
            actual = resp.json()['getResponse']
            thread_cur.execute(
                "update getprocessid set class = %s, status = %s where asset_id = %s",
                (str(actual), "completed", str(idofassets)))
    finally:
        thread_conn.close()


while True:
    # The main thread fetches and splits the results
    batches = fetch_and_split()
    if not any(batches):
        break  # no rows left with status 'uploaded'
    # The other threads process the results and update the values
    threads = [threading.Thread(target=process_and_update, args=(batch,))
               for batch in batches]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```
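An alternative to building fixed batches by hand is `concurrent.futures.ThreadPoolExecutor`, which hands each id to whichever worker thread is free. The sketch below assumes a `connect` callable that returns a new connection (for example a `lambda` wrapping the question's `pymysql.connect(...)` call) and a hypothetical `handle_one(conn, asset_id)` function that calls the API and runs the `UPDATE`. Since PyMySQL declares `threadsafety = 1` (threads may share the module but not connections), each worker thread lazily opens its own connection and all of them are closed at the end.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def process_all(asset_ids, handle_one, connect, max_workers=3):
    """Run handle_one(conn, asset_id) for every id on a pool of worker threads.

    connect    -- zero-argument callable returning a new DB connection (assumed)
    handle_one -- hypothetical per-id function: call the API, UPDATE the row
    Results are returned in the same order as asset_ids.
    """
    local = threading.local()  # per-thread slot for that thread's connection
    opened = []
    opened_lock = threading.Lock()

    def task(asset_id):
        conn = getattr(local, "conn", None)
        if conn is None:
            conn = connect()  # first task on this thread: open its own connection
            local.conn = conn
            with opened_lock:
                opened.append(conn)
        return handle_one(conn, asset_id)

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(task, asset_ids))
    finally:
        for conn in opened:  # runs after the pool has shut down
            conn.close()
```

The ids would still come from one `SELECT` without `LIMIT 1` in the main thread. A pool avoids the case where one batch happens to contain all the slow API calls while the other threads sit idle.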

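One of the simplest approaches (the syntax is approximate):

Each thread must hold its own number in a variable `my_number` that is unique across all threads.

Add a `thread INT DEFAULT NULL` column to the table.

A thread tries to reserve one unreserved record with: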

```python
cur.execute("UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status = %s LIMIT 1",
            (my_number, "uploaded"))
```

The thread then processes the record it reserved:

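```python
# Only pick up the row this thread reserved that is still pending
cur.execute("SELECT asset_id FROM getprocessid WHERE thread = %s AND status = %s",
            (my_number, "uploaded"))
row = cur.fetchone()
if row is not None:
    asset_id = row[0]
    # process the record: call the API for asset_id, then UPDATE the row and set status='completed'
```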

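If the reservation succeeded, the reserved record is processed. If another thread has overwritten the reservation, no record is returned; the `if` detects this, the processing code is skipped, and the thread tries to reserve another record.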
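The reserve-then-process steps can be combined into one loop per thread. This is a sketch under stated assumptions: `connect` is a callable returning a new connection opened with `autocommit=True` as in the question, and `call_api` is a hypothetical stand-in for the `requests` call that returns the value stored in `class`. It uses the affected-row count that PyMySQL's `Cursor.execute` returns to tell whether the reservation succeeded, and it stops when there is nothing left to reserve.

```python
import threading


def worker(my_number, connect, call_api):
    """Reserve one row, process it, repeat until nothing is left (per-thread loop)."""
    conn = connect()  # each thread uses its own connection
    try:
        cur = conn.cursor()
        while True:
            reserved = cur.execute(
                "UPDATE getprocessid SET thread = %s "
                "WHERE thread IS NULL AND status = %s LIMIT 1",
                (my_number, "uploaded"),
            )
            if reserved != 1:
                break  # no unreserved 'uploaded' rows remain
            cur.execute(
                "SELECT asset_id FROM getprocessid WHERE thread = %s AND status = %s",
                (my_number, "uploaded"),
            )
            row = cur.fetchone()
            if row is None:
                continue  # reservation lost; try to reserve another record
            asset_id = row[0]
            cur.execute(
                "UPDATE getprocessid SET class = %s, status = %s WHERE asset_id = %s",
                (str(call_api(asset_id)), "completed", asset_id),
            )
    finally:
        conn.close()


def run_workers(connect, call_api, n_threads=3):
    # my_number must be unique per thread, so use 1..n_threads
    threads = [threading.Thread(target=worker, args=(n, connect, call_api))
               for n in range(1, n_threads + 1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

One consequence of this design: a row reserved by a thread that crashes keeps its `thread` value and will not be picked up by other threads until that column is reset to `NULL`.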
