由于某种原因,在脚本执行(macOS(开始时,我在多处理池中有8个进程派生并正在工作,但在启动后的几分钟内,只剩下1个进程在工作。
我有这个代码(它比那个大得多,但它会解释图片(:
def GetStatesDataset(dataset):
df_states = pd.read_csv(dataset)
return df_states
def UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df):
table_name = table_name + prefix_name
pd.DataFrame.to_gbq(df,
table_name,
project_id=project_id,
if_exists=if_exists)
def InitGetDataFromGCP(data, prefix):
client = storage.Client()
files = []
blobs = client.list_blobs(data, prefix=prefix)
for blob in blobs:
files.append(f'{data}/{blob.name}')
return files
def GetDataFromGCP(file):
fs = gcsfs.GCSFileSystem() # GCP's Google Cloud Storage (GCS) File System (FS)
with fs.open(file, 'r') as f:
# Reading json into Pandas DataFrame
gcs_data = [json.loads(line) for line in f]
data = [gcs_data] if isinstance(gcs_data, dict) else gcs_data
df = pd.DataFrame(data)
df = pd.merge_asof(df,
df_states,
left_on="start_time",
right_on="state_reached_at",
by="car_id",
direction="backward")
UploadDataFrameToBigQuery(table_name, prefix_name, project_id, if_exists, df)
logging.info(str(multiprocessing.current_process()) + 'Finished: execution time: ' + str(exec_time))
#######################
df_states = GetStatesDataset('gs://link-to-my.csv')
dataset_name = 'one'
prefix_name = 'two'
# config for uploading data to BigQuery
table_name = 'one-two.'
project_id = 'one-two-three'
if_exists = 'append'
def main():
files = InitGetDataFromGCP(dataset_name, prefix_name)
with multiprocessing.Pool(processes=8) as pool:
pool.map(GetDataFromGCP, files)
if __name__ == '__main__':
main()
由于我记录了所有的事情,我可以在开始时看到所有的过程(一切都很好(:
2020-08-29 15:55:13,957 <SpawnProcess name='SpawnPoolWorker-8' parent=1420 started daemon>Finished: execution time: 22.53874
2020-08-29 15:55:15,947 <SpawnProcess name='SpawnPoolWorker-7' parent=1420 started daemon>Finished: execution time: 23.259828000000002
2020-08-29 15:55:17,219 <SpawnProcess name='SpawnPoolWorker-3' parent=1420 started daemon>Finished: execution time: 8.758934000000004
2020-08-29 15:55:19,094 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 7.409976
2020-08-29 15:55:21,755 <SpawnProcess name='SpawnPoolWorker-6' parent=1420 started daemon>Finished: execution time: 0.25443099999999674
但过了一段时间,我得到了这个:
2020-08-29 16:24:28,494 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 10.398635000000013
2020-08-29 16:24:36,077 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 4.782628999999929
2020-08-29 16:24:40,220 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.1638890000000401
2020-08-29 16:24:44,032 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 1.519871999999964
2020-08-29 16:24:50,449 <SpawnProcess name='SpawnPoolWorker-1' parent=1420 started daemon>Finished: execution time: 3.1979730000000473
我还可以通过查看我的CPU活动来确认只有一个进程在工作。生成了8个Python进程,但只有1个接近100%。我是多处理的新手,也许我不知道自己在做什么,但我希望所有8名工人都能执行任务,直到我的";文件";终止
这是显而易见的。我只需要指定块大小。由于我有将近17000个文件需要处理,一次处理一个文件,chunksize=1似乎很有魅力:
with multiprocessing.Pool(processes=8) as pool:
result = pool.map(GetDataFromGCP, files, chunksize=1)