github源码:https://github.com/jeffbass/imagezmq
imageZMQ是一组 Python 类,它们使用 PyZMQ 消息传递将 OpenCV 图像从一台计算机传输到另一台计算机。例如,这是 Mac 计算机上的一个屏幕,显示来自 8 个 Raspberry Pi 摄像头的同步视频流:
使用imageZMQ,在每个 Raspberry Pi 上使用 11 行 Python 并在 Mac 上使用 8 行 Python 就可以做到这一点。
首先,在 Mac(或其他显示计算机)上运行此代码
1 # run this program on the Mac to display image streams from multiple RPis
2 import cv2
3 import imagezmq
4 image_hub = imagezmq.ImageHub()
5 while True: # show streamed images until Ctrl-C
6 rpi_name, image = image_hub.recv_image()
7 cv2.imshow(rpi_name, image) # 1 window for each RPi
8 cv2.waitKey(1)
9 image_hub.send_reply(b'OK')
然后,在每个 Raspberry Pi 上,运行:
1 # run this program on each RPi to send a labelled image stream
2 import socket
3 import time
4 from imutils.video import VideoStream
5 import imagezmq
6
7 sender = imagezmq.ImageSender(connect_to='tcp://jeff-macbook:5555')
8
9 rpi_name = socket.gethostname() # send RPi hostname with each image
10 picam = VideoStream(usePiCamera=True).start()
11 time.sleep(2.0) # allow camera sensor to warm up
12 while True: # send images as stream until Ctrl-C
13 image = picam.read()
14 sender.send_image(rpi_name, image)
以上为github中提到的一些用法,我们现目前使用的是opencv,所以进行以下代码的修改
新建一个server.py文件,用来发送视频流
import socket
import time
import cv2
import imagezmq
import traceback
import simplejpeg
image_width = 1920
image_height = 1080
rtsp_latency = 10
uri = "rtsp://admin:123456@192.168.3.64:554/Streaming/Channels/1"
gst_str = ("rtspsrc location={} latency={} ! rtph264depay ! avdec_h264 ! videorate ! videoconvert ! appsink sync=false").format(uri, rtsp_latency)
print(f'use gstream {gst_str}')
cap = cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
sender = imagezmq.ImageSender(connect_to='tcp://0.0.0.0:8000', REQ_REP=False)
rpi_name = socket.gethostname() # 获取主机名
time.sleep(2.0)
jpeg_quality = 10 #调整图片压缩质量,95%
time_stamp = time.time()
image_count = 0
image_size = 0
while(True):
try:
ref, frame=capture.read(0)
time.sleep(1/60)
image = cv2.resize(frame,(1280,720))
curtime = time.time()
msg = rpi_name+'*'+str(curtime)
# 通过simplejpeg函数将图片编码为jpeg格式,提高传输效率
jpg_buffer = simplejpeg.encode_jpeg(image, quality=jpeg_quality,
colorspace='BGR')
image_size += len(jpg_buffer)
# 打印1s内发送的帧数,并计算1s内发送的图像的大小,显示kb,只显示在一行
if curtime - time_stamp > 1:
print(f'1s内发送的帧数:{image_count}, 图片大小:{image_size/1024}kb', end='\r')
time_stamp = curtime
image_size = 0
image_count = 0
sender.send_jpg(msg, jpg_buffer)
image_count += 1
cv2.imshow(rpi_name, image)
cv2.waitKey(1)
except:
print(traceback.print_exc())
break
上面代码可以查看1s中发送的数据量,可以通过改变jpeg_quality来改变压缩的视频质量,同时也可以通过cv2.resize来改变画面的大小,同样也可以进行视频质量的压缩,局域网的话不需要考虑带宽的影响可以尽量将质量放高一点,考虑到后面需要使用frp进行穿透,阿里云带宽的限制,这边进行了相应的质量压缩
下面添加一个client.py用来拉取视频流
import cv2
import imagezmq
import traceback
import time
import simplejpeg
# 这边填写server端的ip地址,局域网地址,如果使用frp穿透,也可以直接改成服务器地址
socket_ip = "*.*.*.*"
# 接收发送端数据,输入发送端的ip地址
image_hub = imagezmq.ImageHub(open_port='tcp://'+ socket_ip +':6001',REQ_REP=False)
frame_count = 1
time1 = 0
while True:
try:
time1 = time.time() if frame_count == 1 else time1
name, image = image_hub.recv_jpg()
# 解码
image = simplejpeg.decode_jpeg(image, colorspace='BGR', fastdct=True, fastupsample=True)
# image = cv2.resize(image, (1920, 1080))
cv2.imshow('server', image)
cv2.waitKey(1)
time2 = time.time()
print(image.shape[:2], int(frame_count/(time2-time1)))
frame_count += 1
except:
print(traceback.format_exc())
break
上面代码实现了之后,局域网间的视频流传输就实现完成了
下面我们来实现通过frp的方式进行外网访问到内网的视频流
frp下载地址:https://github.com/fatedier/frp/releases
需要阿里云服务器上跑frps,在自己本机中跑frpc
下面就是简单的修改下frps.ini和frpc.ini
frps.ini
[common]
bind_addr = 0.0.0.0 #绑定本地地址,这边直接写0.0.0.0
bind_port = **** #frps服务的端口号,需要注意阿里云的安全组设置
vhost_http_port = 8001 #开放的http端口的端口号,后续使用域名进行http内网穿透时候,需要在编写的域名后面添加此端口号
max_pool_count = 50
frpc.ini
[common]
server_addr = 123.123.123.123 # 阿里云的ip地址
server_port = 9808 # 阿里云frps绑定的服务端口号
[web01]
type = http # 如果需要使用本地http穿透可以使用此type类型
local_ip = 192.168.3.106 # 本地需要内网穿透的ip地址,如果是本机就是127.0.0.1
local_port = 8001 # 本地需要内网穿透的http的端口号
custom_domains = a.b.com # 需要动态解析的域名
[tcp01]
type = tcp # imagezmq使用的是tcp的方式进行数据的收发,所以这边使用tcp模式
local_ip = 192.168.3.106 # 本地需要内网穿透的ip地址,如果是本机就是127.0.0.1
local_port = 6002 # 本地imagezmq server绑定的端口号
remote_port = 6002 # 需要绑定的阿里云转发的端口号,需要注意安全组
阿里云服务器上使用运行frps服务,开机自启什么的这边就不做过多介绍
./frps -c ./frps.ini
本地同样也需要启动frpc服务
./frpc -c ./frpc.ini
将上面client.py代码中的socket_ip修改成阿里云服务器的ip,同时端口号也改成配置中穿透后的端口,就可以正常查看视频了,阿里云服务器上可以使用nethogs的命令来查看带宽占用
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
在控制台中反复尝试之后,我想到了这种方法,可以按发生日期对类似activerecord的(Mongoid)对象进行分组。我不确定这是完成此任务的最佳方法,但它确实有效。有没有人有更好的建议,或者这是一个很好的方法?#eventsisanarrayofactiverecord-likeobjectsthatincludeatimeattributeevents.map{|event|#converteventsarrayintoanarrayofhasheswiththedayofthemonthandtheevent{:number=>event.time.day,:event=>ev
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][
我已经构建了一些serverspec代码来在多个主机上运行一组测试。问题是当任何测试失败时,测试会在当前主机停止。即使测试失败,我也希望它继续在所有主机上运行。Rakefile:namespace:specdotask:all=>hosts.map{|h|'spec:'+h.split('.')[0]}hosts.eachdo|host|begindesc"Runserverspecto#{host}"RSpec::Core::RakeTask.new(host)do|t|ENV['TARGET_HOST']=hostt.pattern="spec/cfengine3/*_spec.r
我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI
这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht