Как читать веб-камеру в отдельном процессе на OSX?
Я читаю веб-камеру на OSX, которая отлично работает с этим простым script:
import cv2
camera = cv2.VideoCapture(0)
while True:
try:
(grabbed, frame) = camera.read() # grab the current frame
frame = cv2.resize(frame, (640, 480)) # resize the frame
cv2.imshow("Frame", frame) # show the frame to our screen
cv2.waitKey(1) # Display it at least one ms before going to the next frame
except KeyboardInterrupt:
# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()
print "\n\nBye bye\n"
break
Теперь я хочу прочитать видео в отдельном процессе, для которого у меня есть script, который намного длиннее и который правильно считывает видео в отдельном процессе в Linux:
import numpy as np
import time
import ctypes
import argparse
from multiprocessing import Array, Value, Process
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(floating point is allowed)
"""
self._cap = cv2.VideoCapture(device)
self._delay = delay
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def get_stream_function(self):
"""
Returns stream_function object function
"""
def stream_function(image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing(see multiprocessing.Array)
:param finished: synchronized wrapper for int(see multiprocessing.Value)
:return: nothing
"""
# Incorrect input array
if image.shape != self.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
image[:, :, :] = self._proper_frame(self._delay)
# Release the device
self.release()
return stream_function
def release(self):
self._cap.release()
def main():
# Add program arguments
parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-output', dest="output", default="output.avi", help='name of the output video file')
parser.add_argument('-log', dest="log", default="frames.log", help='name of the log file')
parser.add_argument('-fps', dest="fps", default=25., help='frames per second value')
# Read the arguments if any
result = parser.parse_args()
fps = float(result.fps)
output = result.output
log = result.log
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
# Define shared variables(which are synchronised so race condition is excluded)
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.terminate()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
cv2.waitKey(1) # Display it at least one ms before going to the next frame
time.sleep(0.1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
main()
Это отлично работает в Linux, но на OSX у меня проблемы, потому что он не может сделать .read()
для созданного объекта cv2.VideoCapture(device)
(сохраненного в var self._cap
).
После некоторого поиска я нашел этот SO-ответ, в котором предлагается использовать Billiard, замена многопроцессорности питонов, которая, предположительно, имеет некоторые очень полезные улучшения. Поэтому в верхней части файла я просто добавил импорт после моего предыдущего многопроцессорного импорта (фактически переопределяя multiprocessing.Process
):
from billiard import Process, forking_enable
и непосредственно перед созданием переменной video_process
я использую forking_enable
следующим образом:
forking_enable(0) # Supposedly this is all I need for billiard to do it magic
video_process = Process(target=stream, args=(frame, finished))
Итак, в этой версии (здесь, на pastebin) Затем я снова запустил файл, что дает мне эту ошибку:
pickle.PicklingError: Невозможно рассолить: он не найден как main.stream_function
Поиск этой ошибки привел меня к вопросу SO с длинным списком ответов, в котором мне было предложено использовать dill serialization lib, чтобы решить эту проблему. Однако этот lib следует использовать с вилкой для многопроцессорности Pathos. Поэтому я просто попытался изменить строку импорта многопроцессорности из
from multiprocessing import Array, Value, Process
to
from pathos.multiprocessing import Array, Value, Process
Но ни один из Array
, Value
и Process
не существует в пакете pathos.multiprocessing
.
И с этого момента я полностью потерялся. Я ищу вещи, о которых у меня едва хватает знаний, и я даже не знаю, в каком направлении мне нужно искать или отлаживать больше.
Так может ли какая-нибудь более яркая душа, чем я, помогать мне снимать видео в отдельном процессе? Все советы приветствуются!
Ответы
Ответ 1
Основная проблема с multiprocessing
заключается в понимании модели памяти в случае разделенных адресных пространств памяти.
Python делает вещи еще более запутанными, поскольку он абстрагирует многие из этих аспектов, скрывая несколько механизмов под несколькими невинными API-интерфейсами.
Когда вы пишете эту логику:
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
...
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start() # Launch capture process
Вы переходите к Process
stream_function
, который относится к внутренним компонентам класса VideoCapture
(self.get_size
), но, что более важно, который недоступен как функция верхнего уровня.
Детский процесс не сможет перестроить требуемый объект как то, что получает его только как имя функции. Он пытается найти его в главном модуле, следовательно, сообщение:
pickle.PicklingError: Невозможно рассолковать: он не найден как main.stream_function
Детский процесс пытается разрешить функцию в основном модуле как main.stream_function
, и поиск не выполняется.
Мое первое предложение состояло в том, чтобы изменить вашу логику так, чтобы вы передавали дочернему процессу метод, возвращающий stream_function
.
video_process = Process(target=cap.get_stream_function, args=(...))
Тем не менее вы можете столкнуться с проблемами, так как вы состояние совместного использования между двумя процессами.
То, что я обычно предлагаю людям, когда они подходят к многопроцессорным парадигмам в Python, - это думать о процессах, как если бы они работали на отдельных машинах. В этих случаях было бы очевидно, что ваша архитектура является проблематичной.
Я бы рекомендовал вам разделить обязанности двух процессов, чтобы один процесс (ребенок) имел дело со всем захватом видео, а другой (родительский или третий процесс) имеет дело с обработкой кадры.
Эта парадигма известна как Продюсер и проблема потребителей, и она очень хорошо подходит для вашей системы. Процесс захвата видео будет производителем, а другой - потребителем. Простой multiprocessing.Pipe
или multiprocessing.Queue
гарантирует, что кадры будут переданы от производителя потребителю, как только они будут готовы.
Добавление примера в псевдокод, поскольку я не знаю API захвата видео. Речь идет о всей логике захвата видео в процессе производителя, абстрагируя ее от потребителя. Только вещи, которые должен знать потребитель, это то, что он получает объект кадра через трубу.
def capture_video(writer):
"""This runs in the producer process."""
# The VideoCapture class wraps the video acquisition logic
cap = VideoCapture()
while True:
frame = cap.get_next_frame() # the method returns the next frame
writer.send(frame) # send the new frame to the consumer process
def main():
reader, writer = multiprocessing.Pipe(False)
# producer process
video_process = Process(target=capture_video, args=[writer])
video_process.start() # Launch capture process
while True:
try:
frame = reader.recv() # receive next frame from the producer
process_frame(frame)
except KeyboardInterrupt:
video_process.terminate()
break
Обратите внимание на то, что между процессами нет общего состояния (нет необходимости использовать какой-либо массив). Связь проходит через Pipes и является однонаправленной, что делает логику очень простой. Как я сказал выше, эта логика будет работать и на разных машинах. Вам просто нужно заменить трубку розеткой.
Вам может потребоваться более простой подход к завершению процесса производителя. Я предлагаю вам использовать multiprocessing.Event
. Просто установите его из родителя в блоке KeyboardInterrupt
и проверьте его статус в дочернем элементе на каждой итерации (while not event.is_set()
).
Ответ 2
Ваша первая проблема заключалась в том, что вы не могли получить доступ к веб-камере в процессе forked
. Возникает несколько проблем, когда внешние библиотеки используются с fork
, поскольку операция fork не очищает все дескрипторы файлов, открытые родительским процессом, что приводит к странному поведению. Библиотека часто более надежна для такого рода проблем в Linux, но не рекомендуется совместно использовать объект IO, такой как cv2.VideoCapture
между двумя процессами.
Когда вы используете billard.forking_enabled
и устанавливаете его на False
, вы просите библиотеку не использовать fork
для создания нового процесса, но spawn
или forkserver
методов, которые являются более чистыми, когда они закрывают весь файл дескрипторы, но также медленнее начать, это не должно быть проблемой в вашем случае. Если вы используете python3.4+
, вы можете сделать это, используя multiprocessing.set_start_method('forkserver')
, например.
При использовании одного из этих методов целевая функция и аргументы должны быть сериализованы для передачи дочернему процессу. Сериализация выполняется по умолчанию с помощью pickle
, который имеет несколько потоков, как вы упомянули, не в состоянии сериализовать локально определенные объекты, а также cv2.VideoCapture
. Но вы можете упростить свою программу, чтобы сделать все аргументы для вашего Process
picklelisable. Ниже приведено предварительное решение вашей проблемы:
import numpy as np
import time
import ctypes
from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2
class VideoCapture:
"""
Class that handles video capture from device or video file
"""
def __init__(self, device=0, delay=0.):
"""
:param device: device index or video filename
:param delay: delay between frame captures in seconds(float allowed)
"""
self._delay = delay
self._device = device
self._cap = cv2.VideoCapture(device)
assert self._cap.isOpened()
def __getstate__(self):
self._cap.release()
return (self._delay, self._device)
def __setstate__(self, state):
self._delay, self._device = state
self._cap = cv2.VideoCapture(self._device)
assert self._cap.grab(), "The child could not grab the video capture"
def _proper_frame(self, delay=None):
"""
:param delay: delay between frames capture(in seconds)
:param finished: synchronized wrapper for int
:return: frame
"""
snapshot = None
correct_img = False
fail_counter = -1
while not correct_img:
# Capture the frame
correct_img, snapshot = self._cap.read()
fail_counter += 1
# Raise exception if there no output from the device
if fail_counter > 10:
raise Exception("Capture: exceeded number of tries to capture "
"the frame.")
# Delay before we get a new frame
time.sleep(delay)
return snapshot
def get_size(self):
"""
:return: size of the captured image
"""
return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)
def release(self):
self._cap.release()
def stream(capturer, image, finished):
"""
Function keeps capturing frames until finished = 1
:param image: shared numpy array for multiprocessing
:param finished: synchronized wrapper for int
:return: nothing
"""
shape = capturer.get_size()
# Define shared variables
frame = np.ctypeslib.as_array(image.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
# Incorrect input array
if frame.shape != capturer.get_size():
raise Exception("Capture: improper size of the input image")
print("Capture: start streaming")
# Capture frame until we get finished flag set to True
while not finished.value:
frame[:, :, :] = capturer._proper_frame(capturer._delay)
# Release the device
capturer.release()
def main():
# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
# Define shared variables
shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
frame = np.ctypeslib.as_array(shared_array_base.get_obj())
frame = frame.reshape(shape[0], shape[1], shape[2])
finished = Value('i', 0)
# Start processes which run in parallel
video_process = Process(target=stream,
args=(cap, shared_array_base, finished))
video_process.start() # Launch capture process
# Sleep for some time to allow videocapture start working first
time.sleep(2)
# Termination function
def terminate():
print("Main: termination")
finished.value = True
# Wait for all processes to finish
time.sleep(1)
# Terminate working processes
video_process.join()
# The capturing works until keyboard interrupt is pressed.
while True:
try:
# Display the resulting frame
cv2.imshow('frame', frame)
# Display it at least one ms before going to the next frame
time.sleep(0.1)
cv2.waitKey(1)
except KeyboardInterrupt:
cv2.destroyAllWindows()
terminate()
break
if __name__ == '__main__':
set_start_method("spawn")
main()
Я не мог проверить его на mac в данный момент, поэтому он может не работать из коробки, но не должно быть multiprocessing
связанных ошибок. Некоторые примечания:
- Я создаю экземпляр объекта
cv2.VideoCapture
в новом дочернем элементе и захватываю камеру, поскольку только один процесс следует читать с камеры.
- Возможно, проблема в вашей первой программе с
fork
происходит только из-за общего cv2.VideoCapture
и воссоздания ее в функции stream
поможет решить вашу проблему.
- Вы не можете передать оболочку numpy в дочерний элемент, поскольку он не будет использовать буфер
mp.Array
(это действительно странно, и мне потребовалось некоторое время, чтобы понять). Вы должны явно передать Array
и воссоздать оболочку.
-
Возможно, проблема в вашей первой программе с fork
связана только с общим cv2.VideoCapture
и воссозданием ее в функции stream
разрешит вашу проблему.
-
Я предположил, что вы запускаете свой код в python3.4+
, поэтому я не использовал billard
, но использование forking_enabled(False)
вместо set_start_method
должно быть похоже.
Сообщите мне, если это работает!