如何通过 ZeroMQ 套接字发送 OpenCV 视频片段



我有一个简单的网络摄像头,我用OpenCV读出了它,我现在正在尝试使用ZeroMQ将此视频片段发送到不同的(Python)程序。所以我有以下简单的脚本来读出网络摄像头并使用 ZeroMQ 套接字发送它:

import cv2
import os
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
# init the camera
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read()            # grab the current frame
frame = cv2.resize(frame, (640, 480))       # resize the frame
footage_socket.send_string(base64.b64encode(frame))
# Show the video in a window
cv2.imshow("Frame", frame)                  # show the frame to our screen
cv2.waitKey(1)                              # Display it at least one ms
#                                           # before going to the next frame
except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "nnBye byen"
break

这很有效,因为它显示了视频并且不会给出任何错误。

我注释掉了显示图像的两行(cv2.imshow()cv2.waitKey(1))。然后,我以 paralel 的形式启动了下面的脚本。第二个脚本应接收视频片段并显示它。

import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
# camera = cv2.VideoCapture("output.avi")
while True:
try:
frame = footage_socket.recv_string()
frame = np.fromstring(base64.b64decode(frame), dtype=np.uint8)
cv2.imshow("Frame", frame)                  # show the frame to our screen
cv2.waitKey(1)                              # Display it at least one ms
#                                           # before going to the next frame
except KeyboardInterrupt:
cv2.destroyAllWindows()
break
print "nnBye byen"

不幸的是,这冻结在cv2.waitKey(1).

有人知道我在这里做错了什么吗?我需要以不同的方式解码素材吗?欢迎所有提示!

最后,我通过采取中间步骤解决了这个问题。我首先将单个图像写入磁盘,然后再次读出这些图像。这让我意识到我需要将帧编码为图像(我选择了jpg),并且使用cv2.imencode('.jpg', frame)cv2.imdecode(npimg, 1)的神奇方法,我可以使其工作。我在下面粘贴了完整的工作代码。

第一个脚本读出网络摄像头并通过 zeromq 套接字发送素材:

import cv2
import zmq
import base64
context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://localhost:5555')
camera = cv2.VideoCapture(0)  # init the camera
while True:
try:
(grabbed, frame) = camera.read()  # grab the current frame
frame = cv2.resize(frame, (640, 480))  # resize the frame
encoded, buffer = cv2.imencode('.jpg', frame)
footage_socket.send_string(base64.b64encode(buffer))

except KeyboardInterrupt:
camera.release()
cv2.destroyAllWindows()
print "nnBye byen"
break

第二个脚本接收帧图像并显示它们:

import cv2
import zmq
import base64
import numpy as np
context = zmq.Context()
footage_socket = context.socket(zmq.SUB)
footage_socket.bind('tcp://*:5555')
footage_socket.setsockopt_string(zmq.SUBSCRIBE, unicode(''))
while True:
try:
frame = footage_socket.recv_string()
img = base64.b64decode(frame)
npimg = np.fromstring(img, dtype=np.uint8)
source = cv2.imdecode(npimg, 1)
cv2.imshow("image", source)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
print "nnBye byen"
break

第 0 步:风险清单

鉴于目标明确,分布式应用程序基础架构的快速原型设计取决于几个风险点。

0)OpenCVcv2模块大量使用基于C的底层组件,如果试图使用cv2.imshow()设施和cv2(外部FSA)窗口管理和框架显示服务,python美女经常挂起。

1)ZeroMQ 框架可以帮助您的不仅仅是尝试强制将数据转换为(string)只是为了使用.send_string() / .recv_string()- 一旦在这里可能会变得更好,而不是将已知图像移动到几何图形( pxW * pxH )* RGB 深度在一些更智能的 BLOB 映射对象中(将在下面的架构展望中更多地讨论这方面)。

2)给定第0点和第1点,ZeroMQ基础设施(在原型设计中越多)应该变得强大,以应对未经处理的异常(这将使zmq.Context()实例及其关联的.Socket()子项在分配的资源上挂起,以防cv2.imshow()崩溃,因为它经常在快速原型循环中这样做。

因此,在try: except: finally:处理程序中对代码进行彻底和自律的框架,并在套接字实例化后立即显式初始.setsockopt( zmq.LINGER, 0 )+最终.close()+context.term()finally:处理程序部分内是公平的。


步骤 1:通过发送纯int-sSEQ来验证 ZeroMQ 部分

最好的第一级问题隔离是设置流,只是为了提供不受控制的整数SEQ.send( )PUB端广播。

...                                                   # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
SEQ += 1
footage_socket.send( SEQ, zmq.NOBLOCK )               # PUB.send( SEQ ) -> *SUB*
...

除非您的接收方证明它对 int-s 流的可靠处理,否则继续前进是没有意义的。

...
aMsgIN = footage_socket.recv( zmq.NOBLOCK )           # FOR STREAMING, ALWAYS PREFER DESIGNS USING A NONBLOCKING MODE
print "{0:}".format( aMsgIN if len( aMsgIN ) > 0 else "." ),
# sleep(...)                                          # backthrottle the loop a bit
...

第 2 步:将.imshow().recv()数据和事件循环分离

如果您的数据泵根据需要工作,远程显示器开始作为下一个目标有意义。

ZeroMQ要么提供完整的消息(BLOB),要么什么都不提供。这是事实。接下来,ZeroMQ不保证任何人提供无差错。这些是事实,你的设计必须接受。

采集是更简单的部分,只需抓取数据(也许一些色彩空间转换可能在这里集中进行,但除此之外,任务是(对于低于 4K/低于 30fps 的图像处理)通常没有错误在这一侧。

frame,如上所述,是一个numpy.ndarray实例。发送硬编码的二进制映射 BLOB 将获得最佳性能,没有任何"智能"转换,因为显然,frame只是一大堆位(尽管 ZeroMQ 的零拷贝机制可能有一些更高级的美味佳肴,但这些在发送端的现阶段不会有直接的好处)。

#                               struct.unpack() / .pack() seems "just"-enough for fast & smart custom Payload protocol designs
DecodeWireMSG_DATA( aMSG_DATA = struct.unpack( "!" + ( aPayloadHEADER + ( ( pxW * pxH ) * RGB_DEPTH * CELL_SIZE ) ) * "I", aMsgIN ) )

更难的部分是在接收器方面。如果尝试使用隐藏在.imshow()中的cv2内置事件循环引擎,此循环将与通过从PUB端发布的.recv()读取更新流的外部逻辑发生冲突。

作为一个合理的折衷方案,人们可以忽略所有"延迟"的frame-s,这些没有使过程与PUB端采集/广播节奏同步,而只显示最新的一个......在 ZeroMQ 传输基础设施上使用zmq.CONFLATE选项(如果重建事件流的目的只是视觉接收,延迟的"旧"图片就失去了意义,相反,如果目的是记录完整的采集 1:1,zmq.CONFLATE将是丢弃应该得到处理的frame实例, 因此,应该为这种 1:1 的文档目的添加一些其他架构,最好与数据流/处理的"仅视觉"分支分开)。

完成此操作后,.recv()(可能是Poller()+.recv()循环的组合)将为SUB方提供适当的数据泵,该泵独立于cv2工具,并且.imshow()(隐藏)-FSA事件循环。


架构和性能提示:

  • 对于更高的 fps + 全高清/2K/4K 项目,使用 zmq 系统地分析cv2处理的累积处理延迟/延迟。秒表() 实例的{ .start(), .stop() }方法。

  • 有了硬数据,你可能会及时发现,当额外的需求出现解决一些更难的实时限制时,所以考虑一下:

  • 原则上避免任何陷入任何无法控制的 Python 垃圾收集黑洞的风险——始终控制关键路径部分周围的{ gc.disable() | gc.enable();gc.collect() },并在您的设计知道可行的情况下启动显式gc.collect()

  • 避免新的内存分配延迟 -- 可以预先分配所有必要的numpy阵列,以后只强制numpy用于这些阵列的就地数据修改模式,从而避免任何进一步的临时内存管理相关的等待状态

  • 设计流量以提高抗错性,对整个大/(彩色)深图像的单独、多流、独立更新(条纹)(请记住零保修 - 获得完整的"胖"消息或None)

  • 使用zmq.AFFINITY将不同类别的 I/O 流量优先级映射到隔离的zmq.Context( N )I/O 线程上,调整.Context()的性能。

  • 在 PUB 端微调zmq.SNDBUF+zmq.SNDHWM,如果预期有多个订阅者并且不使用zmq.CONFLATE

  • 最后但并非最不重要的一点是,可以利用numba.jit()LLVM预编译的可重用代码加速用于关键路径函数(通常是繁重的numpy处理),其中额外的微秒剃除会对您的视频处理管道产生最有益的影响,同时仍然保持纯Python的舒适性(嗯,当然,仍然有一些cv2警告)。


原型设计阶段cv2的更多提示和技巧:

对于基于cv2的图像处理,可能会喜欢这个。

可能会喜欢cv2简单的 GUI 交互式参数调整方法。

可能会喜欢这样cv2处理管道分析,zmq.Stopwatch()详细信息低至[usec]

帧对象包含服务器状态的内存状态,当它发送到客户端时,它会冻结,因为它收到的帧是浅拷贝。 尝试查找如何为框架制作深拷贝。

最新更新