我正在尝试在并行任务中使用multiprocessing
运行ffmpeg
命令。我要并行化的pythonffmpeg
调用如下:
def load_audio(args, kwargs):
url = args
start = kwargs["start"]
end = kwargs["end"]
sr = kwargs["sr"]
n_channels = kwargs["n_channels"]
mono = kwargs["mono"]
cmd = ["ffmpeg", "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]
process = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
)
buffer = process.stdout
waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
waveform = waveform.astype(dtype)
return waveform
然后我通过偏移60 seconds
:读取音频
_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
msg = {'start':cur_start,
'end':min(cur_start+60,end)}
t = execute_callback(load_audio,
'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3',
msg)
cur_start+=60
tasks.append(t)
其中execute_callback
将线程提交到池:
def execute_callback(fn, args, kwargs):
try:
futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
return futures_thread
except Exception as e:
return None
我最终检索结果,并连接到numpy
数组(将转到soundfile
进行读取)
futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
if not i:
waveform = r
print(type(r))
else:
waveform = np.append(waveform,r)
其中get_results_as_completed
是
def get_results_as_completed(futures, return_when=ALL_COMPLETED):
finished = as_completed(futures)
for f in finished:
try:
yield f.result()
except Exception as e:
pass
我在这里和这里使用有界池执行器类。我使用as_completed
来检索处于已完成状态的期货,这导致输出不保留在输入顺序中,而是";"完成";顺序,这会导致音频输出错误。我的问题是
ffmpeg
期货实际上是并行执行的吗?在我的测试下载整个音频像:args = 'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3' kwargs = {'start':start, 'end':end} waveform = load_audio(args,kwargs)
是否可以在不使用信号量的情况下保留结果的输入顺序,而只使用
multiprocessing
函数(map
可能是?)。如果是,如何?
我想,如果您使用BoundedThreadPoolExecutor
,那么从技术上讲,您是多线程的,并且每个线程都在运行一个进程(稍后将对此进行详细介绍)。
无论如何,您的函数execute_callback
(其名称让我有点困惑)实际上向池提交了一个任务,并返回了一个Future
实例,然后将其附加到列表tasks
中。然后将tasks
传递给get_results_as_completed
,这将按完成顺序生成任务的返回值。但这不是你想要的。
因此,首先回答您的第二个问题:如果您不希望结果按完成顺序排列,请不要使用函数as_completed
。改为呼叫:
def get_results(futures):
for f in futures:
try:
yield f.result()
except Exception as e:
pass
为了回答您的第一个问题:对于CPU密集型Python代码,多线程通常不是您想要的,因为存在全局解释器锁的争用。但是,由于每个线程都在启动一个进程并等待进程结束,我认为您应该实现并行化,我认为没有理由因为我之前的回答而不维护输入顺序。
现在我的问题和评论:
- 为什么
BoundedThreadPoolExecutor
而不只是使用标准的ThreadPoolExecutor
- 在函数定义中,您有以
args
和kwargs
为自变量的自变量签名,例如load_audio
,它将更";正常的";将这些参数指定为*args
和**kwargs
,然后调用此函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end)