草庐IT

记录监控摄像头的接入过程及web端播放

shame11 2023-03-28 原文

1.rtsp视频流网页播放概述

需求:当我们通过ONVIF协议,获取到了摄像头的rtsp流地址(长这样:rtsp://admin:123456789@192.168.9.16:554/cam/realmonitor?channel=1&subtype=1&unicast=true&proto=Onvif)后,通过vlc播放器,我们可以查看监控视频内容,可是,我们应该如何在网页上查看视频内容呢?因为现在的浏览器都不支持rtsp流(详见:https://blog.csdn.net/SY__CSDN/article/details/129255690?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0-129255690-blog-113454774.pc_relevant_landingrelevant&spm=1001.2101.3001.4242.1&utm_relevant_index=3),因此我所选用的解决方案便是推流 + 转码

(1)转码推流工具ffmpeg(安装教程详见:https://www.cnblogs.com/h2285409/p/16982120.html),安装好之后,便可使用命令 ffmpeg -re -rtsp_transport tcp -i rtsp://admin:123456789@192.168.9.16:554/cam/realmonitor?channel=1&subtype=1&unicast=true&proto=Onvif -c:v copy -c:a copy -f flv rtmp://127.0.0.1/live/16 将我们的rtsp视频流转码并推至流媒体服务器上,在这个命令中含有两个URL,前面的是我们的rtsp流地址,而后面的URL便是我们流媒体服务器的地址,以及一个-f参数,指定了我们视频流转码后的格式为flv

(2)流媒体服务器,主要调研了2款,一是整合了Rtmp模块的Nginx,二是SRS视频服务器,而我所选用的是SRS(官方文档:http://ossrs.net/lts/zh-cn/),在使用ffmpeg推流上SRS后,便可直接从SRS获得HTTP-FLV视频流地址(如本例:http://127.0.0.1/live/16.flv ),然后,前端通过flv.js组件库便可直接在页面上播放该视频流

SRS与ffmpeg参考:https://blog.csdn.net/diyangxia/article/details/120172920

ffmpeg进阶参考:https://segmentfault.com/a/1190000039782685

flv.js参考:http://www.kaotop.com/it/446261.html

2.rtsp推流转码相关代码实现

//ffmpeg安装路径
@Value("${ffmpegPath}")
private String ffmpegPathPrefix;

//srs视频服务器地址
@Value("${srsAddress}")
private String srsAddress;

//srs端口,默认为8080
@Value("${srsPort}")
private String srsPort;

//srs-http-api端口,默认为1985
@Value("${srsHttpApiPort}")
private String srsHttpApiPort;

@Resource
private MonitorMapper monitorMapper;

@Resource
private RedisTemplate redisTemplate;

@Resource
private RestTemplate restTemplate;

@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

private ConcurrentHashMap<String, TranscodeModel> id2transcodeModelMap = new ConcurrentHashMap<>();

/**
* 进行推流转码
* @param id ipc的主键id
* @return 转码推流后的http-flv地址,前端可通过flv.js直接播放
*/
public String transcodeAndPushStream(String id) {
    Ipc ipc = monitorMapper.getIpcInfoById(id);
    try {
        //先给这个流加锁,防止其他用户请求该流信息
        while(!redisTemplate.opsForValue().setIfAbsent(id, 1, Duration.ofSeconds(60))) {
            Thread.sleep(200);
        }
        //避免重复对某一个流的推流工作
        if(!id2transcodeModelMap.containsKey(id)) {
            String command = String.format("%sffmpeg -re -rtsp_transport tcp -i %s -c:v copy -c:a copy -f flv %s",this.ffmpegPathPrefix,  ipc.getRtspUrl(), "rtmp://" + this.srsAddress + "/live/" + id);
            //通过命令行执行推流转码
            System.out.println("启动推流转码, 其命令为: " + command);
            Process process = Runtime.getRuntime().exec(command);
            //可选,开启异步线程,观察推流进程所打印的日志
            Future<Void> processOutputHandler = threadPoolTaskExecutor.submit(() -> {
                BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
                String msg = null;
                try {
                    while ((msg = br.readLine()) != null) {
                        if (Thread.currentThread().isInterrupted()) {
                            System.out.println("关闭推流进程的日志输出线程:  " + Thread.currentThread().getName());
                            break;
                        }
                        if (msg.contains("fail") || msg.contains("miss")) {
                            System.err.println(ipc.getId() + " 在推流过程中发生故障或丢包: " + msg);
                        }
                    }
                } catch (IOException e) {
                    System.err.println(ipc.getId() + " 在推流转码过程中发生异常错误,原因: " + e.getMessage());
                } finally {
                    if (Thread.currentThread().isAlive()) {
                        Thread.currentThread().interrupt();
                    }
                }
                return null;
            });
            id2transcodeModelMap.put(id, new TranscodeModel(id, command, process, processOutputHandler));
        }
    } catch (Exception e) {
        this.closeTranscode(id);
        throw new RuntimeException("启动对 " + id + " 的推流转码失败,原因: " + e.getMessage());
    } finally {
        redisTemplate.delete(id);
    }
    //返回转码后的flv流地址
    return "http://" + this.srsAddress + ":" + this.srsPort + "/live/" + id + ".flv";
}

/**
* 关闭推流进程
*/
private void closeTranscode(String id) {
    TranscodeModel transcodeModel = null;
    if((transcodeModel = id2transcodeModelMap.get(id)) != null) {
        Future<Void> outputHandler = transcodeModel.getOutputHandler();
        //关闭输出线程
        if(outputHandler != null && !outputHandler.isDone()) {
            outputHandler.cancel(true);
        }
        //停止推流转码进程
        if (transcodeModel.getProcess() != null) {
            transcodeModel.getProcess().destroy();
        }
        id2transcodeModelMap.remove(id);
        System.out.println("关闭对 " + id + " 的推流转码");
    }
}

/**
* 客户端结束播放流后,srs可配置触发一个on_stop回调,通过该回调,我们就可以知道哪些流可能没人看了,然后结束对该流进行的推流转码工作
* @param data srs触发回调时所携带的参数
*/
public void stopPlay(CallbackOnStopPlay data) {
    String clientId = data.getClient_id();
    JSONObject srsClient = this.requestSrsClientById(clientId);
    if(!srsClient.isEmpty()) {
        String streamId = srsClient.getString("stream");
        if (!StringUtils.hasText(streamId)) {
            System.err.println("获取client " + clientId +" 的流失败, 未关联流");
            return;
        }
        //在请求这个流的信息之前,先给这个流加锁,防止其他用户预览该流
        try {
            while(!redisTemplate.opsForValue().setIfAbsent(data.getStream(), 1, Duration.ofSeconds(60))) {
                Thread.sleep(200);
            }
            JSONObject vidiconStream = this.requestSrsStreamById(streamId);
            if(!vidiconStream.isEmpty()) {
                Integer clients = vidiconStream.getInteger("clients");
                //当前观看该流的人数 <= 2时,说明没人看了可以停止推流,至于为什么是2,可以自己观察打印日志看看
                if(clients <= 2) {
                    this.closeTranscode(vidiconStream.getString("name"));
                }
            }
        } catch (Exception e) {
            System.err.println("关闭视频流 " + streamId + " 失败, 原因: " + e.getMessage());
        } finally {
            redisTemplate.delete(data.getStream());
        }
    }
}

/**
 * 根据clientId获取某个client信息
 */
private JSONObject requestSrsClientById(String clientId) {
    if(!StringUtils.hasText(clientId)) {
        return new JSONObject();
    }
    String url = "http://" + this.srsAddress + ":" + this.srsHttpApiPort + "/api/v1/clients/" + clientId;
    ResponseEntity<JSONObject> exchange = null;
    try {
        exchange = restTemplate.exchange(url, HttpMethod.GET, new HttpEntity<>(null, new HttpHeaders()), JSONObject.class);
    } catch (Exception e) {
        System.err.println("请求srs的client " + clientId + " 失败,原因: " + e.getMessage());
        return new JSONObject();
    }
    if (exchange == null || exchange.getBody() == null || exchange.getBody().getInteger("code") != 0) {
        System.err.println("请求srs中client " + clientId + " 失败");
        return new JSONObject();
    }
    System.out.println("请求到client " + clientId + " 的信息为: " + exchange.getBody());
    return exchange.getBody().getJSONObject("client");
}

/**
 * 根据流的id获取某个流
 */
private JSONObject requestSrsStreamById(String streamId) {
    if(!StringUtils.hasText(streamId)) {
        return new JSONObject();
    }
    String url = "http://" + this.srsAddress + ":" + this.srsHttpApiPort + "/api/v1/streams/" + streamId;
    ResponseEntity<JSONObject> exchange = null;
    try {
        exchange = restTemplate.exchange(url, HttpMethod.GET, new HttpEntity<>(null, new HttpHeaders()), JSONObject.class);
    } catch (Exception e) {
        System.err.println("请求srs中的流 " + streamId + " 失败,原因: " + e.getMessage());
        return new JSONObject();
    }
    if (exchange == null || exchange.getBody() == null || exchange.getBody().getInteger("code") != 0) {
        System.err.println("请求srs中的流 " + streamId + " 失败, 由于服务器重启或其他原因,该流已失效");
        return new JSONObject();
    }
    System.out.println("请求到流 " + streamId + " 的信息为: " + exchange.getBody());
    return exchange.getBody().getJSONObject("stream");
}

public class TranscodeModel {

    private String id;

    private String command;

    private Process process;
    //推流过程中的输出线程
    private Future<Void> outputHandler;
}

//客户端关闭流时触发的回调所传递的参数
public class CallbackOnStopPlay {
    private String server_id;

    private String action;

    private String client_id;

    private String ip;

    private String vhost;

    private String app;

    private String stream;

    private String param;
}

//ipc类
public class Ipc {
    //ipc的主键id
    private String id;
    //ipc的rtsp流地址
    private String rtspUrl;
}

对推流进程的关闭,可以选择定时任务轮询srs中流的信息,然后对那些没人看的流进行关闭,也可以选择配置srs客户端关闭流时的回调,来进行关闭,至于回调如何配置使用,可以详看官方文档中开放接口相关内容和这篇文章:https://blog.csdn.net/weixin_44341110/article/details/120829847

3.通过大华NVR来接入IPC

(1) 到大华官网(https://support.dahuatech.com/tools/sdkExploit),下载sdk,我用的是JAVA_Linux64这一版本的

(2) 下载完成后,使用IDEA打开该项目,并将libs目录下的INetSDK.jar包安装到本地仓库中,命令如下: mvn install:install-file -Dfile=/data/home/work/大华SDK/General_NetSDK_ChnEng_JAVA_Linux64_IS_V3.055.0000000.1.R.221027/libs/INetSDK.jar -DgroupId=com.dahua.netsdk -DartifactId=dahua-netsdk-jni -Dversion=1.0.0 -Dpackaging=jar -DgeneratePom=true

(3) 配置pom文件,重点是依赖项以及so文件打包


<!-- 依赖项 -->
<dependencies>
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>5.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.dahua.netsdk</groupId>
        <artifactId>dahua-netsdk-jni</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

<!-- linux环境下,打包libs目录下的so文件-->
<resources>
   <resource>
        <directory>libs</directory>
        <filtering>false</filtering>
        <includes>
            <include>**/*.so</include>
        </includes>
    </resource>
</resources>

(4) 执行mvn-install,打包到本地maven后,整体结构如下

(5) 然后在我们的项目中,引入我们所打包的大华sdk依赖,便可直接使用了,如下

有关记录监控摄像头的接入过程及web端播放的更多相关文章

  1. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  2. ruby-on-rails - Rails 5 Active Record 记录无效错误 - 2

    我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa

  3. unity---接入Admob - 2

    目录1.AdmobSDK下载地址2.将下载好的unityPackagesdk导入到unity里​编辑 3.解析依赖到项目中

  4. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  5. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

  6. Ruby 守护进程导致 ActiveRecord 记录器 IOError - 2

    我目前正在用Ruby编写一个项目,它使用ActiveRecordgem进行数据库交互,我正在尝试使用ActiveRecord::Base.logger记录所有数据库事件具有以下代码的属性ActiveRecord::Base.logger=Logger.new(File.open('logs/database.log','a'))这适用于迁移等(出于某种原因似乎需要启用日志记录,因为它在禁用时会出现NilClass错误)但是当我尝试运行包含调用ActiveRecord对象的线程守护程序的项目时脚本失败并出现以下错误/System/Library/Frameworks/Ruby.frame

  7. ruby-on-rails - 在 Rails 中更高效地查找或创建多条记录 - 2

    我有一个应用需要发送用户事件邀请。当用户邀请friend(用户)参加事件时,如果尚不存在将用户连接到该事件的新记录,则会创建该记录。我的模型由用户、事件和events_user组成。classEventdefinvite(user_id,*args)user_id.eachdo|u|e=EventsUser.find_or_create_by_event_id_and_user_id(self.id,u)e.save!endendend用法Event.first.invite([1,2,3])我不认为以上是完成我的任务的最有效方法。我设想了一种方法,例如Model.find_or_cr

  8. ruby - 如何配置 Ruby Mechanize 代理以通过 Charles Web 代理工作? - 2

    我正在使用Ruby/Mechanize编写一个“自动填写表格”应用程序。它几乎可以工作。我可以使用精彩CharlesWeb代理以查看服务器和我的Firefox浏览器之间的交换。现在我想使用Charles查看服务器和我的应用程序之间的交换。Charles在端口8888上代理。假设服务器位于https://my.host.com。.一件不起作用的事情是:@agent||=Mechanize.newdo|agent|agent.set_proxy("my.host.com",8888)end这会导致Net::HTTP::Persistent::Error:...lib/net/http/pe

  9. ruby - 在模块/类之间共享全局记录器 - 2

    在许多ruby​​类之间共享记录器实例的最佳(正确)方法是什么?现在我只是将记录器创建为全局$logger=Logger.new变量,但我觉得有更好的方法可以在不使用全局变量的情况下执行此操作。如果我有以下内容:moduleFooclassAclassBclassC...classZend在所有类之间共享记录器实例的最佳方式是什么?我是以某种方式在Foo模块中声明/创建记录器还是只是使用全局$logger没问题? 最佳答案 在模块中添加常量:moduleFooLogger=Logger.newclassAclassBclassC..

  10. ruby - Sinatra 中的全局救援和日志记录异常 - 2

    如何在出现异常时指定全局救援,如果您将Sinatra用于API或应用程序,您将如何处理日志记录? 最佳答案 404可以在not_found方法的帮助下处理,例如:not_founddo'Sitedoesnotexist.'end500s可以通过调用带有block的错误方法来处理,例如:errordo"Applicationerror.Plstrylater."end错误的详细信息可以通过request.env中的sinatra.error访问,如下所示:errordo'Anerroroccured:'+request.env['si

随机推荐