I have this multithreaded script that operates on a dataset. Each thread gets a chunk of the dataset and then iterates over its dataframe, calling an API (MS Graph Create) for each row. What I am seeing is that my script tends to get stuck when it is almost finished. I am running this on a Linux Ubuntu server with 8 vCPUs, but it only happens when the total size of the dataset is in the millions (2 million records take roughly 9-10 hours).
This is my first time writing a (long-running) script like this, and I would like your opinion on whether I am doing things right.
Specifically:
- I want to know whether my code is the reason my script hangs
- Am I doing the multithreading correctly? Am I creating the threads and waiting for them to finish properly?
UPDATE: Using the answer below, the threads still seem to get stuck at the end.
import pandas as pd
import requests
import sys
import os
import time
import threading
import logging
import string
import secrets
import random
##### ----- Logging Setup -------
logging.basicConfig(filename="pylogs.log", format='%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
# Creating an object
logger = logging.getLogger()
# Setting the threshold of logger to DEBUG
logger.setLevel(logging.ERROR)
#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    # logic for random password gen: build a random mix of letters and digits of the requested length
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(lengthOfPassword))
# the most important function
#
def createAccounts(splitData, threadID):
batchProgress = 0
batch_size = splitData.shape[0]
for row in splitData.itertuples():
try:
headers = {"Content-Type": "application/json", "Authorization":"Bearer "+access_token}
randomLength = [8,9,12,13,16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
batchProgress+=1
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile" : {
"password": password,
"forceChangePasswordNextSignIn": False
},
"state":"",
"identities": [
{
"signInType": "emailAddress",
"issuer": tenantName,
"issuerAssignedId": row[2]
}
]
}
# if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty
if(len(row[4])):
post_request_body["identities"].append({"signInType": "phoneNumber","issuer": tenantName,"issuerAssignedId": row[1]})
responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
status = responseFromApi.status_code
if(status == 201): #success
id = responseFromApi.json().get("id")
print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Success'
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
elif(status == 401): #token expiry
print(f" Thread {threadID} | Token Expired. Getting it back !")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
getRefreshToken()
else: #any other error
msg = ""
try:
msg = responseFromApi.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
logger.error(errorDict)
except Exception as e:
# check for refresh token errors
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
logger.error(errorDict)
msg = " Error "
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
batchProgress = 0
###### ------ Main Script ------
if __name__ == "__main__":
# get file name and appid from command line arguments
storageFileName = sys.argv[1]
appId = sys.argv[2]
# setup credentials
bigFilePath = f"./{storageFileName}"
CreatUserUrl = "https://graph.microsoft.com/v1.0/users"
B2C_Tenant_Name = "tenantName"
tenantName = B2C_Tenant_Name + ".onmicrosoft.com"
applicationID = appId
accessSecret = "" # will be taken from command line in future revisions
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id" : applicationID,
"client_secret": accessSecret
}
# Get initial access token from MS
print("Connecting to MS Graph API")
token_api = "https://login.microsoftonline.com/"+tenantName+"/oauth2/v2.0/token"
response = {}
try:
responseFromApi = requests.post(token_api, data=token_api_body)
responseJson = responseFromApi.json()
print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
except Exception as e:
print("ERROR | Token auth failed ")
# if we get the token proceed else abort
if(responseFromApi.status_code == 200):
migrationData = pd.read_csv(bigFilePath)
print(" We got the data from Storage !", migrationData.shape[0])
global access_token
access_token = responseJson.get('access_token')
graph_api_create = "https://graph.microsoft.com/v1.0/users"
dataSetSize = migrationData.shape[0]
partitions = 50 # No of partitions # will be taken from command line in future revisions
size = int(dataSetSize/partitions) # No of rows per file
remainder = dataSetSize%partitions
print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} | Remainder: {remainder} | Start...... n")
##### ------- Dataset partioning.
datasets = []
range_val = partitions + 1 if remainder !=0 else partitions
for partition in range(range_val):
if(partition == partitions):
df = migrationData[size*partition:dataSetSize]
else:
df = migrationData[size*partition:size*(partition+1)]
datasets.append(df)
number_of_threads = len(datasets)
start_time = time.time()
spawned_threads = []
######## ---- Threads are spawned ! here --------
for i in range(number_of_threads): # spawn threads
t = threading.Thread(target=createAccounts, args=(datasets[i], i))
t.start()
spawned_threads.append(t)
number_spawned = len(spawned_threads)
print(f"Started {number_spawned} threads !")
###### - Threads are killed here ! ---------
for thread in spawned_threads: # let the script wait for thread execution
thread.join()
print(f"Done! It took {time.time() - start_time}s to execute") # time check
#### ------ Retry Mechanism -----
print("RETRYING....... !")
os.system(f'python3 retry.py pylogs.log {appId}')
else:
print(f"Token Missing ! API response {responseJson}")```
For simplicity, here is the code refactored to use the standard library multiprocessing.ThreadPool.
Naturally I could not test it, since I do not have your data, but the basic idea should work. I removed the logging and the retry pass, since I do not really understand why you need them (feel free to add them back); instead, this version retries each row in place if the problem looks transient.
import random
import sys
import time
from multiprocessing.pool import ThreadPool
import pandas as pd
import requests
sess = requests.Session()
# globals filled in by `main`
tenantName = None
access_token = None
def submit_user_create(row):
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
randomLength = [8, 9, 12, 13, 16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile": {"password": password, "forceChangePasswordNextSignIn": False},
"state": "",
"identities": [{"signInType": "emailAddress", "issuer": tenantName, "issuerAssignedId": row[2]}],
}
# if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty
if len(row[4]):
post_request_body["identities"].append({"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": row[1]})
return sess.post("https://graph.microsoft.com/v1.0/users", headers=headers, json=post_request_body)
def get_access_token(tenantName, applicationID, accessSecret):
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id": applicationID,
"client_secret": accessSecret,
}
token_api = f"https://login.microsoftonline.com/{tenantName}/oauth2/v2.0/token"
resp = sess.post(token_api, data=token_api_body)
if resp.status_code != 200:
raise RuntimeError(f"Token Missing ! API response {resp.content}")
json = resp.json()
print(f"Token API Success ! Expires in {json.get('expires_in')} seconds")
return json["access_token"]
def process_row(row):
while True:
response = submit_user_create(row)
status = response.status_code
if status == 201: # success
id = response.json().get("id")
print(f"Success {id}")
return True
if status == 429: # throttling issues
print(f"Throttled by server ! Sleeping for 150 seconds")
time.sleep(150)
continue
if status == 401: # token expiry?
print(f"Token Expired. Getting it back !")
getRefreshToken() # TODO
continue
try:
msg = response.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
print(f" {status} | {msg} {row[2]}")
return False
def main():
global tenantName, access_token
# get file name and appid from command line arguments
bigFilePath = sys.argv[1]
appId = sys.argv[2]
# setup credentials
B2C_Tenant_Name = "tenantName"
tenantName = f"{B2C_Tenant_Name}.onmicrosoft.com"
accessSecret = "" # will be taken from command line in future revisions
access_token = get_access_token(tenantName, appId, accessSecret)
migrationData = pd.read_csv(bigFilePath)
start_time = time.time()
with ThreadPool(10) as pool:
for i, result in enumerate(pool.imap_unordered(process_row, migrationData.itertuples()), 1):
progress = i / len(migrationData) * 100
print(f"{i} / {len(migrationData)} | {progress:.2f}% | {time.time() - start_time:.2f} seconds")
print(f"Done! It took {time.time() - start_time}s to execute")
if __name__ == "__main__":
main()
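The refactor still leans on two helpers from the original script that are not defined here: generateRandomPassword and the getRefreshToken stub marked TODO. A minimal sketch of both, assuming letters plus digits satisfies your tenant's password policy and that appId and accessSecret are made visible to the refresh helper (they are currently locals of main), could look like this:

```
import secrets
import string

def generateRandomPassword(lengthOfPassword):
    # assumption: letters and digits are enough for the tenant's password policy
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(lengthOfPassword))

def getRefreshToken():
    # reuse the same client-credentials flow as get_access_token and refresh the module-level token;
    # appId and accessSecret would have to be stored as globals (hypothetical here) for this to work
    global access_token
    access_token = get_access_token(tenantName, appId, accessSecret)
```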
Unfair use of MS Graph
Because the server may throttle, use of the MS Graph resource can end up being unfair across your threads. I mean fair in the sense of starvation.
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
A thread that makes a million calls may receive a disproportionate number of 429 responses, each of which is penalized with a 150-second sleep. That sleep does nothing to stop the other threads from making calls and moving forward.
This leaves one thread far behind the others, which gives the appearance of the script being stuck.
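One way to soften that penalty, sketched here as a variant of the process_row loop from the refactor above, is to honor the Retry-After header that Graph usually sends with a 429 (falling back to a short default) and to back off exponentially, so a throttled worker is not parked for a fixed 150 seconds on every throttled call:

```
def process_row(row, max_attempts=8):
    delay = 5  # assumed fallback/backoff starting point, in seconds
    for attempt in range(max_attempts):
        response = submit_user_create(row)
        if response.status_code == 201:
            return True
        if response.status_code == 429:
            # prefer the server's own hint over a fixed 150-second penalty
            wait = int(response.headers.get("Retry-After", delay))
            time.sleep(wait)
            delay = min(delay * 2, 150)  # cap the exponential backoff
            continue
        if response.status_code == 401:
            getRefreshToken()  # still a TODO, as in the refactor above
            continue
        return False  # non-retryable error
    return False  # give up after max_attempts so no worker loops forever
```

Shrinking the pool also helps here: the refactor's ThreadPool(10) should run into throttling far less often than the original 50 threads did.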