Python 缓冲 IO 在使用多个管道时过早结束了流



我正在尝试把通过 yt-dlp 下载的视频做成连续直播。我需要将下面这个(可以正常工作的)bash 命令移植到 Python 中。

# Run the downloads back to back inside one subshell so that their MPEG-TS
# output is concatenated on a single stdout, then feed that to one encoder.
(
# Each line: youtube-dl writes the video to stdout (-o -) and ffmpeg copies
# the streams, without re-encoding (-c copy), into MPEG-TS on its own stdout.
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
youtube-dl -v --buffer-size 16k https://youtube.com/watch?v=QiInzFHIDp4 -o - | ffmpeg -i - -f mpegts -c copy - ;
# The final ffmpeg reads the combined stream from stdin at native frame rate
# (-re), encodes the video with libx264 and publishes it as FLV to the RTMP URL.
) | ffmpeg -re -i - -c:v libx264 -f flv rtmp://127.0.0.1/live/H1P_x5WPF

我的 Python 版本会把每个视频的最后 2 秒截掉。我怀疑,尽管第一个管道(yt-dlp)的 stdout 已经读空,但第二个和第三个管道之间仍有数据在传输。我一直没想出办法来正确处理视频结尾处这两个管道之间的数据。

from subprocess import Popen, PIPE, DEVNULL
# Number of bytes moved between pipes per read/write call.
COPY_BUFSIZE = 65424

# Videos to stream, in playback order (each entry is a 15 second video).
playlist = [
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
]
# Relay every playlist entry through yt-dlp -> ffmpeg (remux to MPEG-TS) and
# forward the result to one long-running ffmpeg that encodes and publishes to
# RTMP.
#
# This is the version the question is about: the copy loop stops as soon as
# yt-dlp's stdout is empty, without closing encoder_p.stdin or draining what
# is still readable from encoder_p.stdout.
if __name__ == "__main__":
    # Final stage: read MPEG-TS from stdin, encode with libx264, publish FLV.
    stream_cmd = [
        "ffmpeg", "-loglevel", "error",
        "-hide_banner", "-re", "-i", "-",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        "rtmp://127.0.0.1/live/H1P_x5WPF",
    ]
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    # Middle stage: copy the streams from stdin into MPEG-TS on stdout.
    encoder_cmd = [
        "ffmpeg", "-re", "-i", "-", "-f", "mpegts",
        "-c", "copy", "-",
    ]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    # One streaming process is shared by all playlist entries.
    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for video in playlist:
        # First stage: yt-dlp writes the downloaded video to its stdout.
        yt_dlp_cmd = [
            "yt-dlp", "-q",
            video["url"],
            "-o", "-",
        ]
        print("Now playing: " + video["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
            with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
                while True:
                    yt_dlp_buf = yt_dlp_p.stdout.read(COPY_BUFSIZE)
                    print("READ: yt_dlp")
                    if not yt_dlp_buf:
                        print("yt-dlp buffer empty")
                        # Handle any data in 2nd/3rd pipes before breaking?
                        break

                    written = encoder_p.stdin.write(yt_dlp_buf)
                    print("WRITE: encoder. Bytes: " + str(written))

                    # One read from the encoder per write to it (lock-step).
                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                    # if not encoder_buf:
                    #     print("encoder_buf empty")
                    #     break
                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

在MacOS上运行Python 3.6.9。

需要关闭 stdin 管道,以"推动"子进程把剩余的(缓冲的)数据发送到 stdout 管道。

例如,在将所有数据写入encoder_p.stdin之后添加encoder_p.stdin.close()


我不明白你的代码是怎么能运行起来的。
在我的机器上,它会卡在 encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE) 这一行。

我使用"写入线程"解决了这个问题。
"写入线程"从 yt-dlp 的 stdout 读取数据,并将其写入 FFmpeg(编码器)的 stdin。

注意:在你的这个特定场景下,不使用线程也可能正常工作(因为数据只是经由 FFmpeg 透传,并没有被重新编码),但一般情况下,把输入写入 FFmpeg 之后,编码后的数据并不会立刻就绪。


我的代码示例使用 FFplay 子进程来播放视频(我们需要一个视频播放器,因为 RTMP 流需要有"监听方"才能保持推流)。


这是一个完整的代码示例:

from subprocess import Popen, PIPE, DEVNULL
import threading
import time
# Number of bytes moved between pipes per read/write call.
COPY_BUFSIZE = 65424

# Videos to stream, in playback order (each entry is a 15 second video).
playlist = [
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
]

# Writer thread (read from yt-dlp and write to FFmpeg in chunks of COPY_BUFSIZE bytes).
def writer(yt_dlp_proc, encoder_proc, bufsize=None):
    """Copy yt-dlp's stdout into the encoder's stdin until yt-dlp hits EOF.

    Intended to run in its own thread so that another thread can read the
    encoder's stdout at the same time.

    Args:
        yt_dlp_proc: Process object whose ``stdout`` is a readable binary pipe.
        encoder_proc: Process object whose ``stdin`` is a writable binary pipe.
        bufsize: Bytes to read per iteration; defaults to ``COPY_BUFSIZE``.

    The encoder's stdin is always closed and the encoder is always waited
    for, even if a read or write raises; the exception then propagates.
    """
    if bufsize is None:
        bufsize = COPY_BUFSIZE

    try:
        while True:
            yt_dlp_buf = yt_dlp_proc.stdout.read(bufsize)
            print("READ: yt_dlp")
            if not yt_dlp_buf:
                print("yt-dlp buffer empty")
                break

            written = encoder_proc.stdin.write(yt_dlp_buf)
            print("WRITE: encoder. Bytes: " + str(written))
    finally:
        try:
            # Close stdin pipe (closing stdin "pushes" the remaining data to
            # stdout). Done in ``finally`` so the encoder sees EOF on every
            # exit path and whoever reads its stdout is not left waiting.
            encoder_proc.stdin.close()
        finally:
            encoder_proc.wait()  # Wait for sub-process finish execution.

# Stream every playlist entry to RTMP: yt-dlp -> ffmpeg (remux to MPEG-TS) ->
# one long-running ffmpeg that encodes and publishes. A writer thread feeds
# the remuxing ffmpeg while this thread drains its output.
if __name__ == "__main__":
    rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

    # Start the TCP server first, before the sending client.
    ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
    # Use FFplay sub-process for receiving the RTMP video.
    ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

    # Final stage: read MPEG-TS from stdin, encode with libx264, publish FLV.
    stream_cmd = [
        "ffmpeg", "-loglevel", "error",
        "-hide_banner", "-re", "-i", "-",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        rtmp_url,
    ]
    print(f'Stream command:\n"{" ".join(stream_cmd)}"')

    # Middle stage: copy the streams from stdin into MPEG-TS on stdout.
    encoder_cmd = [
        "ffmpeg", "-re", "-i", "-", "-f", "mpegts",
        "-c", "copy", "-",
    ]
    print(f'Encoder command:\n"{" ".join(encoder_cmd)}"')

    # One streaming process is shared by all playlist entries.
    stream_p = Popen(stream_cmd, stdin=PIPE, stderr=DEVNULL)

    for video in playlist:
        # First stage: yt-dlp writes the downloaded video to its stdout.
        yt_dlp_cmd = [
            "yt-dlp", "-q",
            video["url"],
            "-o", "-",
        ]
        print("Now playing: " + video["url"])

        with Popen(yt_dlp_cmd, stdout=PIPE) as yt_dlp_p:
            with Popen(encoder_cmd, stdin=PIPE, stdout=PIPE, stderr=DEVNULL) as encoder_p:
                thread = threading.Thread(target=writer, args=(yt_dlp_p, encoder_p))
                thread.start()  # Start writer thread.

                # Drain the encoder until EOF, which arrives once the writer
                # has closed the encoder's stdin and the encoder has exited.
                while True:
                    encoder_buf = encoder_p.stdout.read(COPY_BUFSIZE)
                    if not encoder_buf:
                        print("encoder_buf empty")
                        break
                    print("READ: encoder")

                    stream_bytes_written = stream_p.stdin.write(encoder_buf)
                    print("WRITE: stream, Bytes: " + str(stream_bytes_written))

                thread.join()  # Wait for writer thread to end.
            yt_dlp_p.wait()

    stream_p.stdin.close()  # Close stdin pipe (closing stdin "pushes" the remaining data to stdout).
    stream_p.wait()  # Wait for sub-process finish execution.

    time.sleep(3)  # Wait 3 seconds before closing FFplay
    ffplay_process.kill()  # Forcefully close FFplay sub-process

更新:

我发现了一个更简单的解决方案,使用 pytube 和 concat 过滤器(不使用管道)。

我不确定这个方案是否适用于你的情况……

代码示例:

from pytube import YouTube
from subprocess import Popen, run, PIPE, DEVNULL
import time
# Videos to stream, in playback order (each entry is a 15 second video).
playlist = [
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
    {"url": "https://youtube.com/watch?v=QiInzFHIDp4"},  # 15 second video
]
n = len(playlist)

# Build the -filter_complex graph for FFmpeg's concat filter
# https://video.stackexchange.com/a/18256/18277
filter_complex_str = (
    # Reset the timestamps of every input:
    # "[0:v]setpts=PTS-STARTPTS[v0];[0:a]asetpts=PTS-STARTPTS[a0];[1:v]...;"
    "".join(
        f"[{i}:v]setpts=PTS-STARTPTS[v{i}];[{i}:a]asetpts=PTS-STARTPTS[a{i}];"
        for i in range(n)
    )
    # List the labelled streams in playback order: "[v0][a0][v1][a1][v2][a2]"
    + "".join(f"[v{i}][a{i}]" for i in range(n))
    # Join them into a single video output [v] and a single audio output [a].
    + f"concat=n={n}:v=1:a=1[v][a]"
)

# Get the video stream URL of every YouTube HTTP URL.
# Add -i before each URL (to be used as FFmpeg input).
playlist_url = []
for video in playlist:
    yt = YouTube(video["url"])
    # https://github.com/pytube/pytube/issues/301
    stream_url = yt.streams[0].url  # Get the URL of the video stream
    playlist_url.extend(["-i", stream_url])

rtmp_url = "rtmp://127.0.0.1/live/H1P_x5WPF"

# Start the TCP server first, before the sending client.
ffplay_cmd = ['ffplay', '-listen', '1', '-i', rtmp_url]
# Use FFplay sub-process for receiving the RTMP video.
ffplay_process = Popen(ffplay_cmd, stderr=DEVNULL)

stream_cmd = (
    ["ffmpeg", "-loglevel", "error", "-hide_banner", "-re"]
    + playlist_url
    + [
        "-filter_complex", filter_complex_str,
        "-map", "[v]", "-map", "[a]",
        "-c:v", "libx264",
        "-f", "flv",
        "-b:v", "3000k", "-minrate", "3000k",
        "-maxrate", "3000k", "-bufsize", "3000k",
        "-r", "25", "-pix_fmt", "yuv420p",
        rtmp_url,
    ]
)
run(stream_cmd)  # Blocks until FFmpeg exits.

time.sleep(60)  # Wait 60 seconds before closing FFplay
ffplay_process.kill()  # Forcefully close FFplay sub-process

最新更新