Python运行多处理的ffmpeg



我正在尝试在并行任务中使用multiprocessing运行ffmpeg命令。我要并行化的pythonffmpeg调用如下:

def load_audio(args, kwargs):
url = args
start = kwargs["start"]
end = kwargs["end"]
sr = kwargs["sr"]
n_channels = kwargs["n_channels"]
mono = kwargs["mono"]
cmd = ["ffmpeg",  "-i", url, "-acodec", "pcm_s16le", "-ac", str(n_channels), "-ar", str(sr), "-ss", _to_ffmpeg_time(start), "-t", _to_ffmpeg_time(end - start), "-sn", "-vn", "-y", "-f", "wav", "pipe:1"]
process = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10 ** 8
)
buffer = process.stdout
waveform = np.frombuffer(buffer=process.stdout, dtype=np.uint16, offset=8 * 44)
waveform = waveform.astype(dtype)
return waveform

然后我通过偏移60 seconds:读取音频

_THREAD_POOL = BoundedThreadPoolExecutor(max_workers=NTHREADS)
tasks = []
cur_start = start
for i in range(NTHREADS):
msg = {'start':cur_start,
'end':min(cur_start+60,end)}
t = execute_callback(load_audio, 
'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', 
msg)
cur_start+=60
tasks.append(t)

其中execute_callback将线程提交到池:

def execute_callback(fn, args, kwargs):
try:
futures_thread = _THREAD_POOL.submit(fn, args, kwargs)
return futures_thread
except Exception as e:
return None

我最终检索结果,并连接到numpy数组(将转到soundfile进行读取)

futures_results = get_results_as_completed(tasks, return_when=ALL_COMPLETED)
waveform = []
for i,r in enumerate(futures_results):
if not i:
waveform = r
print(type(r))
else:
waveform = np.append(waveform,r)

其中get_results_as_completed

def get_results_as_completed(futures, return_when=ALL_COMPLETED):
finished = as_completed(futures)
for f in finished:
try:
yield f.result()
except Exception as e:
pass

我在这里和这里使用有界池执行器类。我使用as_completed来检索处于已完成状态的期货,这导致输出不保留在输入顺序中,而是";"完成";顺序,这会导致音频输出错误。我的问题是

  • ffmpeg期货实际上是并行执行的吗?在我的测试下载整个音频像:

    args = 'https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3'
    kwargs = {'start':start, 
    'end':end}
    waveform = load_audio(args,kwargs)
    
  • 是否可以在不使用信号量的情况下保留结果的输入顺序,而只使用multiprocessing函数(map可能是?)。如果是,如何?

我想,如果您使用BoundedThreadPoolExecutor,那么从技术上讲,您是多线程的,并且每个线程都在运行一个进程(稍后将对此进行详细介绍)。

无论如何,您的函数execute_callback(其名称让我有点困惑)实际上向池提交了一个任务,并返回了一个Future实例,然后将其附加到列表tasks中。然后将tasks传递给get_results_as_completed,这将按完成顺序生成任务的返回值。但这不是你想要的。

因此,首先回答您的第二个问题:如果您不希望结果按完成顺序排列,请不要使用函数as_completed。改为呼叫:

def get_results(futures):
for f in futures:
try:
yield f.result()
except Exception as e:
pass

为了回答您的第一个问题:对于CPU密集型Python代码,多线程通常不是您想要的,因为存在全局解释器锁的争用。但是,由于每个线程都在启动一个进程并等待进程结束,我认为您应该实现并行化,我认为没有理由因为我之前的回答而不维护输入顺序。

现在我的问题和评论:

  1. 为什么BoundedThreadPoolExecutor而不只是使用标准的ThreadPoolExecutor
  2. 在函数定义中,您有以argskwargs为自变量的自变量签名,例如load_audio,它将更";正常的";将这些参数指定为*args**kwargs,然后调用此函数:load_audio('https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3', start=start, end=end)

相关内容

  • 没有找到相关文章

最新更新