草庐IT

ffplay学习之PacketQueue队列(一)

dyer gan 2023-09-13 原文

今天天气雨后天晴,秋风微凉,写一写ffplay的数据结构。各位看官如果觉得过于啰嗦,请点击右上方x按钮。

一、封装自己的数据结构,存自己想存的东西

MyAVPacketList是对ffmpeg中AVPacket进行了封装,同时里面的serial被用作识别pkt是否为当前播放序列,如果不是则会丢弃。

typedef struct MyAVPacketList {
        AVPacket *pkt;   //demux后的数据包
        int serial;      //播放序列
} MyAVPacketList;

PacketQueue是用来存储MyAVPacketList的一个结构体,在函数packet_queue_put_private()会将外部传入的包存储进PacketQueuet中。AVFifo 是一个按字节存储数据的结构体,以前版本的ffplay此处使用的是“MyAVPacketList *first_pkt, *last_pkt” 队列指针进行数据存储。⾳频、视频、字幕流都有⾃⼰独⽴的PacketQueue。

typedef struct PacketQueue{
	    AVFifo *pkt_list;  //数据存储缓存区域
	    int nb_packets;   //队列内一共有多少元素
	    int size;         //队列内所有元素的大小
	    int64_t duration; //队列里的元素一共可以播放多长时间
	    int abort_request; //视频播放是否退出
	    int serial;        //播放序列来自于MyAVPacketList的播放序列
	    SDL_mutex *mutex;  //线程锁,保证多线程时数据正确
	    SDL_cond *cond;    //用于读写线程互相通知
}PacketQueue;
struct AVFifo {
    uint8_t *buffer;  //字节流Buffer

    size_t elem_size, nb_elems;
    size_t offset_r, offset_w;
    // distinguishes the ambiguous situation offset_r == offset_w
    int    is_empty;

    unsigned int flags;
    size_t       auto_grow_limit;
};

二、对自己的数据结构,做自己想做的事情

兵马未动粮草先行,先使用packet_queue_init()把packet初始化一下。

static int packet_queue_init(PacketQueue *q)
{
    memset(q, 0, sizeof(PacketQueue));
    q->pkt_list = av_fifo_alloc2(1, sizeof(MyAVPacketList), AV_FIFO_FLAG_AUTO_GROW); //申请pkt_list的空间
    if (!q->pkt_list)
        return AVERROR(ENOMEM);
    q->mutex = SDL_CreateMutex(); //申请mutex的空间
    if (!q->mutex) {
        av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
        return AVERROR(ENOMEM);
    }
    q->cond = SDL_CreateCond(); //申请cond的空间
    if (!q->cond) {
        av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
        return AVERROR(ENOMEM);
    }
    q->abort_request = 1;
    return 0;
}

大军出征前的宣誓,启用PacketQueue, 初始化的时候 abort_request 被置为了1 在启动时置为0,只有为0的时候才可以正常播放,为1时无法播放。同时会将serial 进行加 1 重置播放序列。

static void packet_queue_start(PacketQueue *q)
{
    SDL_LockMutex(q->mutex);
    q->abort_request = 0;
    q->serial++;
    SDL_UnlockMutex(q->mutex);
}

粮草存储,packet_queue_put通过调用packet_queue_put_private将一个数据包封装成MyAVPacketList并存储进PacketQueue中。

static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
    AVPacket *pkt1;
    int ret;

    pkt1 = av_packet_alloc();
    if (!pkt1) {
        av_packet_unref(pkt);
        return -1;
    }
    av_packet_move_ref(pkt1, pkt);

    SDL_LockMutex(q->mutex);
    ret = packet_queue_put_private(q, pkt1);
    SDL_UnlockMutex(q->mutex);

    if (ret < 0)
        av_packet_free(&pkt1);

    return ret;
}

packet_queue_put_private****做以下事情来存储数据
1.计算serial。serial标记了这个节点内的数据是何时的。⼀般情况下新增节点与上⼀个节点的serial是⼀
样的,但出现seek操作后队列中会加⼊⼀个flush_pkt,从而serial会加1,后续节点的serial就会比之前大一,以此来区别不同的播放序列。
2.通过av_fifo_write进行节点⼊队列操作。
3.队列属性操作。更新队列中节点的数⽬、占⽤字节数(含AVPacket.data的⼤⼩)及其时⻓。可以通过限制size、duration、nb_packets的最大值,来控制PacketQueue队列的大小。

static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
    MyAVPacketList pkt1;
    int ret;

    if (q->abort_request)
       return -1;

    pkt1.pkt = pkt;
    pkt1.serial = q->serial;

    ret = av_fifo_write(q->pkt_list, &pkt1, 1);
    if (ret < 0)
        return ret;
    q->nb_packets++;
    q->size += pkt1.pkt->size + sizeof(pkt1);
    q->duration += pkt1.pkt->duration;
    /* XXX: should duplicate packet data in DV case */
    SDL_CondSignal(q->cond);
    return 0;
}

av_packet_move_ref()会将src完整拷贝给dst,拷贝完成之后会通过get_packet_defaults()函数将src的数据清除,因为dst和src有各自的地址空间,所以src被清空不会影响到dst。

void av_packet_move_ref(AVPacket *dst, AVPacket *src)
{
    *dst = *src;
    get_packet_defaults(src);
}

边走边补充粮草,吃饱了才能干活呀。packet_queue_get目标就是获取AvPacket,主要是av_fifo_read()读取pkt_list中存储的帧,读成功后将 nb_packet 进行减一,注意size要减去的长度不仅仅是pkt1中数据的大小,还有pkt1自己的大小。后续只需要把取出来的帧av_packet_move_ref()拷贝到要返回的空间地址里就行了。

static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
    MyAVPacketList pkt1;
    int ret;

    SDL_LockMutex(q->mutex);
    for (;;) {
        if (q->abort_request) {
            ret = -1;
            break;
        }
        
        if (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0) { // 表示队列中有数据,成功读取
            q->nb_packets--;
            q->size -= pkt1.pkt->size + sizeof(pkt1);
            q->duration -= pkt1.pkt->duration;
            av_packet_move_ref(pkt, pkt1.pkt); //将pkt返回出去
            if (serial) //如果需要输出serial,把serial传出
                *serial = pkt1.serial;
            av_packet_free(&pkt1.pkt);
            ret = 1;
            break;
        } else if (!block) {
            //队列中无数据,且为⾮阻塞调⽤
            ret = 0;
            break;
        } else {
            //此处不break。等待满足条件变量,重复上述步骤取数据        
            SDL_CondWait(q->cond, q->mutex);
        }
    }
    SDL_UnlockMutex(q->mutex);
    return ret;
}

放置空包进PacketQueue 2022/10/27号 github ffplay.c中 packet_queue_put_nullpacket 函数是需要传入一个包,再将这个包存储进PackeQueue队列中。看调用此函数的四个地方,三个是会将eof的值置为1表示播放结束,另外一个是关于mp3的图片显示调用,此处修改后就和名字对应不上了,所以对ffmpeg作者修改这个函数,本人还是存在一定疑问的,下面会贴出之前的此函数。

static int packet_queue_put_nullpacket(PacketQueue *q, AVPacket *pkt, int stream_index)
{
    pkt->stream_index = stream_index;
    return packet_queue_put(q, pkt);
}

原先的packet_queue_put_nullpacket 是真的会插入一个空的pkt

static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
	AVPacket pkt1, *pkt = &pkt1;
	av_init_packet(pkt);
	pkt->data = NULL;
	pkt->size = 0;
	pkt->stream_index = stream_index;
	return packet_queue_put(q, pkt);
}

打完仗准备跑路了,先清理一下物资,packet_queue_flush中先看看队列里还有木有包,有就将包全部取出释放了,释放完成后再将PacketQueue队列各参数初始化。主要用在:
1.退出播放时,清空PacketQueue
2.seek操作后,需清空PacketQueue之前缓存的节点数据,以便插⼊新节点数据

static void packet_queue_flush(PacketQueue *q)
{
    MyAVPacketList pkt1;

    SDL_LockMutex(q->mutex);
    while (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0)
        av_packet_free(&pkt1.pkt);
    q->nb_packets = 0;
    q->size = 0;
    q->duration = 0;
    q->serial++;
    SDL_UnlockMutex(q->mutex);
}

packet_queue_destroy()会销毁PacketQueue内部资源并清理mutex和cond,防止内存泄露。

static void packet_queue_destroy(PacketQueue *q)
{
    packet_queue_flush(q);
    av_fifo_freep2(&q->pkt_list);
    SDL_DestroyMutex(q->mutex);
    SDL_DestroyCond(q->cond);
}

三、总结

1.PacketQueue的内存管理:

PacketQueue会维护MyAVPackeList的内,在put时(packet_queue_put_private函数内调用av_malloc)malloc,在get时(packet_queue_get函数内调用av_packet_free)free。
AVPacket内存分为两块空间:
1.AVPacket结构体的内存,这部分内存会和MyAVPacketList共存亡的。
2.AVPacket字段指向的内存,这部分需要通过 av_packet_unref 函数释放。
2.serial的变化:
每放入一个flush_pkt,serial就会加1.

3.PacketQueue设计思路:

  1. 设计⼀个多线程安全的队列,保存AVPacket,同时统计队列内已缓存的数据⼤⼩。(这个统计数据会
    ⽤来后续设置要缓存的数据量)
  2. 引⼊serial的概念,区别前后数据包是否连续,主要应⽤于seek操作。
  3. 设计了两类特殊的packet——flush_pkt和nullpkt(类似⽤于多线程编程的事件模型——往队列中放⼊
    flush事件、放⼊null事件)

有关ffplay学习之PacketQueue队列(一)的更多相关文章

  1. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  2. ruby-on-rails - Ruby 长时间运行的进程对队列事件使用react - 2

    我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby​​脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几

  3. ruby - 在不提供其所有属性的情况下获取队列 - 2

    我正在尝试为现有队列编写消费者。RabbbitMQ在一个单独的实例中运行,名为“org-queue”的队列已经创建并绑定(bind)到一个交换器。org-queue是一个持久队列,它还有一些额外的属性。现在我需要从这个队列接收消息。我使用下面的代码来获取队列的实例conn=Bunny.newconn.startch=conn.create_channelq=ch.queue("org-queue")它抛出一个错误,指出不同的耐用属性。默认情况下,Bunny似乎使用durable=false。所以我添加了durabletrue作为参数。现在它说明了其他参数之间的区别。我是否需要指定所有参

  4. ruby - 如何在特定队列中推送作业并使用 sidekiq 限制工作人员数量? - 2

    我知道我们可以做到:sidekiq_optionsqueue:"Foo"但在这种情况下,Worker只分配给一个队列:“Foo”。我需要在特定队列中分配作业(而不是worker)。使用Resque很容易:Resque.enqueue_to(queue_name,my_job)另外,为了并发问题,我需要限制每个队列的Worker数量为1。我该怎么做? 最佳答案 您可能会使用https://github.com/brainopia/sidekiq-limit_fetch然后:Sidekiq::Client.push({'class'=>

  5. Python:每日一题之小张的衣服(优先队列、哈夫曼编码) - 2

    题目描述小张买了 n 件白色的衣服,他觉得所有衣服都是一种颜色太单调,希望对这些衣服进行染色,每次染色时,他会将某种颜色的所有衣服寄去染色厂,第 i 件衣服的邮费为 ai​ 元,染色厂会按照小张的要求将其中一部分衣服染成同一种任意的颜色,之后将衣服寄给小张,请问小张要将 n 件衣服染成不同颜色的最小代价是多少?输入描述第一行为一个整数 n ,表示衣服的数量。第二行包括 n 个整数a1​,a2​...an​ 表示第 i 件衣服的邮费为 ai​ 元。(1≤n≤10^5,1≤ai​≤10^9 )输出描述输出一个整数表示小张所要花费的最小代价。输入输出样例输入551321输出25 思考🤔:题意:意思是

  6. ruby - Resque:每个队列一个 worker - 2

    我目前有一个Rails3.0项目,使用Ruby1.9.2和Resque。我的应用程序有多个工作类和多个队列,它们是动态创建的(在运行时)。此外,有多个worker已启动,可以自由地在任何队列上工作,因为在启动时没有任何现有队列,并且无法预测它们:$COUNT=3QUEUE=*rakeresque:workers根据project的id创建队列:@queue="project_#{project.id}".to_sym对于给定的队列,他们的作业必须按顺序处理,一次处理一个。我的问题是,通过拥有多个工作人员,可以并行处理多个作业。有没有办法设置每个队列的最大worker数(为1)?有没有办

  7. ruby - Amazon SQS 优先级队列 - 2

    是否可以使用Amazon简单排队服务创建优先级队列?最初我找不到关于这个主题的任何内容,这就是我创建两个队列的原因。一个普通队列和一个优先队列。我正在根据我定义的规则将消息排入此队列,但在出列消息时会出现困惑。如何对队列进行长时间轮询,使我的队列组合表现得像一个优先级队列? 最佳答案 我认为您通过创建两个队列走在正确的轨道上-一个普通队列和一个优先级队列。在这种情况下,您不一定需要长时间轮询。由于优先队列中的消息优先于普通队列中的消息,您可以采用如下方法:轮询优先级队列,直到没有更多消息为止。轮询普通队列并在普通队列中的每条消息后重

  8. ruby - AMQP 动态创建订阅队列 - 2

    我正在尝试使用AMQP、Websockets和Ruby构建一个简单的聊天应用程序。我知道这可能不是理解AMQP的最佳用例,但我想了解我哪里出错了。以下是我的amqp-server代码require'rubygems'require'amqp'require'mongo'require'em-websocket'require'json'classMessageParser#messageformat=>"room:harry_potter,nickname:siddharth,room:members"defself.parse(message)parsed_message=JSON.

  9. ruby-on-rails - 限制 Sidekiq 中每个队列的工作人员数量 - 2

    我一直在尝试使用sidekiq-limit_fetch来限制每个队列的工作人员数量gem,而Sidekiq似乎在日志中“看到”了强加的限制,但是当我观察工作人员时,这些限制被忽略了。这是日志中Sidekiq看到限制的部分:2013-04-02T05:47:19Z748TID-11ilcwDEBUG:{:queues=>["recommendvariations","recommendvariations","recommendvariations","recommendphenotypes","recommendphenotypes","recommendphenotypes","pr

  10. ruby - 订阅一个队列,收到1条消息,然后取消订阅 - 2

    我有一个场景,我需要极快地分发和处理作业。我将在队列中快速填充大约45个作业,我可以同时处理大约20个(5台机器,每台机器4个内核)。每项工作花费的时间长短不一,而且垃圾收集使事情变得复杂,因此我需要能够让消费者离线以进行垃圾收集。目前,我的一切都与pop一起工作(每个消费者每5毫秒弹出一次)。这似乎是不可取的,因为它转换为每秒600个pop请求到rabbitmq。如果有一个类似于订阅的pop命令,但只针对一条消息,我会很高兴。(进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)我试图欺骗AMQPgem做这样的事情,但它不起作用:我似乎无法取消

随机推荐