参考: ffplay packet queue分析
ffplay 用 PacketQueue 来保存解封装后的数据,即AVPacket。 定义MyAVPacketList表示队列中的元素,这里命名为MyAVPacketNode可能更合理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 typedef struct MyAVPacketList { AVPacket *pkt; int serial; } MyAVPacketList;typedef struct PacketQueue { AVFifo *pkt_list; int nb_packets; int size; int64_t duration; int abort_request; int serial; SDL_mutex *mutex; SDL_cond *cond; } PacketQueue;
结构体的第一个成员是AVFifo指针类型的。FFmpeg 定义了AVFifo结构体和操作这个结构体的一系列函数,实现了队列的数据结构。PacketQueue 是封装了AVFifo 实例,来管理MyAVPacketList,进而管理解封装后的AVPacket的。
AVFifo 1 2 3 4 5 6 7 8 9 10 11 struct AVFifo { uint8_t *buffer; size_t elem_size, nb_elems; size_t offset_r, offset_w; int is_empty; unsigned int flags; size_t auto_grow_limit; };
操作
1 2 3 4 5 6 7 8 AVFifo *av_fifo_alloc2 (size_t elems, size_t elem_size, unsigned int flags) ;int av_fifo_write (AVFifo *f, const void *buf, size_t nb_elems) ;int av_fifo_read (AVFifo *f, void *buf, size_t nb_elems) ;void av_fifo_freep2 (AVFifo **f) ;
PacketQueue 操作函数 PacketQueue
操作提供以下方法:
packet_queue_init 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int packet_queue_init(PacketQueue *q) { memset(q, 0 , sizeof (PacketQueue)); q->pkt_list = av_fifo_alloc2(1 , sizeof (MyAVPacketList), AV_FIFO_FLAG_AUTO_GROW ); if (!q->pkt_list) return AVERROR (ENOMEM); q->mutex = SDL_CreateMutex(); if (!q->mutex) { av_log(NULL , AV_LOG_FATAL , "SDL_CreateMutex(): %s\n" , SDL_GetError()); return AVERROR (ENOMEM); } q->cond = SDL_CreateCond(); if (!q->cond) { av_log(NULL , AV_LOG_FATAL , "SDL_CreateCond(): %s\n" , SDL_GetError()); return AVERROR (ENOMEM); } q->abort_request = 1 ; return 0 ; }
内部调用av_fifo_alloc2 创建AVFifo 队列实例
创建mutex用于队列安全访问
创建cond 用于队列等待和唤醒控制
初始设置abort_request = 1
packet_queue_destroy 1 2 3 4 5 6 7 static void packet_queue_destroy (PacketQueue *q) { packet_queue_flush (q); av_fifo_freep2 (&q->pkt_list); SDL_DestroyMutex (q->mutex); SDL_DestroyCond (q->cond); }
调用packet_queue_flush清空队列中的元素
销毁AVFifo 队列实例
销毁mutex
销毁cond
packet_queue_start 1 2 3 4 5 6 7 static void packet_queue_start(PacketQueue *q) { SDL_LockMutex (q-> mutex); q -> abort_request = 0 ; q -> serial++; SDL_UnlockMutex (q-> mutex); }
只是加锁,将abort_request设置为0,并且将serial加1,然后解锁。
packet_queue_abort 1 2 3 4 5 6 7 8 9 10 static void packet_queue_abort(PacketQueue *q) { SDL_LockMutex (q-> mutex); q -> abort_request = 1 ; SDL_CondSignal (q-> cond); SDL_UnlockMutex (q-> mutex); }
加锁,将abort_request设置为1,标记为队列被终止。
同时唤醒等待条件变量的线程,使其正常退出, 解锁。
packet_queue_get 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static int packet_queue_get (PacketQueue *q, AVPacket *pkt, int block, int *serial) { MyAVPacketList pkt1; int ret; SDL_LockMutex(q->mutex); for (;;) { if (q->abort_request) { ret = -1 ; break ; } if (av_fifo_read(q->pkt_list, &pkt1, 1 ) >= 0 ) { q->nb_packets--; q->size -= pkt1.pkt->size + sizeof (pkt1); q->duration -= pkt1.pkt->duration; av_packet_move_ref(pkt, pkt1.pkt); if (serial) *serial = pkt1.serial; av_packet_free(&pkt1.pkt); ret = 1 ; break ; } else if (!block) { ret = 0 ; break ; } else { SDL_CondWait(q->cond, q->mutex); } } SDL_UnlockMutex(q->mutex); return ret; }
从队列中获取一个元素
加锁
获取成功,队列元素减少,更新队列信息
获取失败,要求不阻塞,立马返回
获取失败,要求阻塞,调用wait进入等待状态
解锁
packet_queue_put 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) { MyAVPacketList pkt1; int ret; if (q-> abort_request) return -1 ; pkt1.pkt = pkt; pkt1 .serial = q-> serial; ret = av_fifo_write(q-> pkt_list, &pkt1, 1 ); if (ret < 0 ) return ret; q -> nb_packets++; q ->size += pkt1.pkt-> size + sizeof(pkt1); q ->duration += pkt1.pkt-> duration; SDL_CondSignal (q-> cond); return 0 ; } static int packet_queue_put(PacketQueue *q, AVPacket *pkt) { AVPacket *pkt1; int ret; pkt1 = av_packet_alloc(); if (!pkt1) { av_packet_unref(pkt); return -1 ; } av_packet_move_ref(pkt1, pkt); SDL_LockMutex (q-> mutex); ret = packet_queue_put_private(q, pkt1); SDL_UnlockMutex (q-> mutex); if (ret < 0 ) av_packet_free(&pkt1); return ret; }
将AVPacket封装为MyAVPacketList,保存在内部的队列中
队列长度加1,同时更新时长,内存占用等信息
packet_queue_put内部调用packet_queue_put_private来完成相关的操作
packet_queue_put_nullpacket 1 2 3 4 5 6 7 8 static int packet_queue_put_nullpacket (PacketQueue *q, AVPacket *pkt, int stream_index) { pkt->stream_index = stream_index; return packet_queue_put (q, pkt) ; }
给队列添加一个内容为空的pkt
设置pkt对应的stream_index,追加到队列中
packet_queue_flush 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static void packet_queue_flush(PacketQueue *q) { MyAVPacketList pkt1; SDL_LockMutex (q-> mutex); while (av_fifo_read(q-> pkt_list, &pkt1, 1 ) >= 0 ) av_packet_free(&pkt1.pkt); q -> nb_packets = 0 ; q -> size = 0 ; q -> duration = 0 ; q -> serial++; SDL_UnlockMutex (q-> mutex); }
清空队列,释放节点的内存,更新队列信息。
而对于serial的操作不是设置为0,而是在原来的基础上增加了1,为什么呢?
总结:
ffplay 用PacketQueue来保存解封装后的AVPacket
PacketQueue对元素的操作,依赖于内部基于ffmpeg AVFifo的队列实例
PacketQueue通过mutex来保证线层安全
PacketQueue通过条件变量来控制写入和读取的顺序