我在python中使用Opencv使用多处理来获取视频帧。
我的类是这样的:-
import cv2
from multiprocessing import Process, Queue
class StreamVideos:
def __init__(self):
self.image_data = Queue()
def start_proces(self):
p = Process(target=self.echo)
p.start()
def echo(self):
cap = cv2.VideoCapture('videoplayback.mp4')
while cap.isOpened():
ret,frame = cap.read()
self.image_data.put(frame)
# print("frame")
我开始这个过程"echo"使用:-
p = Process(target=self.echo)
p.start()
echo函数是这样的:-
def echo(self):
cap = cv2.VideoCapture('videoplayback.mp4')
while cap.isOpened():
ret,frame = cap.read()
self.image_data.put(frame)
我在里面使用了队列我在里面放了这些帧
self.image_data.put(frame)
然后在另一个过程中我开始恢复这些帧
self.obj = StreamVideos()
def start_process(self):
self.obj.start_proces()
p = Process(target=self.stream_videos)
p.start()
def stream_videos(self):
while True:
self.img = self.obj.image_data.get()
print(self.img)
但是当我开始将帧放入队列时,ram很快就会被填满,系统就会卡住。我使用的视频只有25fps和39mb大小,所以它没有任何意义。
我注意到的一件事是"回声"进程在stream_videos_quot;之前将许多帧放入队列中。进程检索它。
这个问题的根源是什么?
提前感谢。
预期: -
- 能够连续检索帧。
尝试:-
- 不将帧放入队列,在这种情况下ram未被填充。
下面是一个通用的单生产者/多消费者实现。生产者(类StreamVideos
)创建一个共享内存数组,其大小是视频帧中的字节数。一个或多个消费者(指定StreamVideos
的消费者数量)可以调用StreamVideos.get_next_frame()
来检索下一帧。该方法将共享数组转换回numpy
数组以供后续处理。生产者只有在所有消费者调用get_next_frame
:
#!/usr/bin/env python3
import multiprocessing
import numpy as np
import ctypes
import cv2
class StreamVideos:
def __init__(self, path, n_consumers):
"""
path is the path to the video:
n_consumers is the number of tasks to which we will be sreaming this.
"""
self._path = path
self._event = multiprocessing.Event()
self._barrier = multiprocessing.Barrier(n_consumers + 1, self._reset_event)
# Discover how large a framesize is by getting the first frame
cap = cv2.VideoCapture(self._path)
ret, frame = cap.read()
if ret:
self._shape = frame.shape
frame_size = self._shape[0] * self._shape[1] * self._shape[2]
self._arr = multiprocessing.RawArray(ctypes.c_ubyte, frame_size)
else:
self._arr = None
cap.release()
def _reset_event(self):
self._event.clear()
def start_streaming(self):
cap = cv2.VideoCapture(self._path)
while True:
self._barrier.wait()
ret, frame = cap.read()
if not ret:
# No more readable frames:
break
# Store frame into shared array:
temp = np.frombuffer(self._arr, dtype=frame.dtype)
temp[:] = frame.flatten(order='C')
self._event.set()
cap.release()
self._arr = None
self._event.set()
def get_next_frame(self):
# Tell producer that this consumer is through with the previous frame:
self._barrier.wait()
# Wait for next frame to be read by the producer:
self._event.wait()
if self._arr is None:
return None
# Return shared array as a numpy array:
return np.ctypeslib.as_array(self._arr).reshape(self._shape)
def consumer(producer, id):
frame_name = f'Frame - {id}'
while True:
frame = producer.get_next_frame()
if frame is None:
break
cv2.imshow(frame_name, frame)
cv2.waitKey(1)
cv2.destroyAllWindows()
def main():
producer = StreamVideos('videoplayback.mp4', 2)
consumer1 = multiprocessing.Process(target=consumer, args=(producer, 1))
consumer1.start()
consumer2 = multiprocessing.Process(target=consumer, args=(producer, 2))
consumer2.start()
"""
# Run as a child process:
producer_process = multiprocessing.Process(target=producer.start_streaming)
producer_process.start()
producer_process.join()
"""
# Run in main process:
producer.start_streaming()
consumer1.join()
consumer2.join()
if __name__ == '__main__':
main()