Creating a process from a function in Python to run in parallel



I have a function that runs a SELECT SQL query (against PostgreSQL). Now I want to INSERT the execution time of that query into a table in my DB, but I want to do it in parallel, so that I can carry on with my program and call other functions even while the INSERT query is still running.

I tried using multiprocessing.Process, but my function ends up waiting for the process to finish, so I actually lose the parallelism I was after.

My code:

def select_func():
    with connection.cursor() as cursor:
        query = "SELECT * FROM myTable WHERE "UserName" = 'Alice'"
        start = time.time()
        cursor.execute(query)
        end = time.time()
        process = Process(target = insert_func, args = (query, (end-start)))
        process.start()
        process.join()
        return cursor.fetchall()

def insert_func(query, time):
    with connection.cursor() as cursor:
        query = "INSERT INTO infoTable ("query", "exec_time")
                 VALUES ("" + query  + "", "" + time + "")"
        cursor.execute(query)
        connection.commit()

The problem is that this operation is not really asynchronous, because select_func waits for insert_func to finish. I want the two functions to run independently of each other, so that select_func can return even while insert_func is still running and I can keep calling other functions in my script.

Thanks!

There are quite a few problems with your code snippet, but let's at least try to give it a structure that achieves this.

def select_func():
    with connection.cursor() as cursor: #I don't think the same global connection should be used for reads and writes simultaneously
        query = "SELECT * FROM myTable WHERE "UserName" = 'Alice'" #quotation issues
        start = time.time()
        cursor.execute(query)
        end = time.time()
        process = Process(target = insert_func, args = (query, (end-start)))
        process.start() #you start the process here BUT
        process.join()  #you force Python to wait for it here...
        return cursor.fetchall()

def insert_func(query, time):
    with connection.cursor() as cursor:
        query = "INSERT INTO infoTable ("query", "exec_time")
                 VALUES ("" + query  + "", "" + time + "")"
        cursor.execute(query)
        connection.commit()
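
To make those two inline comments concrete: the only thing forcing select_func to wait is the immediate join(). Below is a minimal sketch of the direct fix, keeping one Process per query but joining later. The names are illustrative, and it assumes insert_func opens its own DB connection inside the child process instead of reusing the shared global connection:

import time
from multiprocessing import Process

background_inserts = []  #hypothetical list of process handles, joined once at the end

def select_func():
    with connection.cursor() as cursor:
        query = "SELECT * FROM myTable WHERE \"UserName\" = 'Alice'"
        start = time.time()
        cursor.execute(query)
        elapsed = time.time() - start
        p = Process(target=insert_func, args=(query, elapsed))
        p.start()                     #launch the insert in the background...
        background_inserts.append(p)  #...but don't join here, so the select returns right away
        return cursor.fetchall()

#at the end of the script, wait for any inserts still running:
#for p in background_inserts:
#    p.join()

This removes the blocking, but it still pays the process start-up cost on every select, which is what the queue-based structure below avoids.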

Consider this alternative instead:

import time
import sqlite3 as sql #stand-in: swap in your PostgreSQL connection (e.g. psycopg2)
from multiprocessing import Process, Queue

def select_func():
    read_con = sql.connect("db") #sqlite syntax but use your connection
    cursor = read_con.cursor() #plain cursor: works for sqlite3 and psycopg2 alike
    query = "SELECT * FROM myTable WHERE \"UserName\" = 'Alice'" #where does Alice come from?
    start = time.time()
    cursor.execute(query)
    end = time.time()
    return cursor.fetchall(), (query, (end - start)) #Our tuple has query at position 0 and time at position 1

def insert_function(insert_queue): #The insert you want to parallelize

    connection = sql.connect("db") #initialize your 'writer'. Note: may be good to initialize the connection on each insert. Not sure if optimal.
    while True: #We keep pulling from the queue
        data = insert_queue.get() #blocks until something arrives on the queue
        if data == 'STOP': #Example of a kill instruction to stop our process
            break #breaks the while loop and the function can 'exit'

        cursor = connection.cursor()
        query = data[0] #see how we stored the tuple
        exec_time = data[1] #as above ('time' renamed so it doesn't shadow the time module)
        insert_query = 'INSERT INTO infoTable ("query", "exec_time") VALUES (?, ?)' #placeholders instead of string concatenation; psycopg2 uses %s rather than ?
        cursor.execute(insert_query, (query, exec_time))
        connection.commit()


if __name__ == '__main__': #Typical python main guard
    query_pipe = Queue() #we initialize a Queue here to feed into your inserting function
    process = Process(target = insert_function, args = (query_pipe,))
    process.start()
    stuff = []
    for i in range(5):
        data, insert_data = select_func() #the select returns its rows plus the (query, time) tuple we want to insert
        stuff.append(data)
        query_pipe.put(insert_data)
    #
    #Do other stuff and even put more stuff into the pipe.
    #
    query_pipe.put('STOP') #we wanna kill our process so we send the stop command
    process.join()
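
A note on the design: insert_function runs as a single long-lived 'writer' process, so the process start-up cost is paid once rather than once per query. query_pipe.put() only enqueues the tuple and returns immediately, which means select_func and the main loop never wait for an insert to finish. Because the queue is FIFO, sending the 'STOP' sentinel last and only then calling process.join() ensures every queued insert has been written before the script exits.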
