multiprocessing.Pool().apply_async() doesn't seem to run my function



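I'm trying to run two functions in two separate processes with multiprocessing.Pool().apply_async(), but it doesn't seem to run my function. There is no error message. However, when I call the function without multiprocessing, it works perfectly. Here is my code (short version):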

import multiprocessing, twitch_integration
p = multiprocessing.Pool()
p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
p.close()
p.join()

I don't know if it matters, but get_user_followers is recursive and uses the requests module. I put a print at the start of get_user_followers, but it prints nothing.

I've been searching for four hours, and I'm not exaggerating. If anyone can help, I'd be very grateful. Thanks a lot.

After asking for the result as @Steve suggested, a new error is raised. Here is the traceback:

Traceback (most recent call last):
  File "main.py", line 89, in <module>
    main(conn, cursor);
  File "main.py", line 68, in main
    r = result.get(timeout=1)
  File "C:\Python38\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
  File "C:\Python38\lib\multiprocessing\pool.py", line 537, in _handle_tasks
    put(task)
  File "C:\Python38\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Python38\lib\socket.py", line 272, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'SSLSocket' object
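
The last frames of the traceback point at the likely cause: the pool pickles every argument in order to send it to a worker process, and conn apparently holds an SSL socket, which cannot be pickled. The effect can be reproduced without a pool at all. The sketch below is only an illustration: try_pickle is a hypothetical helper, and a plain socket stands in for the SSLSocket inside the database connection.

```python
import pickle
import socket


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise a short error description."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"


# A plain, unconnected socket stands in for the SSLSocket held by conn (assumption).
sock = socket.socket()
try:
    socket_error = try_pickle(sock)
finally:
    sock.close()

# Simple values such as a user id and an empty pagination cursor pickle fine.
plain_error = try_pickle(("12345", ""))

print("socket:", socket_error)
print("plain args:", plain_error)
```

Anything passed through args=(...) has to survive this round trip, so user ids and cursor strings are fine while live connections are not.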

Here is the get_user_followers function for more detail (I know it's ugly, but I've only just started with Python and programming):

import requests, json, sys, time
from mysql.connector import Error

def get_user_followers(user_id, pagination, conn):
    global FOLLOWER_COUNT
    global ERROR_COUNT
    global OAUTH_TOKEN
    global TOTAL_FOLLOWER
    global ERROR_IN_A_ROW
    print("YES")
    after = "&after={0}".format(pagination)
    first = "&first=100"
    query = 'users/follows?to_id={0}{1}{2}'.format(user_id, first, pagination if pagination == "" else after)
    try:
        response = get_response(query)
    except requests.exceptions.RequestException as e:
        print("pause 1 seconde. . . Retrying same request")
        time.sleep(1)
        print("Request error: ", e)
        get_user_followers(user_id, pagination, conn)
        return
    finally:
        pass
    if response.status_code == 200:
        ERROR_IN_A_ROW = 0
    elif response.status_code == 401:
        print(response.json())
        print("HTTP Error 401")
        ERROR_IN_A_ROW += 1
        if ERROR_IN_A_ROW == 3:
            print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
            print("pagination: ", pagination, " user_id", user_id)
            return
        OAUTH_TOKEN = requests.post(POST_URL, data=POST_PARAMS).json()['access_token']
        get_user_followers(user_id, pagination, conn)
        return
    elif response.status_code == 429:
        print(response.json())
        print("HTTP Error 429")
        time.sleep(int(response.json()['Ratelimti-Reset']))
        get_user_followers(user_id, pagination, conn)
        return
    else:
        print(response.json())
        print("HTTP Error {0}".format(response.status_code))
        ERROR_IN_A_ROW += 1
        if ERROR_IN_A_ROW == 3:
            print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
            print("pagination: ", pagination, " user_id", user_id)
            return
        get_user_followers(user_id, pagination, conn)
        return
    response = response.json()
    TOTAL_FOLLOWER = response['total']
    length = len(response['data'])
    if length == 0:
        try:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER) + " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f")) + "\r", end="")
        except:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
        FOLLOWER_COUNT = 0
        ERROR_COUNT = 0
        TOTAL_FOLLOWER = 0
        return 1
    pagination = response['pagination']['cursor']
    i = 0
    while i < length:
        try:
            conn.cursor().execute("""INSERT INTO user (id, username) VALUES ({0}, '{1}')""".format(response['data'][i]['from_id'], str(response['data'][i]['from_name'])))
        except Error as e:
            print("An error has occured : ", e)
            ERROR_COUNT += 1
        finally:
            pass
        i += 1
        FOLLOWER_COUNT += 1
    try:
        print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER) + " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f")) + " {0}".format(user_id))
    except:
        print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
    get_user_followers(user_id, pagination, conn)

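apply_async() hands the task to the pool right away, but any exception raised while sending or running it is stored on the returned result object and stays silent until you ask for the result. So ask for the results; this should run your jobs, or show why they don't run: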

import multiprocessing, twitch_integration
p = multiprocessing.Pool()
result = p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
r = result.get(timeout=1)
result = p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
r = result.get(timeout=1)
p.close()
p.join()
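
Once get() reports that an argument cannot be pickled, one common way out is to pass only picklable values (such as the user id and the pagination cursor) and open the database connection inside the worker. The following is a minimal sketch under stated assumptions, not the original program: count_followers and run_all are hypothetical names, and an in-memory sqlite3 database stands in for the MySQL connection so the example runs on its own.

```python
import multiprocessing
import sqlite3


def count_followers(user_id, pagination):
    """Hypothetical stand-in for get_user_followers.

    The connection is created inside the worker process, so it never has to
    be pickled. sqlite3 is used here only to keep the sketch self-contained.
    """
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE user (id INTEGER, username TEXT)")
        # Placeholders instead of str.format(); mysql.connector uses %s, sqlite3 uses ?.
        conn.execute(
            "INSERT INTO user (id, username) VALUES (?, ?)",
            (int(user_id), "name-" + user_id),
        )
        conn.commit()
        (count,) = conn.execute("SELECT COUNT(*) FROM user").fetchone()
        return user_id, count
    finally:
        conn.close()


def run_all(user_ids):
    """Submit every task first, then collect the results."""
    with multiprocessing.Pool(processes=2) as pool:
        pending = [
            pool.apply_async(count_followers, args=(uid, ""))
            for uid in user_ids
        ]
        # get() re-raises any exception from the worker or from pickling.
        return [result.get(timeout=30) for result in pending]


if __name__ == "__main__":
    # The guard is required on Windows, where workers re-import this module.
    print(run_all(["111", "222"]))
```

Collecting the results only after both tasks are submitted lets them run side by side; calling get() immediately after each apply_async() waits for the first task before the second one starts. A longer timeout than one second is also likely to be needed for a function that pages through an API.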
