Как читать веб-камеру в отдельном процессе на OSX?

Я читаю веб-камеру на OSX, которая отлично работает с этим простым script:

import cv2
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break

Теперь я хочу прочитать видео в отдельном процессе, для которого у меня есть script, который намного длиннее и который правильно считывает видео в отдельном процессе в Linux:

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int(see multiprocessing.Value)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """

        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
            :param finished: synchronized wrapper for int(see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables(which are synchronised so race condition is excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break

if __name__ == '__main__':
    main()

Это отлично работает в Linux, но на OSX у меня проблемы, потому что он не может сделать .read() для созданного объекта cv2.VideoCapture(device) (сохраненного в var self._cap).

После некоторого поиска я нашел этот SO-ответ, в котором предлагается использовать Billiard, замена многопроцессорности питонов, которая, предположительно, имеет некоторые очень полезные улучшения. Поэтому в верхней части файла я просто добавил импорт после моего предыдущего многопроцессорного импорта (фактически переопределяя multiprocessing.Process):

from billiard import Process, forking_enable

и непосредственно перед созданием переменной video_process я использую forking_enable следующим образом:

forking_enable(0)  # Supposedly this is all I need for billiard to do it magic
video_process = Process(target=stream, args=(frame, finished))

Итак, в этой версии (здесь, на pastebin) Затем я снова запустил файл, что дает мне эту ошибку:

pickle.PicklingError: Невозможно рассолить: он не найден как main.stream_function

Поиск этой ошибки привел меня к вопросу SO с длинным списком ответов, в котором мне было предложено использовать dill serialization lib, чтобы решить эту проблему. Однако этот lib следует использовать с вилкой для многопроцессорности Pathos. Поэтому я просто попытался изменить строку импорта многопроцессорности из

from multiprocessing import Array, Value, Process

to

from pathos.multiprocessing import Array, Value, Process

Но ни один из Array, Value и Process не существует в пакете pathos.multiprocessing.

И с этого момента я полностью потерялся. Я ищу вещи, о которых у меня едва хватает знаний, и я даже не знаю, в каком направлении мне нужно искать или отлаживать больше.

Так может ли какая-нибудь более яркая душа, чем я, помогать мне снимать видео в отдельном процессе? Все советы приветствуются!

Ответы

Ответ 1

Основная проблема с multiprocessing заключается в понимании модели памяти в случае разделенных адресных пространств памяти.

Python делает вещи еще более запутанными, поскольку он абстрагирует многие из этих аспектов, скрывая несколько механизмов под несколькими невинными API-интерфейсами.

Когда вы пишете эту логику:

# Initialize VideoCapture object and auxilary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()

...

# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start()  # Launch capture process

Вы переходите к Process stream_function, который относится к внутренним компонентам класса VideoCapture (self.get_size), но, что более важно, который недоступен как функция верхнего уровня.

Детский процесс не сможет перестроить требуемый объект как то, что получает его только как имя функции. Он пытается найти его в главном модуле, следовательно, сообщение:

pickle.PicklingError: Невозможно рассолковать: он не найден как main.stream_function

Детский процесс пытается разрешить функцию в основном модуле как main.stream_function, и поиск не выполняется.

Мое первое предложение состояло в том, чтобы изменить вашу логику так, чтобы вы передавали дочернему процессу метод, возвращающий stream_function.

video_process = Process(target=cap.get_stream_function, args=(...))

Тем не менее вы можете столкнуться с проблемами, так как вы состояние совместного использования между двумя процессами.

То, что я обычно предлагаю людям, когда они подходят к многопроцессорным парадигмам в Python, - это думать о процессах, как если бы они работали на отдельных машинах. В этих случаях было бы очевидно, что ваша архитектура является проблематичной.

Я бы рекомендовал вам разделить обязанности двух процессов, чтобы один процесс (ребенок) имел дело со всем захватом видео, а другой (родительский или третий процесс) имеет дело с обработкой кадры.

Эта парадигма известна как Продюсер и проблема потребителей, и она очень хорошо подходит для вашей системы. Процесс захвата видео будет производителем, а другой - потребителем. Простой multiprocessing.Pipe или multiprocessing.Queue гарантирует, что кадры будут переданы от производителя потребителю, как только они будут готовы.

Добавление примера в псевдокод, поскольку я не знаю API захвата видео. Речь идет о всей логике захвата видео в процессе производителя, абстрагируя ее от потребителя. Только вещи, которые должен знать потребитель, это то, что он получает объект кадра через трубу.

def capture_video(writer):
    """This runs in the producer process."""
    # The VideoCapture class wraps the video acquisition logic
    cap = VideoCapture()

    while True:
        frame = cap.get_next_frame()  # the method returns the next frame
        writer.send(frame)  # send the new frame to the consumer process

def main():
    reader, writer = multiprocessing.Pipe(False)

    # producer process
    video_process = Process(target=capture_video, args=[writer])
    video_process.start()  # Launch capture process

    while True:
        try:
            frame = reader.recv()  # receive next frame from the producer
            process_frame(frame)
        except KeyboardInterrupt:
            video_process.terminate()
            break

Обратите внимание на то, что между процессами нет общего состояния (нет необходимости использовать какой-либо массив). Связь проходит через Pipes и является однонаправленной, что делает логику очень простой. Как я сказал выше, эта логика будет работать и на разных машинах. Вам просто нужно заменить трубку розеткой.

Вам может потребоваться более простой подход к завершению процесса производителя. Я предлагаю вам использовать multiprocessing.Event. Просто установите его из родителя в блоке KeyboardInterrupt и проверьте его статус в дочернем элементе на каждой итерации (while not event.is_set()).

Ответ 2

Ваша первая проблема заключалась в том, что вы не могли получить доступ к веб-камере в процессе forked. Возникает несколько проблем, когда внешние библиотеки используются с fork, поскольку операция fork не очищает все дескрипторы файлов, открытые родительским процессом, что приводит к странному поведению. Библиотека часто более надежна для такого рода проблем в Linux, но не рекомендуется совместно использовать объект IO, такой как cv2.VideoCapture между двумя процессами.

Когда вы используете billard.forking_enabled и устанавливаете его на False, вы просите библиотеку не использовать fork для создания нового процесса, но spawn или forkserver методов, которые являются более чистыми, когда они закрывают весь файл дескрипторы, но также медленнее начать, это не должно быть проблемой в вашем случае. Если вы используете python3.4+, вы можете сделать это, используя multiprocessing.set_start_method('forkserver'), например.

При использовании одного из этих методов целевая функция и аргументы должны быть сериализованы для передачи дочернему процессу. Сериализация выполняется по умолчанию с помощью pickle, который имеет несколько потоков, как вы упомянули, не в состоянии сериализовать локально определенные объекты, а также cv2.VideoCapture. Но вы можете упростить свою программу, чтобы сделать все аргументы для вашего Process picklelisable. Ниже приведено предварительное решение вашей проблемы:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

Я не мог проверить его на mac в данный момент, поэтому он может не работать из коробки, но не должно быть multiprocessing связанных ошибок. Некоторые примечания:

  • Я создаю экземпляр объекта cv2.VideoCapture в новом дочернем элементе и захватываю камеру, поскольку только один процесс следует читать с камеры.
  • Возможно, проблема в вашей первой программе с fork происходит только из-за общего cv2.VideoCapture и воссоздания ее в функции stream поможет решить вашу проблему.
  • Вы не можете передать оболочку numpy в дочерний элемент, поскольку он не будет использовать буфер mp.Array (это действительно странно, и мне потребовалось некоторое время, чтобы понять). Вы должны явно передать Array и воссоздать оболочку.
  • Возможно, проблема в вашей первой программе с fork связана только с общим cv2.VideoCapture и воссозданием ее в функции stream разрешит вашу проблему.

  • Я предположил, что вы запускаете свой код в python3.4+, поэтому я не использовал billard, но использование forking_enabled(False) вместо set_start_method должно быть похоже.

Сообщите мне, если это работает!