草庐IT

python - 如何在 OSX 上的单独进程中读取网络摄像头?

coder 2023-08-13 原文

我正在读取 OSX 上的网络摄像头,使用这个简单的脚本可以正常工作:

import cv2
camera = cv2.VideoCapture(0)

while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # Display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break

我现在想在一个单独的进程中阅读视频,我有一个更长的脚本,它在 Linux 上的一个单独的进程中正确读出视频:

import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int(see multiprocessing.Value)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """

        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing(see multiprocessing.Array)
            :param finished: synchronized wrapper for int(see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frame until we get finished flag set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(description='Captures the video from the webcamera and \nwrites it into the output file with predefined fps.', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output",  default="output.avi", help='name of the output video file')
    parser.add_argument('-log', dest="log",  default="frames.log", help='name of the log file')
    parser.add_argument('-fps', dest="fps",  default=25., help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables(which are synchronised so race condition is excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # Display it at least one ms before going to the next frame
            time.sleep(0.1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break

if __name__ == '__main__':
    main()

这在 Linux 上运行良好,但在 OSX 上我遇到了麻烦,因为它似乎无法执行 .read()在创建cv2.VideoCapture(device)对象(存储在 var self._cap 中)。

经过一番搜索,我找到了 this SO answer , 建议使用 Billiard ,pythons 多处理的替代品,据说有一些非常有用的改进。所以在文件的顶部,我只是在我之前的多处理导入之后添加了导入(有效地覆盖了 multiprocessing.Process ):

from billiard import Process, forking_enable

就在 video_process 的实例化之前我使用的变量 forking_enable如下:

forking_enable(0)  # Supposedly this is all I need for billiard to do it's magic
video_process = Process(target=stream, args=(frame, finished))

所以在这个版本 ( here on pastebin ) 中,我再次运行该文件,这给了我这个错误:

pickle.PicklingError: Can't pickle : it's not found as main.stream_function

搜索该错误后我找到了 an SO question with a long list of answers其中一个给了我使用 dill serialization lib 的建议解决这个问题。但是,该库应该与 Pathos multiprocessing fork 一起使用.所以我只是尝试更改我的多处理导入行

from multiprocessing import Array, Value, Process

from pathos.multiprocessing import Array, Value, Process

但是 Array 都没有, ValueProcess似乎存在于 pathos.multiprocessing包。

从这一点来看,我完全迷路了。我正在搜索我几乎没有足够知识的东西,我什至不知道我需要向哪个方向搜索或调试。

那么,有没有比我聪明的人可以帮助我在单独的过程中捕捉视频?欢迎所有提示!

最佳答案

您的第一个问题是您无法在 forked 进程中访问网络摄像头。当外部库与 fork 一起使用时会出现几个问题,因为 fork 操作不会清除父进程打开的所有文件描述符,从而导致奇怪的行为。该库通常对 Linux 上的此类问题更为稳健,但在两个进程之间共享诸如 cv2.VideoCapture 之类的 IO 对象并不是一个好主意。

当您使用 billard.forking_enabled 并将其设置为 False 时,您要求库不要使用 fork 来生成新进程,但是 spawnforkserver 方法,它们关闭所有文件描述符时更干净,但启动速度也较慢,这在您的情况下应该不是问题。如果您使用的是 python3.4+,您可以使用 multiprocessing.set_start_method('forkserver') 来执行此操作。

当您使用这些方法之一时,目标函数和参数需要序列化才能传递给子进程。默认情况下,序列化是使用 pickle 完成的,它有多个流程,如您提到的那样,无法序列化本地定义的对象以及 cv2.VideoCapture。但是您可以简化您的程序,使您的 Process 的所有参数都可以 picklelisable。这是解决您的问题的尝试:

import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value
import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds(float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frames capture(in seconds)
        :param finished: synchronized wrapper for int
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param image: shared numpy array for multiprocessing
    :param finished: synchronized wrapper for int
    :return: nothing
    """
    shape = capturer.get_size()

    # Define shared variables
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")
    # Capture frame until we get finished flag set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():

    # Initialize VideoCapture object and auxilary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)

        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()

我目前无法在 mac 上测试它,所以它可能无法开箱即用,但不应该有 multiprocessing 相关错误。一些注意事项:

  • 我在新子对象中实例化 cv2.VideoCapture 对象并抓取摄像头,因为只有一个进程应该从摄像头读取数据。
  • 也许您使用 fork 的第一个程序中的问题只是由于共享 cv2.VideoCapture 并在 stream 函数中重新创建它会解决您的问题。
  • 您不能将 numpy 包装器传递给 child ,因为它不会共享 mp.Array 缓冲区(这真的很奇怪,我花了一段时间才弄清楚)。您需要显式传递 Array 并重新创建包装器。
  • 也许您使用 fork 的第一个程序中的问题只是由于共享 cv2.VideoCapture 并在 stream 中重新创建它> 功能将解决您的问题。

  • 我假设您在 python3.4+ 中运行代码,所以我没有使用 billard,而是使用了 forking_enabled(False) 而不是 set_start_method 应该有点相似。

让我知道这是否有效!

关于python - 如何在 OSX 上的单独进程中读取网络摄像头?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43572744/

有关python - 如何在 OSX 上的单独进程中读取网络摄像头?的更多相关文章

  1. ruby - 如何在 Ruby 中顺序创建 PI - 2

    出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  4. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  5. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

    我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

  6. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  7. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  8. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  9. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  10. ruby - 通过 ruby​​ 进程共享变量 - 2

    我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是

随机推荐