参考: ffplay packet queue分析
ffplay 用 PacketQueue 来保存解封装后的数据,即AVPacket。 定义MyAVPacketList表示队列中的元素,这里命名为MyAVPacketNode可能更合理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 typedef struct MyAVPacketList { AVPacket *pkt; int serial; } MyAVPacketList; typedef struct PacketQueue { AVFifo *pkt_list; int nb_packets; int size; int64_t duration; int abort_request; int serial; SDL_mutex *mutex; SDL_cond *cond; } PacketQueue;
结构体的第一个成员是AVFifo指针类型的。FFmpeg 定义了AVFifo结构体和操作这个结构体的一系列函数,实现了队列的数据结构。PacketQueue 是封装了AVFifo 实例,来管理MyAVPacketList,进而管理解封装后的AVPacket的。
AVFifo 1 2 3 4 5 6 7 8 9 10 11 struct AVFifo { uint8_t *buffer; size_t elem_size, nb_elems; size_t offset_r, offset_w; int is_empty; unsigned int flags; size_t auto_grow_limit; };
操作
1 2 3 4 5 6 7 8 AVFifo *av_fifo_alloc2(size_t elems, size_t elem_size, unsigned int flags); int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems); int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems); void av_fifo_freep2(AVFifo **f);
PacketQueue 操作函数 PacketQueue操作提供以下方法:
packet_queue_init 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int packet_queue_init(PacketQueue *q) { memset(q, 0, sizeof(PacketQueue)); q->pkt_list = av_fifo_alloc2(1, sizeof(MyAVPacketList), AV_FIFO_FLAG_AUTO_GROW); if (!q->pkt_list) return AVERROR(ENOMEM); q->mutex = SDL_CreateMutex(); if (!q->mutex) { av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError()); return AVERROR(ENOMEM); } q->cond = SDL_CreateCond(); if (!q->cond) { av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError()); return AVERROR(ENOMEM); } q->abort_request = 1; return 0; }
内部调用av_fifo_alloc2 创建AVFifo 队列实例
创建mutex用于队列安全访问
创建cond 用于队列等待和唤醒控制
初始设置abort_request = 1
packet_queue_destroy 1 2 3 4 5 6 7 static void packet_queue_destroy(PacketQueue *q) { packet_queue_flush(q); av_fifo_freep2(&q->pkt_list); SDL_DestroyMutex(q->mutex); SDL_DestroyCond(q->cond); }
调用packet_queue_flush清空队列中的元素
销毁AVFifo 队列实例
销毁mutex
销毁cond
packet_queue_start 1 2 3 4 5 6 7 static void packet_queue_start(PacketQueue *q) { SDL_LockMutex(q->mutex); q->abort_request = 0; q->serial++; SDL_UnlockMutex(q->mutex); }
只是加锁,将abort_request设置为0,并且将serial加1,然后解锁。
packet_queue_abort 1 2 3 4 5 6 7 8 9 10 static void packet_queue_abort(PacketQueue *q) { SDL_LockMutex(q->mutex); q->abort_request = 1; SDL_CondSignal(q->cond); SDL_UnlockMutex(q->mutex); }
加锁,将abort_request设置为1,标记为队列被终止。
同时唤醒等待条件变量的线程,使其正常退出, 解锁。
packet_queue_get 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static int packet_queue_get (PacketQueue *q, AVPacket *pkt, int block, int *serial) { MyAVPacketList pkt1; int ret; SDL_LockMutex(q->mutex); for (;;) { if (q->abort_request) { ret = -1 ; break ; } if (av_fifo_read(q->pkt_list, &pkt1, 1 ) >= 0 ) { q->nb_packets--; q->size -= pkt1.pkt->size + sizeof (pkt1); q->duration -= pkt1.pkt->duration; av_packet_move_ref(pkt, pkt1.pkt); if (serial) *serial = pkt1.serial; av_packet_free(&pkt1.pkt); ret = 1 ; break ; } else if (!block) { ret = 0 ; break ; } else { SDL_CondWait(q->cond, q->mutex); } } SDL_UnlockMutex(q->mutex); return ret; }
从队列中获取一个元素
加锁
获取成功,队列元素减少,更新队列信息
获取失败,要求不阻塞,立马返回
获取失败,要求阻塞,调用wait进入等待状态
解锁
packet_queue_put 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) { MyAVPacketList pkt1; int ret; //如果队列被终止,返回-1 if (q->abort_request) return -1; //给节点填充数据 pkt1.pkt = pkt; //设置pkt的序列号 pkt1.serial = q->serial; //将节点添加到队列 ret = av_fifo_write(q->pkt_list, &pkt1, 1); if (ret < 0) return ret; //更新队列大小 q->nb_packets++; //更新队列内存大小,添加节点size(pkt内存的大小 + 节点数据大小) q->size += pkt1.pkt->size + sizeof(pkt1); //更新队列的总时长 q->duration += pkt1.pkt->duration; /* XXX: should duplicate packet data in DV case */ SDL_CondSignal(q->cond); return 0; } static int packet_queue_put(PacketQueue *q, AVPacket *pkt) { AVPacket *pkt1; int ret; //创建pkt1 pkt1 = av_packet_alloc(); if (!pkt1) { //创建pkt1失败,取消引用pkt av_packet_unref(pkt); return -1; } //将pkt内容转移到pkt1中 av_packet_move_ref(pkt1, pkt); //加锁操加入队列 SDL_LockMutex(q->mutex); //调用packet_queue_put_private将pkt1放入队列中 ret = packet_queue_put_private(q, pkt1); SDL_UnlockMutex(q->mutex); if (ret < 0) //放入队列失败,需要取消引用pkt1 av_packet_free(&pkt1); return ret; }
将AVPacket封装为MyAVPacketList,保存在内部的队列中
队列长度加1,同时更新时长,内存占用等信息
packet_queue_put内部调用packet_queue_put_private来完成相关的操作
packet_queue_put_nullpacket 1 2 3 4 5 6 7 8 //播放完了,从文件中读出了空的pkt,空的pkt丢给解码器的时候,会冲洗解码器,将解码器剩余的额数据读出来 static int packet_queue_put_nullpacket(PacketQueue *q, AVPacket *pkt, int stream_index) { //设置pkt对应的stream_index pkt->stream_index = stream_index; //添加有一个内容为空的pkt到队列中 return packet_queue_put(q, pkt); }
给队列添加一个内容为空的pkt
设置pkt对应的stream_index,追加到队列中
packet_queue_flush 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static void packet_queue_flush(PacketQueue *q) { MyAVPacketList pkt1; //加锁 SDL_LockMutex(q->mutex); //从内部队列循环获取元素,存入pkt1中 while (av_fifo_read(q->pkt_list, &pkt1, 1) >= 0) //释放MyAVPacketList中的AVPacket的内存 av_packet_free(&pkt1.pkt); //更新队列信息 q->nb_packets = 0; q->size = 0; q->duration = 0; //serial加1 q->serial++; //解锁 SDL_UnlockMutex(q->mutex); }
清空队列,释放节点的内存,更新队列信息。
而对于serial的操作不是设置为0,而是在原来的基础上增加了1,为什么呢?
总结:
ffplay 用PacketQueue来保存解封装后的AVPacket
PacketQueue对元素的操作,依赖于内部基于ffmpeg AVFifo的队列实例
PacketQueue通过mutex来保证线层安全
PacketQueue通过条件变量来控制写入和读取的顺序