1、前端是无法直接播放rstp推流来的视频,所以需要用ffmpeg进行转码。
2、ffmpeg只能推送TCP或者HTTP协议还不支持ws协议。
大致流程图。

代码

效果图。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 工具类-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.3</version>
</dependency>
<!-- netty4.1.42-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<!--ffmpeg流转换器-->
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>ffmpeg-platform</artifactId>
<version>4.3.1-1.5.4</version>
</dependency>
netty部分代码
1、简简单单一个Server服务端。
package com.kang.rtsp.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.cors.CorsConfig;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @program: netty-demo
* @description: netty实现websocket任务
* @author: mink
* @create: 2022-02-23 09:16
**/
@Component
@Slf4j
public class NettyServer {
@Autowired
private RtspHandler rtspHandler;
@Value("${netty.port}")
private Integer port;
public void start() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(200);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
CorsConfig corsConfig = CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpResponseEncoder())
.addLast(new HttpRequestDecoder())
.addLast(new ChunkedWriteHandler())
/**
* http数据再传输中是分段的 HttpObjectAggregator ,就是可以将多个段聚合
* 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
.addLast(new HttpObjectAggregator(64*1024))
/**
* 1. 对应websocket ,它的数据是以 帧(frame) 形式传递
* 2. 可以看到WebSocketFrame 下面有六个子类
* 3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
* 5. 是通过一个 状态码 101
*/
.addLast(new CorsHandler(corsConfig))
.addLast(rtspHandler);
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.option(ChannelOption.ALLOCATOR, PreferredDirectByteBufAllocator.DEFAULT)
.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_RCVBUF, 128 * 1024).childOption(ChannelOption.SO_SNDBUF, 1024 * 1024).childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024 / 2, 1024 * 1024));
ChannelFuture sync = bootstrap.bind(port).sync();
log.info("netty启动成功监听端口号{}",port);
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2、重点是rtspHandler的编写。因为要同时实现http协议和ws协议所以需要自己进行判断。
1、读取信息先判断是不是请求,如果是请求先判断后缀、后面可以扩展通过后缀参数来获取RTSP的链接参数。
2、判断是什么请求,如果是http请求就直接通过http进行播放,如果是ws就需要进行协议升级。
3、WebSocketServerHandshakerFactory(getWebSocketLocation(req), “null”, true, 5 * 1024 * 1024);
第二个参数很坑原本我填的null但是前端报错,说传的null不为null后端不给出回应,但是我看着就是null,我试着改成字符串就好了。
4、将通道交给WebServer保存起来netty的路就只能陪你走就到这了,后面直接把流数据往通道里面发就完事了。
package com.kang.rtsp.netty;
import com.kang.rtsp.controller.TestController;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @program: netty-demo
* @description:
* @author: mink
* @create: 2022-02-23 09:32
**/
@Slf4j
@Component
@Sharable //不new,采用共享handler
public class RtspHandler extends SimpleChannelInboundHandler<Object> {
@Autowired
private TestController testController;
private WebSocketServerHandshaker handshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest){
FullHttpRequest req = (FullHttpRequest) msg;
QueryStringDecoder decoder = new QueryStringDecoder(req.uri());
if (!"/live".equals(decoder.path())) {
System.err.println("uri有误");
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
// http请求
log.info("HTTP请求");
sendFlvReqHeader(ctx);
// playServer.playForHttp(ctx,ClientType.HTTP);
}else {
// websocket握手,请求升级
// 参数分别是ws地址,子协议,是否扩展,最大frame长度
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), "null", true, 5 * 1024 * 1024);
handshaker = factory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
// todo rstp->ppfmge->http->ws
testController.setWsClients(ctx);
}
}
} else if (msg instanceof WebSocketFrame) {
handleWebSocketRequest(ctx, (WebSocketFrame) msg);
}
System.err.println("发送消息走完");
}
private String getWebSocketLocation(FullHttpRequest request) {
String location = request.headers().get(HttpHeaderNames.HOST) + request.uri();
return "ws://" + location;
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
System.err.println("请求地址有错误");
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
Unpooled.copiedBuffer("请求地址有误: " + status + "\r\n", CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* websocket处理
*
* @param ctx
* @param frame
*/
private void handleWebSocketRequest(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 关闭
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 握手PING/PONG
if (frame instanceof PingWebSocketFrame) {
ctx.write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本
if (frame instanceof TextWebSocketFrame) {
frame.retain(1);
ctx.channel().writeAndFlush(new TextWebSocketFrame(((TextWebSocketFrame) frame).text()));
return;
}
if (frame instanceof BinaryWebSocketFrame) {
return;
}
}
/**
* 发送req header,告知浏览器是flv格式
* @param ctx
*/
private void sendFlvReqHeader(ChannelHandlerContext ctx) {
HttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
rsp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE)
.set(HttpHeaderNames.CONTENT_TYPE, "video/x-flv").set(HttpHeaderNames.ACCEPT_RANGES, "bytes")
.set(HttpHeaderNames.PRAGMA, "no-cache").set(HttpHeaderNames.CACHE_CONTROL, "no-cache")
.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED).set(HttpHeaderNames.SERVER, "zhang");
ctx.writeAndFlush(rsp);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("web客户端被链接"+ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close(); //关闭连接
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
}
1、我们要优雅的启动netty服务器和提前初始化一下ffmpeg获取他的文件路径
package com.kang.rtsp.init;
import com.kang.rtsp.netty.NettyServer;
import lombok.extern.slf4j.Slf4j;
import org.bytedeco.javacpp.Loader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @program: rtsp-netty-demo
* @description: 开机自启
* @author: mink
* @create: 2022-02-25 11:11
**/
@Slf4j
@Component
public class InitServer implements CommandLineRunner {
@Autowired
private NettyServer nettyServer;
@Override
public void run(String... args) throws Exception {
log.info("启动netty服务器");
nettyServer.start();
}
/**
* 提前初始化,可避免推拉流启动耗时太久
*/
@PostConstruct
public void loadFFmpeg() {
log.info("正在初始化资源,请稍等...");
String ffmpeg = Loader.load(org.bytedeco.ffmpeg.ffmpeg.class);
String path = ffmpeg.substring(0,ffmpeg.indexOf(".exe"));
System.setProperty("ffmpeg",path);
log.info(System.getProperty("ffmpeg"));
log.info("初始化成功");
}
}
package com.kang.rtsp.ffmpeg;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
/**
* 转换视频流
* @author dell
*/
@Component
public class ConversionVideo implements ApplicationRunner {
public Process process;
public Integer pushVideoAsRTSP(String id, String fileName){
int flag = -1;
// ffmpeg位置,最好写在配置文件中
// String ffmpegPath = "D:\\tools\\ffmpeg-5.0-essentials_build\\bin\\";
String ffmpegPath = System.getProperty("ffmpeg");
try {
// 视频切换时,先销毁进程,全局变量Process process,方便进程销毁重启,即切换推流视频
if(process != null){
process.destroy();
System.out.println(">>>>>>>>>>推流视频切换<<<<<<<<<<");
}
// cmd命令拼接,注意命令中存在空格
String command = ffmpegPath;
// ffmpeg开头,-re代表按照帧率发送,在推流时必须有
// command += "ffmpeg ";
// 指定要推送的视频
command += " -i \"" + id + "\"";
// 指定推送服务器,-f:指定格式
command += " -q 0 -f mpegts -codec:v mpeg1video -s 800x600 " + fileName;
System.out.println("ffmpeg推流命令:" + command);
// 运行cmd命令,获取其进程
process = Runtime.getRuntime().exec(command);
}catch (Exception e){
e.printStackTrace();
}
return flag;
}
@Override
public void run(ApplicationArguments args) throws Exception {
ConversionVideo conversionVideo = new ConversionVideo();
conversionVideo.pushVideoAsRTSP("你的rtsp地址或者你的视频地址都可以搞", "http://127.0.0.1:8080/receive");
}
}
1、看看控制层代码,netty调用setwsClients,把通道信息保存在controller中。然后receive获取ffmpge传过来的二进制流
2、调用sendVideo给socket通道发送消息。
package com.kang.rtsp.controller;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @program: rtsp
* @description:
* @author: mink
* @create: 2022-04-01 11:14
**/
@Controller
public class TestController {
/**
* ws客户端
*/
private ConcurrentHashMap<String, ChannelHandlerContext> wsClients = new ConcurrentHashMap<>();
@RequestMapping("/receive")
@ResponseBody
public String receive(HttpServletRequest request) {
try {
ServletInputStream inputStream = request.getInputStream();
int len = -1;
while ((len =inputStream.available()) !=-1) {
byte[] data = new byte[len];
inputStream.read(data);
sendVideo(data);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("over");
return "1";
}
public void sendVideo(byte[] data) {
// ws
for (Map.Entry<String, ChannelHandlerContext> entry : wsClients.entrySet()) {
try {
if (entry.getValue().channel().isWritable()) {
entry.getValue().writeAndFlush(new BinaryWebSocketFrame(Unpooled.copiedBuffer(data)));
} else {
wsClients.remove(entry.getKey());
}
} catch (java.lang.Exception e) {
wsClients.remove(entry.getKey());
e.printStackTrace();
}
}
}
public void setWsClients(ChannelHandlerContext ctx) {
wsClients.put(ctx.channel().id().asLongText(),ctx);
}
}
前端代码也给上
<!DOCTYPE html>
<html>
<head>
</head>
<body>
<canvas id="video"></canvas>
<script type="text/javascript" src="jsmpeg.min.js"></script>
<script type="text/javascript">
var canvas = document.getElementById('video');
var url = 'ws://127.0.0.1:8866/live';
var player = new JSMpeg.Player(url, {canvas: canvas});
</script>
</body>
</html>
jsmpeg.min.js地址,自己随便下下得了。
https://gitcode.net/mirrors/phoboslab/jsmpeg/-/raw/master/jsmpeg.min.js?inline=false
是的,我知道最好使用webmock,但我想知道如何在RSpec中模拟此方法:defmethod_to_testurl=URI.parseurireq=Net::HTTP::Post.newurl.pathres=Net::HTTP.start(url.host,url.port)do|http|http.requestreq,foo:1endresend这是RSpec:let(:uri){'http://example.com'}specify'HTTPcall'dohttp=mock:httpNet::HTTP.stub!(:start).and_yieldhttphttp.shou
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p
遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg
Rails中有没有一种方法可以提取与路由关联的HTTP动词?例如,给定这样的路线:将“users”匹配到:“users#show”,通过:[:get,:post]我能实现这样的目标吗?users_path.respond_to?(:get)(显然#respond_to不是正确的方法)我最接近的是通过执行以下操作,但它似乎并不令人满意。Rails.application.routes.routes.named_routes["users"].constraints[:request_method]#=>/^GET$/对于上下文,我有一个设置cookie然后执行redirect_to:ba