FFmpeg
fifo.c
Go to the documentation of this file.
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with FFmpeg; if not, write to the Free Software * Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
29 #include "avformat.h"
30 #include "internal.h"
31 #include "mux.h"
32 
33 #define FIFO_DEFAULT_QUEUE_SIZE 60
34 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
35 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
36 
37 typedef struct FifoContext {
38  const AVClass *class;
40 
41  char *format;
43 
46 
48 
49  /* Return value of last write_trailer_call */
51 
52  /* Time to wait before next recovery attempt
53  * This can refer to the time in processed stream,
54  * or real time. */
56 
57  /* Maximal number of unsuccessful successive recovery attempts */
59 
60  /* Whether to attempt recovery from failure */
62 
63  /* If >0 stream time will be used when waiting
64  * for the recovery attempt instead of real time */
66 
67  /* If >0 recovery will be attempted regardless of error code
68  * (except AVERROR_EXIT, so exit request is never ignored) */
70 
71  /* Whether to drop packets in case the queue is full. */
73 
74  /* Whether to wait for keyframe when recovering
75  * from failure or queue overflow */
77 
80  /* Value > 0 signals queue overflow */
81  volatile uint8_t overflow_flag;
82 
84  int64_t last_sent_dts;
85  int64_t timeshift;
86 } FifoContext;
87 
88 typedef struct FifoThreadContext {
90 
91  /* Timestamp of last failure.
92  * This is either pts in case stream time is used,
93  * or microseconds as returned by av_getttime_relative() */
95 
96  /* Number of current recovery process
97  * Value > 0 means we are in recovery process */
99 
100  /* If > 0 all frames will be dropped until keyframe is received */
102 
103  /* Value > 0 means that the previous write_header call was successful
104  * so finalization by calling write_trailer and ff_io_close must be done
105  * before exiting / reinitialization of underlying muxer */
106  uint8_t header_written;
107 
110 
111 typedef enum FifoMessageType {
117 
118 typedef struct FifoMessage {
121 } FifoMessage;
122 
124 {
125  AVFormatContext *avf = ctx->avf;
126  FifoContext *fifo = avf->priv_data;
127  AVFormatContext *avf2 = fifo->avf;
128  AVDictionary *format_options = NULL;
129  int ret, i;
130 
131  ret = av_dict_copy(&format_options, fifo->format_options, 0);
132  if (ret < 0)
133  goto end;
134 
135  ret = ff_format_output_open(avf2, avf->url, &format_options);
136  if (ret < 0) {
137  av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
138  av_err2str(ret));
139  goto end;
140  }
141 
142  for (i = 0;i < avf2->nb_streams; i++)
143  ffstream(avf2->streams[i])->cur_dts = 0;
144 
145  ret = avformat_write_header(avf2, &format_options);
146  if (!ret)
147  ctx->header_written = 1;
148 
149  // Check for options unrecognized by underlying muxer
150  if (format_options) {
151  const AVDictionaryEntry *entry = NULL;
152  while ((entry = av_dict_iterate(format_options, entry)))
153  av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
154  ret = AVERROR(EINVAL);
155  }
156 
157 end:
158  av_dict_free(&format_options);
159  return ret;
160 }
161 
163 {
164  AVFormatContext *avf = ctx->avf;
165  FifoContext *fifo = avf->priv_data;
166  AVFormatContext *avf2 = fifo->avf;
167 
168  return av_write_frame(avf2, NULL);
169 }
170 
171 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
172 {
173  AVStream *st = avf->streams[pkt->stream_index];
174  int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
175  int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
176  *last_dts = dts;
177  return duration;
178 }
179 
181 {
182  AVFormatContext *avf = ctx->avf;
183  FifoContext *fifo = avf->priv_data;
184  AVFormatContext *avf2 = fifo->avf;
185  AVRational src_tb, dst_tb;
186  int ret, s_idx;
187  int64_t orig_pts, orig_dts, orig_duration;
188 
189  if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
190  atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
191 
192  if (ctx->drop_until_keyframe) {
193  if (pkt->flags & AV_PKT_FLAG_KEY) {
194  ctx->drop_until_keyframe = 0;
195  av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
196  } else {
197  av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
199  return 0;
200  }
201  }
202 
203  orig_pts = pkt->pts;
204  orig_dts = pkt->dts;
205  orig_duration = pkt->duration;
206  s_idx = pkt->stream_index;
207  src_tb = avf->streams[s_idx]->time_base;
208  dst_tb = avf2->streams[s_idx]->time_base;
209  av_packet_rescale_ts(pkt, src_tb, dst_tb);
210 
211  ret = av_write_frame(avf2, pkt);
212  if (ret >= 0) {
214  } else {
215  // avoid scaling twice
216  pkt->pts = orig_pts;
217  pkt->dts = orig_dts;
218  pkt->duration = orig_duration;
219  }
220  return ret;
221 }
222 
224 {
225  AVFormatContext *avf = ctx->avf;
226  FifoContext *fifo = avf->priv_data;
227  AVFormatContext *avf2 = fifo->avf;
228  int ret;
229 
230  if (!ctx->header_written)
231  return 0;
232 
233  ret = av_write_trailer(avf2);
234  ff_format_io_close(avf2, &avf2->pb);
235 
236  return ret;
237 }
238 
240 {
241  int ret = AVERROR(EINVAL);
242 
243  if (msg->type == FIFO_NOOP)
244  return 0;
245 
246  if (!ctx->header_written) {
248  if (ret < 0)
249  return ret;
250  }
251 
252  switch(msg->type) {
253  case FIFO_WRITE_HEADER:
254  av_assert0(ret >= 0);
255  return ret;
256  case FIFO_WRITE_PACKET:
257  return fifo_thread_write_packet(ctx, &msg->pkt);
258  case FIFO_FLUSH_OUTPUT:
260  }
261 
262  av_assert0(0);
263  return AVERROR(EINVAL);
264 }
265 
266 static int is_recoverable(const FifoContext *fifo, int err_no) {
267  if (!fifo->attempt_recovery)
268  return 0;
269 
270  if (fifo->recover_any_error)
271  return err_no != AVERROR_EXIT;
272 
273  switch (err_no) {
274  case AVERROR(EINVAL):
275  case AVERROR(ENOSYS):
276  case AVERROR_EOF:
277  case AVERROR_EXIT:
279  return 0;
280  default:
281  return 1;
282  }
283 }
284 
285 static void free_message(void *msg)
286 {
287  FifoMessage *fifo_msg = msg;
288 
289  if (fifo_msg->type == FIFO_WRITE_PACKET)
290  av_packet_unref(&fifo_msg->pkt);
291 }
292 
294  int err_no)
295 {
296  AVFormatContext *avf = ctx->avf;
297  FifoContext *fifo = avf->priv_data;
298  int ret;
299 
300  av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
301  av_err2str(err_no));
302 
303  if (fifo->recovery_wait_streamtime) {
304  if (pkt->pts == AV_NOPTS_VALUE)
305  av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
306  " timestamp, recovery will be attempted immediately");
307  ctx->last_recovery_ts = pkt->pts;
308  } else {
309  ctx->last_recovery_ts = av_gettime_relative();
310  }
311 
312  if (fifo->max_recovery_attempts &&
313  ctx->recovery_nr >= fifo->max_recovery_attempts) {
314  av_log(avf, AV_LOG_ERROR,
315  "Maximal number of %d recovery attempts reached.\n",
316  fifo->max_recovery_attempts);
317  ret = err_no;
318  } else {
319  ret = AVERROR(EAGAIN);
320  }
321 
322  return ret;
323 }
324 
326 {
327  AVFormatContext *avf = ctx->avf;
328  FifoContext *fifo = avf->priv_data;
329  AVPacket *pkt = &msg->pkt;
330  int64_t time_since_recovery;
331  int ret;
332 
333  if (!is_recoverable(fifo, err_no)) {
334  ret = err_no;
335  goto fail;
336  }
337 
338  if (ctx->header_written) {
340  ctx->header_written = 0;
341  }
342 
343  if (!ctx->recovery_nr) {
344  ctx->last_recovery_ts = fifo->recovery_wait_streamtime ?
345  AV_NOPTS_VALUE : 0;
346  } else {
347  if (fifo->recovery_wait_streamtime) {
348  if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
350  time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
351  tb, AV_TIME_BASE_Q);
352  } else {
353  /* Enforce recovery immediately */
354  time_since_recovery = fifo->recovery_wait_time;
355  }
356  } else {
357  time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
358  }
359 
360  if (time_since_recovery < fifo->recovery_wait_time)
361  return AVERROR(EAGAIN);
362  }
363 
364  ctx->recovery_nr++;
365 
366  if (fifo->max_recovery_attempts) {
367  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
368  ctx->recovery_nr, fifo->max_recovery_attempts);
369  } else {
370  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
371  ctx->recovery_nr);
372  }
373 
374  if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
375  ctx->drop_until_keyframe = 1;
376 
378  if (ret < 0) {
379  if (is_recoverable(fifo, ret)) {
381  } else {
382  goto fail;
383  }
384  } else {
385  av_log(avf, AV_LOG_INFO, "Recovery successful\n");
386  ctx->recovery_nr = 0;
387  }
388 
389  return 0;
390 
391 fail:
392  free_message(msg);
393  return ret;
394 }
395 
396 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
397 {
398  AVFormatContext *avf = ctx->avf;
399  FifoContext *fifo = avf->priv_data;
400  int ret;
401 
402  do {
403  if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
404  int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
405  int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
406  if (time_to_wait)
407  av_usleep(FFMIN(10000, time_to_wait));
408  }
409 
410  ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
411  } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
412 
413  if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
414  if (msg->type == FIFO_WRITE_PACKET)
415  av_packet_unref(&msg->pkt);
416  ret = 0;
417  }
418 
419  return ret;
420 }
421 
422 static void *fifo_consumer_thread(void *data)
423 {
424  AVFormatContext *avf = data;
425  FifoContext *fifo = avf->priv_data;
426  AVThreadMessageQueue *queue = fifo->queue;
427  FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
428  int ret;
429 
430  FifoThreadContext fifo_thread_ctx;
431  memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
432  fifo_thread_ctx.avf = avf;
433  fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
434 
435  ff_thread_setname("fifo-consumer");
436 
437  while (1) {
438  uint8_t just_flushed = 0;
439 
440  if (!fifo_thread_ctx.recovery_nr)
441  ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
442 
443  if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
444  int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
445  if (rec_ret < 0) {
446  av_thread_message_queue_set_err_send(queue, rec_ret);
447  break;
448  }
449  }
450 
451  /* If the queue is full at the moment when fifo_write_packet
452  * attempts to insert new message (packet) to the queue,
453  * it sets the fifo->overflow_flag to 1 and drops packet.
454  * Here in consumer thread, the flag is checked and if it is
455  * set, the queue is flushed and flag cleared. */
457  if (fifo->overflow_flag) {
459  if (fifo->restart_with_keyframe)
460  fifo_thread_ctx.drop_until_keyframe = 1;
461  fifo->overflow_flag = 0;
462  just_flushed = 1;
463  }
465 
466  if (just_flushed)
467  av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
468 
469  if (fifo->timeshift)
470  while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
471  av_usleep(10000);
472 
473  ret = av_thread_message_queue_recv(queue, &msg, 0);
474  if (ret < 0) {
476  break;
477  }
478  }
479 
480  fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
481 
482  return NULL;
483 }
484 
485 static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat,
486  const char *filename)
487 {
488  FifoContext *fifo = avf->priv_data;
489  AVFormatContext *avf2;
490  int ret = 0, i;
491 
492  ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
493  if (ret < 0)
494  return ret;
495 
496  fifo->avf = avf2;
497 
499  avf2->max_delay = avf->max_delay;
500  ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
501  if (ret < 0)
502  return ret;
503  avf2->opaque = avf->opaque;
504  avf2->io_close = avf->io_close;
505  avf2->io_close2 = avf->io_close2;
506  avf2->io_open = avf->io_open;
507  avf2->flags = avf->flags;
508 
509  for (i = 0; i < avf->nb_streams; ++i) {
510  AVStream *st = ff_stream_clone(avf2, avf->streams[i]);
511  if (!st)
512  return AVERROR(ENOMEM);
513  }
514 
515  return 0;
516 }
517 
518 static int fifo_init(AVFormatContext *avf)
519 {
520  FifoContext *fifo = avf->priv_data;
521  const AVOutputFormat *oformat;
522  int ret = 0;
523 
524  if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
525  av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
526  " only when drop_pkts_on_overflow is also turned on\n");
527  return AVERROR(EINVAL);
528  }
529  atomic_init(&fifo->queue_duration, 0);
531 
532  oformat = av_guess_format(fifo->format, avf->url, NULL);
533  if (!oformat) {
535  return ret;
536  }
537 
538  ret = fifo_mux_init(avf, oformat, avf->url);
539  if (ret < 0)
540  return ret;
541 
542  ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
543  sizeof(FifoMessage));
544  if (ret < 0)
545  return ret;
546 
548 
550  if (ret < 0)
551  return AVERROR(ret);
553 
554  return 0;
555 }
556 
558 {
559  FifoContext * fifo = avf->priv_data;
560  int ret;
561 
563  if (ret) {
564  av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
566  ret = AVERROR(ret);
567  }
568 
569  return ret;
570 }
571 
573 {
574  FifoContext *fifo = avf->priv_data;
576  int ret;
577 
578  if (pkt) {
579  ret = av_packet_ref(&msg.pkt,pkt);
580  if (ret < 0)
581  return ret;
582  }
583 
584  ret = av_thread_message_queue_send(fifo->queue, &msg,
585  fifo->drop_pkts_on_overflow ?
587  if (ret == AVERROR(EAGAIN)) {
588  uint8_t overflow_set = 0;
589 
590  /* Queue is full, set fifo->overflow_flag to 1
591  * to let consumer thread know the queue should
592  * be flushed. */
594  if (!fifo->overflow_flag)
595  fifo->overflow_flag = overflow_set = 1;
597 
598  if (overflow_set)
599  av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
600  ret = 0;
601  goto fail;
602  } else if (ret < 0) {
603  goto fail;
604  }
605 
606  if (fifo->timeshift && pkt && pkt->dts != AV_NOPTS_VALUE)
607  atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
608 
609  return ret;
610 fail:
611  if (pkt)
612  av_packet_unref(&msg.pkt);
613  return ret;
614 }
615 
617 {
618  FifoContext *fifo= avf->priv_data;
619  int ret;
620 
622  if (fifo->timeshift) {
623  int64_t now = av_gettime_relative();
624  int64_t elapsed = 0;
625  FifoMessage msg = {FIFO_NOOP};
626  do {
627  int64_t delay = av_gettime_relative() - now;
628  if (delay < 0) { // Discontinuity?
629  delay = 10000;
630  now = av_gettime_relative();
631  } else {
632  now += delay;
633  }
634  atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
635  elapsed += delay;
636  if (elapsed > fifo->timeshift)
637  break;
638  av_usleep(10000);
640  } while (ret >= 0 || ret == AVERROR(EAGAIN));
641  atomic_store(&fifo->queue_duration, INT64_MAX);
642  }
643 
645  if (ret < 0) {
646  av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
648  return AVERROR(ret);
649  }
650 
651  ret = fifo->write_trailer_ret;
652  return ret;
653 }
654 
655 static void fifo_deinit(AVFormatContext *avf)
656 {
657  FifoContext *fifo = avf->priv_data;
658 
659  avformat_free_context(fifo->avf);
663 }
664 
665 #define OFFSET(x) offsetof(FifoContext, x)
666 static const AVOption options[] = {
667  {"fifo_format", "Target muxer", OFFSET(format),
669 
670  {"queue_size", "Size of fifo queue", OFFSET(queue_size),
672 
673  {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
675 
676  {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
677  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
678 
679  {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
680  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
681 
682  {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
683  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
684 
685  {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
687 
688  {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
690 
691  {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
692  OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
693 
694  {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
695  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
696 
697  {"timeshift", "Delay fifo output", OFFSET(timeshift),
698  AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
699 
700  {NULL},
701 };
702 
703 static const AVClass fifo_muxer_class = {
704  .class_name = "Fifo muxer",
705  .item_name = av_default_item_name,
706  .option = options,
707  .version = LIBAVUTIL_VERSION_INT,
708 };
709 
711  .name = "fifo",
712  .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
713  .priv_data_size = sizeof(FifoContext),
714  .init = fifo_init,
718  .deinit = fifo_deinit,
719  .priv_class = &fifo_muxer_class,
721 };
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: avpacket.c:422
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
fifo_init
static int fifo_init(AVFormatContext *avf)
Definition: fifo.c:518
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:186
fifo_thread_flush_output
static int fifo_thread_flush_output(FifoThreadContext *ctx)
Definition: fifo.c:162
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
AVOutputFormat::name
const char * name
Definition: avformat.h:510
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
FifoContext::write_trailer_ret
int write_trailer_ret
Definition: fifo.c:50
opt.h
FifoContext::queue
AVThreadMessageQueue * queue
Definition: fifo.c:45
FIFO_WRITE_PACKET
@ FIFO_WRITE_PACKET
Definition: fifo.c:114
FifoContext::avf
AVFormatContext * avf
Definition: fifo.c:39
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
fifo_deinit
static void fifo_deinit(AVFormatContext *avf)
Definition: fifo.c:655
FifoContext
Definition: fifo.c:38
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:260
fifo_write_packet
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
Definition: fifo.c:572
AV_THREAD_MESSAGE_NONBLOCK
@ AV_THREAD_MESSAGE_NONBLOCK
Perform non-blocking operation.
Definition: threadmessage.h:31
AVFormatContext::streams
AVStream ** streams
A list of all streams in the file.
Definition: avformat.h:1284
deinit
static void deinit(AVFormatContext *s)
Definition: chromaprint.c:49
free_message
static void free_message(void *msg)
Definition: fifo.c:285
FifoContext::format_options
AVDictionary * format_options
Definition: fifo.c:42
AVOption
AVOption.
Definition: opt.h:251
FIFO_NOOP
@ FIFO_NOOP
Definition: fifo.c:112
data
const char data[16]
Definition: mxf.c:146
AV_OPT_TYPE_DURATION
@ AV_OPT_TYPE_DURATION
Definition: opt.h:239
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:196
FifoThreadContext::drop_until_keyframe
uint8_t drop_until_keyframe
Definition: fifo.c:101
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:392
AVDictionary
Definition: dict.c:32
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
FIFO_WRITE_HEADER
@ FIFO_WRITE_HEADER
Definition: fifo.c:113
AV_PKT_FLAG_KEY
#define AV_PKT_FLAG_KEY
The packet contains a keyframe.
Definition: packet.h:429
FifoContext::overflow_flag
volatile uint8_t overflow_flag
Definition: fifo.c:81
AVFormatContext::interrupt_callback
AVIOInterruptCB interrupt_callback
Custom interrupt callbacks for the I/O layer.
Definition: avformat.h:1483
FifoContext::last_sent_dts
int64_t last_sent_dts
Definition: fifo.c:84
ffstream
static av_always_inline FFStream * ffstream(AVStream *st)
Definition: internal.h:413
FifoMessage::type
FifoMessageType type
Definition: fifo.c:119
fail
#define fail()
Definition: checkasm.h:134
FifoMessage
Definition: fifo.c:118
FifoContext::queue_size
int queue_size
Definition: fifo.c:44
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:174
is_recoverable
static int is_recoverable(const FifoContext *fifo, int err_no)
Definition: fifo.c:266
avassert.h
fifo_thread_process_recovery_failure
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, int err_no)
Definition: fifo.c:293
pkt
AVPacket * pkt
Definition: movenc.c:59
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:180
AVFormatContext::metadata
AVDictionary * metadata
Metadata that applies to the whole file.
Definition: avformat.h:1445
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:158
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:223
duration
int64_t duration
Definition: movenc.c:64
FifoContext::overflow_flag_lock
pthread_mutex_t overflow_flag_lock
Definition: fifo.c:78
AV_OPT_FLAG_ENCODING_PARAM
#define AV_OPT_FLAG_ENCODING_PARAM
a generic parameter which can be set by the user for muxing or encoding
Definition: opt.h:281
FifoContext::max_recovery_attempts
int max_recovery_attempts
Definition: fifo.c:58
AVFormatContext::flags
int flags
Flags modifying the (de)muxer behaviour.
Definition: avformat.h:1334
format
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample format(the sample packing is implied by the sample format) and sample rate. The lists are not just lists
AVDictionaryEntry::key
char * key
Definition: dict.h:90
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts_bsf.c:363
fifo_thread_write_packet
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
Definition: fifo.c:180
options
static const AVOption options[]
Definition: fifo.c:666
OFFSET
#define OFFSET(x)
Definition: fifo.c:665
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
ctx
AVFormatContext * ctx
Definition: movenc.c:48
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
FIFO_DEFAULT_QUEUE_SIZE
#define FIFO_DEFAULT_QUEUE_SIZE
Definition: fifo.c:33
AVFormatContext::opaque
void * opaque
User data.
Definition: avformat.h:1722
AVThreadMessageQueue
Definition: threadmessage.c:27
fifo_thread_write_trailer
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
Definition: fifo.c:223
av_usleep
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
avformat_write_header
av_warn_unused_result int avformat_write_header(AVFormatContext *s, AVDictionary **options)
Allocate the stream private data and write the stream header to an output media file.
Definition: mux.c:450
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
AVFormatContext
Format I/O context.
Definition: avformat.h:1216
internal.h
FifoThreadContext
Definition: fifo.c:88
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:66
AVStream::time_base
AVRational time_base
This is the fundamental unit of time (in seconds) in terms of which frame timestamps are represented.
Definition: avformat.h:978
NULL
#define NULL
Definition: coverity.c:32
AVERROR_PATCHWELCOME
#define AVERROR_PATCHWELCOME
Not yet implemented in FFmpeg, patches welcome.
Definition: error.h:64
write_trailer
static int write_trailer(AVFormatContext *s1)
Definition: v4l2enc.c:100
AVRational
Rational number (pair of numerator and denominator).
Definition: rational.h:58
AV_OPT_TYPE_DICT
@ AV_OPT_TYPE_DICT
Definition: opt.h:232
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:237
AVFormatContext::pb
AVIOContext * pb
I/O context.
Definition: avformat.h:1258
fifo_write_header
static int fifo_write_header(AVFormatContext *avf)
Definition: fifo.c:557
fifo_thread_dispatch_message
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
Definition: fifo.c:239
time.h
av_write_frame
int av_write_frame(AVFormatContext *s, AVPacket *pkt)
Write a packet to an output media file.
Definition: mux.c:1192
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: avpacket.c:430
atomic_fetch_sub_explicit
#define atomic_fetch_sub_explicit(object, operand, order)
Definition: stdatomic.h:152
atomic_load_explicit
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:96
FifoContext::writer_thread
pthread_t writer_thread
Definition: fifo.c:47
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:78
AVFormatContext::nb_streams
unsigned int nb_streams
Number of elements in AVFormatContext.streams.
Definition: avformat.h:1272
FifoContext::recover_any_error
int recover_any_error
Definition: fifo.c:69
FifoThreadContext::last_received_dts
int64_t last_received_dts
Definition: fifo.c:108
NULL_IF_CONFIG_SMALL
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification.
Definition: internal.h:115
fifo_consumer_thread
static void * fifo_consumer_thread(void *data)
Definition: fifo.c:422
threadmessage.h
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:121
FifoContext::drop_pkts_on_overflow
int drop_pkts_on_overflow
Definition: fifo.c:72
FifoThreadContext::last_recovery_ts
int64_t last_recovery_ts
Definition: fifo.c:94
AVFormatContext::url
char * url
input or output URL.
Definition: avformat.h:1299
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
AVFMT_ALLOW_FLUSH
#define AVFMT_ALLOW_FLUSH
Format allows flushing.
Definition: avformat.h:490
AVFMT_NOFILE
#define AVFMT_NOFILE
Demuxer will use avio_open, no opened file should be provided by the caller.
Definition: avformat.h:470
ff_format_io_close
int ff_format_io_close(AVFormatContext *s, AVIOContext **pb)
Definition: avformat.c:845
ff_fifo_muxer
const AVOutputFormat ff_fifo_muxer
Definition: fifo.c:710
FifoContext::timeshift
int64_t timeshift
Definition: fifo.c:85
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:373
FifoMessageType
FifoMessageType
Definition: fifo.c:111
FifoContext::recovery_wait_streamtime
int recovery_wait_streamtime
Definition: fifo.c:65
AVPacket::flags
int flags
A combination of AV_PKT_FLAG values.
Definition: packet.h:380
av_dict_free
void av_dict_free(AVDictionary **pm)
Free all the memory allocated for an AVDictionary struct and all keys and values.
Definition: dict.c:223
av_packet_rescale_ts
void av_packet_rescale_ts(AVPacket *pkt, AVRational src_tb, AVRational dst_tb)
Convert valid timing fields (timestamps / durations) in a packet from one timebase to another.
Definition: avpacket.c:526
pthread_t
Definition: os2threads.h:44
FifoContext::queue_duration
atomic_int_least64_t queue_duration
Definition: fifo.c:83
AV_LOG_INFO
#define AV_LOG_INFO
Standard information.
Definition: log.h:191
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:42
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_write_trailer
int av_write_trailer(AVFormatContext *s)
Write the stream trailer to an output media file and free the file private data.
Definition: mux.c:1254
FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
Definition: fifo.c:34
FifoMessage::pkt
AVPacket pkt
Definition: fifo.c:120
AVOutputFormat
Definition: avformat.h:509
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:367
AVERROR_MUXER_NOT_FOUND
#define AVERROR_MUXER_NOT_FOUND
Muxer not found.
Definition: error.h:62
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:190
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
AVFormatContext::max_delay
int max_delay
Definition: avformat.h:1328
tb
#define tb
Definition: regdef.h:68
write_packet
static int write_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
Definition: ffmpeg_mux.c:61
FifoThreadContext::recovery_nr
int recovery_nr
Definition: fifo.c:98
AVFMT_TS_NEGATIVE
#define AVFMT_TS_NEGATIVE
Format allows muxing negative timestamps.
Definition: avformat.h:494
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
ret
ret
Definition: filter_design.txt:187
AVStream
Stream structure.
Definition: avformat.h:948
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:71
FifoContext::restart_with_keyframe
int restart_with_keyframe
Definition: fifo.c:76
avformat.h
fifo_thread_write_header
static int fifo_thread_write_header(FifoThreadContext *ctx)
Definition: fifo.c:123
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Definition: opt.h:225
FifoThreadContext::avf
AVFormatContext * avf
Definition: fifo.c:89
avformat_free_context
void avformat_free_context(AVFormatContext *s)
Free an AVFormatContext and all its streams.
Definition: avformat.c:95
fifo_muxer_class
static const AVClass fifo_muxer_class
Definition: fifo.c:703
AVFormatContext::io_close
void(* io_close)(struct AVFormatContext *s, AVIOContext *pb)
A callback for closing the streams opened with AVFormatContext.io_open().
Definition: avformat.h:1782
AVFormatContext::io_open
int(* io_open)(struct AVFormatContext *s, AVIOContext **pb, const char *url, int flags, AVDictionary **options)
A callback for opening new IO streams.
Definition: avformat.h:1776
AVPacket::stream_index
int stream_index
Definition: packet.h:376
next_duration
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
Definition: fifo.c:171
av_guess_format
const AVOutputFormat * av_guess_format(const char *short_name, const char *filename, const char *mime_type)
Return the output format in the list of registered output formats which best matches the provided par...
Definition: format.c:53
FifoContext::attempt_recovery
int attempt_recovery
Definition: fifo.c:61
FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
Definition: fifo.c:35
fifo_thread_recover
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:396
AVDictionaryEntry
Definition: dict.h:89
fifo_write_trailer
static int fifo_write_trailer(AVFormatContext *avf)
Definition: fifo.c:616
ff_stream_clone
AVStream * ff_stream_clone(AVFormatContext *dst_ctx, const AVStream *src)
Create a new stream and copy to it all parameters from a source stream, with the exception of the ind...
Definition: avformat.c:286
AVPacket
This structure stores compressed data.
Definition: packet.h:351
AV_OPT_TYPE_BOOL
@ AV_OPT_TYPE_BOOL
Definition: opt.h:244
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:93
av_dict_copy
int av_dict_copy(AVDictionary **dst, const AVDictionary *src, int flags)
Copy entries from one AVDictionary struct into another.
Definition: dict.c:237
fifo_thread_attempt_recovery
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:325
FFStream::cur_dts
int64_t cur_dts
Definition: internal.h:410
AVFormatContext::io_close2
int(* io_close2)(struct AVFormatContext *s, AVIOContext *pb)
A callback for closing the streams opened with AVFormatContext.io_open().
Definition: avformat.h:1823
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:561
ff_format_output_open
int ff_format_output_open(AVFormatContext *s, const char *url, AVDictionary **options)
Utility function to open IO stream of output format.
Definition: mux_utils.c:117
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:201
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:85
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
FifoContext::recovery_wait_time
int64_t recovery_wait_time
Definition: fifo.c:55
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
AV_OPT_TYPE_STRING
@ AV_OPT_TYPE_STRING
Definition: opt.h:229
write_header
static void write_header(FFV1Context *f)
Definition: ffv1enc.c:346
FIFO_FLUSH_OUTPUT
@ FIFO_FLUSH_OUTPUT
Definition: fifo.c:115
fifo_mux_init
static int fifo_mux_init(AVFormatContext *avf, const AVOutputFormat *oformat, const char *filename)
Definition: fifo.c:485
avformat_alloc_output_context2
int avformat_alloc_output_context2(AVFormatContext **ctx, const AVOutputFormat *oformat, const char *format_name, const char *filename)
Allocate an AVFormatContext for an output format.
Definition: mux.c:91
FifoContext::overflow_flag_lock_initialized
int overflow_flag_lock_initialized
Definition: fifo.c:79
AVFormatContext::priv_data
void * priv_data
Format private data.
Definition: avformat.h:1244
FifoThreadContext::header_written
uint8_t header_written
Definition: fifo.c:106
av_dict_iterate
const AVDictionaryEntry * av_dict_iterate(const AVDictionary *m, const AVDictionaryEntry *prev)
Iterate over a dictionary.
Definition: dict.c:42
FifoContext::format
char * format
Definition: fifo.c:41
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:74
ff_thread_setname
static int ff_thread_setname(const char *name)
Definition: thread.h:195
mux.h