I'm trying to write a Python script that uses the subprocess module to copy files from one S3 bucket to another. To improve performance, however, I'm trying to run a separate sync command for each prefix in parallel.
So far the script I've tried doesn't terminate, and I'm not sure whether the subprocesses are actually running concurrently.
import subprocess

prefix = ['prefix1','prefix2','prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'

commands = []

for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)

procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]

for p in procs:
    p.wait()
Is there a better way to do this? Any help is appreciated.
Because you're passing in subprocess.PIPE but never reading from it, the different processes block once they try to write output, so they never finish. You need a separate process to communicate with each aws instance. One possibility is to use Python's multiprocessing:
import subprocess
import multiprocessing

def worker(command, queue):
    # Don't use shell here, we can avoid the overhead
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command output and send it off the queue as soon
    # as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    # Notify the listener that we're done
    queue.put(None)

def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: Adding trailing slash so "source_bucket" + "prefix" is a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'

    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()

    procs = []
    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying those messages is complicated, and requires quite a bit
        # of work to make sure they don't interfere with each other.
        # Correct the command syntax here to use all the variables.
        # Need to pass in the prefix to the dest URI as well so the same
        # structure is maintained.
        # Use an argv-style call here so we can avoid bringing the shell into this
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker to read from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)

    # Read from the Queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Just print out the output, doing it in one process to prevent any
            # collision possibilities
            print(msg)

    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
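Since the workers above only exist to drain each aws process's stdout, a lighter-weight variation is to use threads and let subprocess.run capture the output for you, at the cost of only seeing a prefix's output once its sync finishes. This is just a sketch under that assumption; the sync helper name and the max_workers choice are illustrative, and capture_output requires Python 3.7+:

import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
source_bucket = 's3://source/'
dest_bucket = 's3://dest/'

def sync(prefix):
    # Same argv-style command as above; capture_output drains the pipes for us,
    # so the aws process never blocks on a full pipe buffer
    command = ['aws', 's3', 'sync', '--no-progress',
               source_bucket + prefix, dest_bucket + prefix]
    return prefix, subprocess.run(command, capture_output=True, text=True)

with ThreadPoolExecutor(max_workers=len(prefixes)) as pool:
    futures = [pool.submit(sync, p) for p in prefixes]
    for future in as_completed(futures):
        prefix, result = future.result()
        # Print each prefix's output in one go, so nothing interleaves
        print(prefix, 'finished with exit code', result.returncode)
        print(result.stdout)

Either way, it's worth checking the return codes so a failed sync doesn't go unnoticed.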