今天天气雨后天晴,秋风微凉,写一写ffplay的数据结构。各位看官如果觉得过于啰嗦,请点击右上方x按钮。
MyAVPacketList是对ffmpeg中AVPacket进行了封装,同时里面的serial被用作识别pkt是否为当前播放序列,如果不是则会丢弃。
typedef struct MyAVPacketList {
AVPacket *pkt; //demux后的数据包
int serial; //播放序列
} MyAVPacketList;
PacketQueue是用来存储MyAVPacketList的一个结构体,在函数packet_queue_put_private()会将外部传入的包存储进PacketQueuet中。AVFifo 是一个按字节存储数据的结构体,以前版本的ffplay此处使用的是“MyAVPacketList *first_pkt, *last_pkt” 队列指针进行数据存储。⾳频、视频、字幕流都有⾃⼰独⽴的PacketQueue。
typedef struct PacketQueue{
AVFifo *pkt_list; //数据存储缓存区域
int nb_packets; //队列内一共有多少元素
int size; //队列内所有元素的大小
int64_t duration; //队列里的元素一共可以播放多长时间
int abort_request; //视频播放是否退出
int serial; //播放序列来自于MyAVPacketList的播放序列
SDL_mutex *mutex; //线程锁,保证多线程时数据正确
SDL_cond *cond; //用于读写线程互相通知
}PacketQueue;
struct AVFifo {
uint8_t *buffer; //字节流Buffer
size_t elem_size, nb_elems;
size_t offset_r, offset_w;
// distinguishes the ambiguous situation offset_r == offset_w
int is_empty;
unsigned int flags;
size_t auto_grow_limit;
};
兵马未动粮草先行,先使用packet_queue_init()把packet初始化一下。
static int packet_queue_init(PacketQueue *q)
{
memset(q, 0, sizeof(PacketQueue));
q->pkt_list = av_fifo_alloc2(1, sizeof(MyAVPacketList), AV_FIFO_FLAG_AUTO_GROW); //申请pkt_list的空间
if (!q->pkt_list)
return AVERROR(ENOMEM);
q->mutex = SDL_CreateMutex(); //申请mutex的空间
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->cond = SDL_CreateCond(); //申请cond的空间
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
q->abort_request = 1;
return 0;
}
大军出征前的宣誓,启用PacketQueue, 初始化的时候 abort_request 被置为了1 在启动时置为0,只有为0的时候才可以正常播放,为1时无法播放。同时会将serial 进行加 1 重置播放序列。
static void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
q->serial++;
SDL_UnlockMutex(q->mutex);
}
粮草存储,packet_queue_put通过调用packet_queue_put_private将一个数据包封装成MyAVPacketList并存储进PacketQueue中。
static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
AVPacket *pkt1;
int ret;
pkt1 = av_packet_alloc();
if (!pkt1) {
av_packet_unref(pkt);
return -1;
}
av_packet_move_ref(pkt1, pkt);
SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt1);
SDL_UnlockMutex(q->mutex);
if (ret < 0)
av_packet_free(&pkt1);
return ret;
}
packet_queue_put_private****做以下事情来存储数据
1.计算serial。serial标记了这个节点内的数据是何时的。⼀般情况下新增节点与上⼀个节点的serial是⼀
样的,但出现seek操作后队列中会加⼊⼀个flush_pkt,从而serial会加1,后续节点的serial就会比之前大一,以此来区别不同的播放序列。
2.通过av_fifo_write进行节点⼊队列操作。
3.队列属性操作。更新队列中节点的数⽬、占⽤字节数(含AVPacket.data的⼤⼩)及其时⻓。可以通过限制size、duration、nb_packets的最大值,来控制PacketQueue队列的大小。
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
MyAVPacketList pkt1;
int ret;
if (q->abort_request)
return -1;
pkt1.pkt = pkt;
pkt1.serial = q->serial;
ret = av_fifo_write(q->pkt_list, &pkt1, 1);
if (ret < 0)
return ret;
q->nb_packets++;
q->size += pkt1.pkt->size + sizeof(pkt1);
q->duration += pkt1.pkt->duration;
/* XXX: should duplicate packet data in DV case */
SDL_CondSignal(q->cond);
return 0;
}
av_packet_move_ref()会将src完整拷贝给dst,拷贝完成之后会通过get_packet_defaults()函数将src的数据清除,因为dst和src有各自的地址空间,所以src被清空不会影响到dst。
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
{
*dst = *src;
get_packet_defaults(src);
}
边走边补充粮草,吃饱了才能干活呀。packet_queue_get目标就是获取AvPacket,主要是av_fifo_read()读取pkt_list中存储的帧,读成功后将 nb_packet 进行减一,注意size要减去的长度不仅仅是pkt1中数据的大小,还有pkt1自己的大小。后续只需要把取出来的帧av_packet_move_ref()拷贝到要返回的空间地址里就行了。
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList pkt1;
int ret;
SDL_LockMutex(q->mutex);
for (;;) {
if (q->abort_request) {
ret = -1;
break;
}
if (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0) { // 表示队列中有数据,成功读取
q->nb_packets--;
q->size -= pkt1.pkt->size + sizeof(pkt1);
q->duration -= pkt1.pkt->duration;
av_packet_move_ref(pkt, pkt1.pkt); //将pkt返回出去
if (serial) //如果需要输出serial,把serial传出
*serial = pkt1.serial;
av_packet_free(&pkt1.pkt);
ret = 1;
break;
} else if (!block) {
//队列中无数据,且为⾮阻塞调⽤
ret = 0;
break;
} else {
//此处不break。等待满足条件变量,重复上述步骤取数据
SDL_CondWait(q->cond, q->mutex);
}
}
SDL_UnlockMutex(q->mutex);
return ret;
}
放置空包进PacketQueue 2022/10/27号 github ffplay.c中 packet_queue_put_nullpacket 函数是需要传入一个包,再将这个包存储进PackeQueue队列中。看调用此函数的四个地方,三个是会将eof的值置为1表示播放结束,另外一个是关于mp3的图片显示调用,此处修改后就和名字对应不上了,所以对ffmpeg作者修改这个函数,本人还是存在一定疑问的,下面会贴出之前的此函数。
static int packet_queue_put_nullpacket(PacketQueue *q, AVPacket *pkt, int stream_index)
{
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
原先的packet_queue_put_nullpacket 是真的会插入一个空的pkt
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}
打完仗准备跑路了,先清理一下物资,packet_queue_flush中先看看队列里还有木有包,有就将包全部取出释放了,释放完成后再将PacketQueue队列各参数初始化。主要用在:
1.退出播放时,清空PacketQueue
2.seek操作后,需清空PacketQueue之前缓存的节点数据,以便插⼊新节点数据
static void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList pkt1;
SDL_LockMutex(q->mutex);
while (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0)
av_packet_free(&pkt1.pkt);
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
q->serial++;
SDL_UnlockMutex(q->mutex);
}
packet_queue_destroy()会销毁PacketQueue内部资源并清理mutex和cond,防止内存泄露。
static void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q);
av_fifo_freep2(&q->pkt_list);
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}
1.PacketQueue的内存管理:
PacketQueue会维护MyAVPackeList的内,在put时(packet_queue_put_private函数内调用av_malloc)malloc,在get时(packet_queue_get函数内调用av_packet_free)free。
AVPacket内存分为两块空间:
1.AVPacket结构体的内存,这部分内存会和MyAVPacketList共存亡的。
2.AVPacket字段指向的内存,这部分需要通过 av_packet_unref 函数释放。
2.serial的变化:
每放入一个flush_pkt,serial就会加1.
3.PacketQueue设计思路:
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我有一个将某些事件写入队列的Rails3应用。现在我想在服务器上创建一个服务,每x秒轮询一次队列,并按计划执行其他任务。除了创建ruby脚本并通过cron作业运行它之外,还有其他稳定的替代方案吗? 最佳答案 尽管启动基于Rails的持久任务是一种选择,但您可能希望查看更有序的系统,例如delayed_job或Starling管理您的工作量。我建议不要在cron中运行某些东西,因为启动整个Rails堆栈的开销可能很大。每隔几秒运行一次它是不切实际的,因为Rails上的启动时间通常为5-15秒,具体取决于您的硬件。不过,每天这样做几
我正在尝试为现有队列编写消费者。RabbbitMQ在一个单独的实例中运行,名为“org-queue”的队列已经创建并绑定(bind)到一个交换器。org-queue是一个持久队列,它还有一些额外的属性。现在我需要从这个队列接收消息。我使用下面的代码来获取队列的实例conn=Bunny.newconn.startch=conn.create_channelq=ch.queue("org-queue")它抛出一个错误,指出不同的耐用属性。默认情况下,Bunny似乎使用durable=false。所以我添加了durabletrue作为参数。现在它说明了其他参数之间的区别。我是否需要指定所有参
我知道我们可以做到:sidekiq_optionsqueue:"Foo"但在这种情况下,Worker只分配给一个队列:“Foo”。我需要在特定队列中分配作业(而不是worker)。使用Resque很容易:Resque.enqueue_to(queue_name,my_job)另外,为了并发问题,我需要限制每个队列的Worker数量为1。我该怎么做? 最佳答案 您可能会使用https://github.com/brainopia/sidekiq-limit_fetch然后:Sidekiq::Client.push({'class'=>
题目描述小张买了 n 件白色的衣服,他觉得所有衣服都是一种颜色太单调,希望对这些衣服进行染色,每次染色时,他会将某种颜色的所有衣服寄去染色厂,第 i 件衣服的邮费为 ai 元,染色厂会按照小张的要求将其中一部分衣服染成同一种任意的颜色,之后将衣服寄给小张,请问小张要将 n 件衣服染成不同颜色的最小代价是多少?输入描述第一行为一个整数 n ,表示衣服的数量。第二行包括 n 个整数a1,a2...an 表示第 i 件衣服的邮费为 ai 元。(1≤n≤10^5,1≤ai≤10^9 )输出描述输出一个整数表示小张所要花费的最小代价。输入输出样例输入551321输出25 思考🤔:题意:意思是
我目前有一个Rails3.0项目,使用Ruby1.9.2和Resque。我的应用程序有多个工作类和多个队列,它们是动态创建的(在运行时)。此外,有多个worker已启动,可以自由地在任何队列上工作,因为在启动时没有任何现有队列,并且无法预测它们:$COUNT=3QUEUE=*rakeresque:workers根据project的id创建队列:@queue="project_#{project.id}".to_sym对于给定的队列,他们的作业必须按顺序处理,一次处理一个。我的问题是,通过拥有多个工作人员,可以并行处理多个作业。有没有办法设置每个队列的最大worker数(为1)?有没有办
是否可以使用Amazon简单排队服务创建优先级队列?最初我找不到关于这个主题的任何内容,这就是我创建两个队列的原因。一个普通队列和一个优先队列。我正在根据我定义的规则将消息排入此队列,但在出列消息时会出现困惑。如何对队列进行长时间轮询,使我的队列组合表现得像一个优先级队列? 最佳答案 我认为您通过创建两个队列走在正确的轨道上-一个普通队列和一个优先级队列。在这种情况下,您不一定需要长时间轮询。由于优先队列中的消息优先于普通队列中的消息,您可以采用如下方法:轮询优先级队列,直到没有更多消息为止。轮询普通队列并在普通队列中的每条消息后重
我正在尝试使用AMQP、Websockets和Ruby构建一个简单的聊天应用程序。我知道这可能不是理解AMQP的最佳用例,但我想了解我哪里出错了。以下是我的amqp-server代码require'rubygems'require'amqp'require'mongo'require'em-websocket'require'json'classMessageParser#messageformat=>"room:harry_potter,nickname:siddharth,room:members"defself.parse(message)parsed_message=JSON.
我一直在尝试使用sidekiq-limit_fetch来限制每个队列的工作人员数量gem,而Sidekiq似乎在日志中“看到”了强加的限制,但是当我观察工作人员时,这些限制被忽略了。这是日志中Sidekiq看到限制的部分:2013-04-02T05:47:19Z748TID-11ilcwDEBUG:{:queues=>["recommendvariations","recommendvariations","recommendvariations","recommendphenotypes","recommendphenotypes","recommendphenotypes","pr
我有一个场景,我需要极快地分发和处理作业。我将在队列中快速填充大约45个作业,我可以同时处理大约20个(5台机器,每台机器4个内核)。每项工作花费的时间长短不一,而且垃圾收集使事情变得复杂,因此我需要能够让消费者离线以进行垃圾收集。目前,我的一切都与pop一起工作(每个消费者每5毫秒弹出一次)。这似乎是不可取的,因为它转换为每秒600个pop请求到rabbitmq。如果有一个类似于订阅的pop命令,但只针对一条消息,我会很高兴。(进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)我试图欺骗AMQPgem做这样的事情,但它不起作用:我似乎无法取消