FFmpeg
fifo.c
Go to the documentation of this file.
1 /*
2  * FIFO pseudo-muxer
3  * Copyright (c) 2016 Jan Sebechlebsky
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public License
9  * as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 
24 #include "libavutil/avassert.h"
25 #include "libavutil/opt.h"
26 #include "libavutil/time.h"
27 #include "libavutil/thread.h"
29 #include "avformat.h"
30 #include "internal.h"
31 
32 #define FIFO_DEFAULT_QUEUE_SIZE 60
33 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 0
34 #define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC 5000000 // 5 seconds
35 
36 typedef struct FifoContext {
37  const AVClass *class;
39 
40  char *format;
42 
45 
47 
48  /* Return value of the last write_trailer call */
50 
51  /* Time to wait before next recovery attempt
52  * This can refer to the time in processed stream,
53  * or real time. */
55 
56  /* Maximal number of unsuccessful successive recovery attempts */
58 
59  /* Whether to attempt recovery from failure */
61 
62  /* If >0 stream time will be used when waiting
63  * for the recovery attempt instead of real time */
65 
66  /* If >0 recovery will be attempted regardless of error code
67  * (except AVERROR_EXIT, so exit request is never ignored) */
69 
70  /* Whether to drop packets in case the queue is full. */
72 
73  /* Whether to wait for keyframe when recovering
74  * from failure or queue overflow */
76 
79  /* Value > 0 signals queue overflow */
81 
83  int64_t last_sent_dts;
84  int64_t timeshift;
85 } FifoContext;
86 
87 typedef struct FifoThreadContext {
89 
90  /* Timestamp of last failure.
91  * This is either pts in case stream time is used,
92  * or microseconds as returned by av_gettime_relative() */
94 
95  /* Number of current recovery process
96  * Value > 0 means we are in recovery process */
98 
99  /* If > 0 all frames will be dropped until keyframe is received */
101 
102  /* Value > 0 means that the previous write_header call was successful
103  * so finalization by calling write_trailer and ff_io_close must be done
104  * before exiting / reinitialization of underlying muxer */
106 
109 
110 typedef enum FifoMessageType {
116 
117 typedef struct FifoMessage {
120 } FifoMessage;
121 
123 {
124  AVFormatContext *avf = ctx->avf;
125  FifoContext *fifo = avf->priv_data;
126  AVFormatContext *avf2 = fifo->avf;
128  int ret, i;
129 
130  ret = av_dict_copy(&format_options, fifo->format_options, 0);
131  if (ret < 0)
132  return ret;
133 
134  ret = ff_format_output_open(avf2, avf->url, &format_options);
135  if (ret < 0) {
136  av_log(avf, AV_LOG_ERROR, "Error opening %s: %s\n", avf->url,
137  av_err2str(ret));
138  goto end;
139  }
140 
141  for (i = 0;i < avf2->nb_streams; i++)
142  avf2->streams[i]->cur_dts = 0;
143 
144  ret = avformat_write_header(avf2, &format_options);
145  if (!ret)
146  ctx->header_written = 1;
147 
148  // Check for options unrecognized by underlying muxer
149  if (format_options) {
150  AVDictionaryEntry *entry = NULL;
151  while ((entry = av_dict_get(format_options, "", entry, AV_DICT_IGNORE_SUFFIX)))
152  av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
153  ret = AVERROR(EINVAL);
154  }
155 
156 end:
157  av_dict_free(&format_options);
158  return ret;
159 }
160 
162 {
163  AVFormatContext *avf = ctx->avf;
164  FifoContext *fifo = avf->priv_data;
165  AVFormatContext *avf2 = fifo->avf;
166 
167  return av_write_frame(avf2, NULL);
168 }
169 
170 static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
171 {
172  AVStream *st = avf->streams[pkt->stream_index];
173  int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
174  int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
175  *last_dts = dts;
176  return duration;
177 }
178 
180 {
181  AVFormatContext *avf = ctx->avf;
182  FifoContext *fifo = avf->priv_data;
183  AVFormatContext *avf2 = fifo->avf;
184  AVRational src_tb, dst_tb;
185  int ret, s_idx;
186 
187  if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
188  atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);
189 
190  if (ctx->drop_until_keyframe) {
191  if (pkt->flags & AV_PKT_FLAG_KEY) {
192  ctx->drop_until_keyframe = 0;
193  av_log(avf, AV_LOG_VERBOSE, "Keyframe received, recovering...\n");
194  } else {
195  av_log(avf, AV_LOG_VERBOSE, "Dropping non-keyframe packet\n");
196  av_packet_unref(pkt);
197  return 0;
198  }
199  }
200 
201  s_idx = pkt->stream_index;
202  src_tb = avf->streams[s_idx]->time_base;
203  dst_tb = avf2->streams[s_idx]->time_base;
204  av_packet_rescale_ts(pkt, src_tb, dst_tb);
205 
206  ret = av_write_frame(avf2, pkt);
207  if (ret >= 0)
208  av_packet_unref(pkt);
209  return ret;
210 }
211 
213 {
214  AVFormatContext *avf = ctx->avf;
215  FifoContext *fifo = avf->priv_data;
216  AVFormatContext *avf2 = fifo->avf;
217  int ret;
218 
219  if (!ctx->header_written)
220  return 0;
221 
222  ret = av_write_trailer(avf2);
223  ff_format_io_close(avf2, &avf2->pb);
224 
225  return ret;
226 }
227 
229 {
230  int ret = AVERROR(EINVAL);
231 
232  if (msg->type == FIFO_NOOP)
233  return 0;
234 
235  if (!ctx->header_written) {
236  ret = fifo_thread_write_header(ctx);
237  if (ret < 0)
238  return ret;
239  }
240 
241  switch(msg->type) {
242  case FIFO_WRITE_HEADER:
243  av_assert0(ret >= 0);
244  return ret;
245  case FIFO_WRITE_PACKET:
246  return fifo_thread_write_packet(ctx, &msg->pkt);
247  case FIFO_FLUSH_OUTPUT:
248  return fifo_thread_flush_output(ctx);
249  }
250 
251  av_assert0(0);
252  return AVERROR(EINVAL);
253 }
254 
255 static int is_recoverable(const FifoContext *fifo, int err_no) {
256  if (!fifo->attempt_recovery)
257  return 0;
258 
259  if (fifo->recover_any_error)
260  return err_no != AVERROR_EXIT;
261 
262  switch (err_no) {
263  case AVERROR(EINVAL):
264  case AVERROR(ENOSYS):
265  case AVERROR_EOF:
266  case AVERROR_EXIT:
268  return 0;
269  default:
270  return 1;
271  }
272 }
273 
274 static void free_message(void *msg)
275 {
276  FifoMessage *fifo_msg = msg;
277 
278  if (fifo_msg->type == FIFO_WRITE_PACKET)
279  av_packet_unref(&fifo_msg->pkt);
280 }
281 
283  int err_no)
284 {
285  AVFormatContext *avf = ctx->avf;
286  FifoContext *fifo = avf->priv_data;
287  int ret;
288 
289  av_log(avf, AV_LOG_INFO, "Recovery failed: %s\n",
290  av_err2str(err_no));
291 
292  if (fifo->recovery_wait_streamtime) {
293  if (pkt->pts == AV_NOPTS_VALUE)
294  av_log(avf, AV_LOG_WARNING, "Packet does not contain presentation"
295  " timestamp, recovery will be attempted immediately");
296  ctx->last_recovery_ts = pkt->pts;
297  } else {
299  }
300 
301  if (fifo->max_recovery_attempts &&
302  ctx->recovery_nr >= fifo->max_recovery_attempts) {
303  av_log(avf, AV_LOG_ERROR,
304  "Maximal number of %d recovery attempts reached.\n",
305  fifo->max_recovery_attempts);
306  ret = err_no;
307  } else {
308  ret = AVERROR(EAGAIN);
309  }
310 
311  return ret;
312 }
313 
315 {
316  AVFormatContext *avf = ctx->avf;
317  FifoContext *fifo = avf->priv_data;
318  AVPacket *pkt = &msg->pkt;
319  int64_t time_since_recovery;
320  int ret;
321 
322  if (!is_recoverable(fifo, err_no)) {
323  ret = err_no;
324  goto fail;
325  }
326 
327  if (ctx->header_written) {
329  ctx->header_written = 0;
330  }
331 
332  if (!ctx->recovery_nr) {
334  AV_NOPTS_VALUE : 0;
335  } else {
336  if (fifo->recovery_wait_streamtime) {
337  if (ctx->last_recovery_ts == AV_NOPTS_VALUE) {
339  time_since_recovery = av_rescale_q(pkt->pts - ctx->last_recovery_ts,
340  tb, AV_TIME_BASE_Q);
341  } else {
342  /* Enforce recovery immediately */
343  time_since_recovery = fifo->recovery_wait_time;
344  }
345  } else {
346  time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
347  }
348 
349  if (time_since_recovery < fifo->recovery_wait_time)
350  return AVERROR(EAGAIN);
351  }
352 
353  ctx->recovery_nr++;
354 
355  if (fifo->max_recovery_attempts) {
356  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d/%d\n",
357  ctx->recovery_nr, fifo->max_recovery_attempts);
358  } else {
359  av_log(avf, AV_LOG_VERBOSE, "Recovery attempt #%d\n",
360  ctx->recovery_nr);
361  }
362 
363  if (fifo->restart_with_keyframe && fifo->drop_pkts_on_overflow)
364  ctx->drop_until_keyframe = 1;
365 
366  ret = fifo_thread_dispatch_message(ctx, msg);
367  if (ret < 0) {
368  if (is_recoverable(fifo, ret)) {
369  return fifo_thread_process_recovery_failure(ctx, pkt, ret);
370  } else {
371  goto fail;
372  }
373  } else {
374  av_log(avf, AV_LOG_INFO, "Recovery successful\n");
375  ctx->recovery_nr = 0;
376  }
377 
378  return 0;
379 
380 fail:
381  free_message(msg);
382  return ret;
383 }
384 
385 static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
386 {
387  AVFormatContext *avf = ctx->avf;
388  FifoContext *fifo = avf->priv_data;
389  int ret;
390 
391  do {
392  if (!fifo->recovery_wait_streamtime && ctx->recovery_nr > 0) {
393  int64_t time_since_recovery = av_gettime_relative() - ctx->last_recovery_ts;
394  int64_t time_to_wait = FFMAX(0, fifo->recovery_wait_time - time_since_recovery);
395  if (time_to_wait)
396  av_usleep(FFMIN(10000, time_to_wait));
397  }
398 
399  ret = fifo_thread_attempt_recovery(ctx, msg, err_no);
400  } while (ret == AVERROR(EAGAIN) && !fifo->drop_pkts_on_overflow);
401 
402  if (ret == AVERROR(EAGAIN) && fifo->drop_pkts_on_overflow) {
403  if (msg->type == FIFO_WRITE_PACKET)
404  av_packet_unref(&msg->pkt);
405  ret = 0;
406  }
407 
408  return ret;
409 }
410 
411 static void *fifo_consumer_thread(void *data)
412 {
414  FifoContext *fifo = avf->priv_data;
416  FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
417  int ret;
418 
419  FifoThreadContext fifo_thread_ctx;
420  memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
421  fifo_thread_ctx.avf = avf;
422  fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;
423 
424  while (1) {
425  uint8_t just_flushed = 0;
426 
427  if (!fifo_thread_ctx.recovery_nr)
428  ret = fifo_thread_dispatch_message(&fifo_thread_ctx, &msg);
429 
430  if (ret < 0 || fifo_thread_ctx.recovery_nr > 0) {
431  int rec_ret = fifo_thread_recover(&fifo_thread_ctx, &msg, ret);
432  if (rec_ret < 0) {
433  av_thread_message_queue_set_err_send(queue, rec_ret);
434  break;
435  }
436  }
437 
438  /* If the queue is full at the moment when fifo_write_packet
439  * attempts to insert new message (packet) to the queue,
440  * it sets the fifo->overflow_flag to 1 and drops packet.
441  * Here in consumer thread, the flag is checked and if it is
442  * set, the queue is flushed and flag cleared. */
444  if (fifo->overflow_flag) {
446  if (fifo->restart_with_keyframe)
447  fifo_thread_ctx.drop_until_keyframe = 1;
448  fifo->overflow_flag = 0;
449  just_flushed = 1;
450  }
452 
453  if (just_flushed)
454  av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");
455 
456  if (fifo->timeshift)
457  while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
458  av_usleep(10000);
459 
460  ret = av_thread_message_queue_recv(queue, &msg, 0);
461  if (ret < 0) {
463  break;
464  }
465  }
466 
467  fifo->write_trailer_ret = fifo_thread_write_trailer(&fifo_thread_ctx);
468 
469  return NULL;
470 }
471 
473  const char *filename)
474 {
475  FifoContext *fifo = avf->priv_data;
476  AVFormatContext *avf2;
477  int ret = 0, i;
478 
479  ret = avformat_alloc_output_context2(&avf2, oformat, NULL, filename);
480  if (ret < 0)
481  return ret;
482 
483  fifo->avf = avf2;
484 
486  avf2->max_delay = avf->max_delay;
487  ret = av_dict_copy(&avf2->metadata, avf->metadata, 0);
488  if (ret < 0)
489  return ret;
490  avf2->opaque = avf->opaque;
491  avf2->io_close = avf->io_close;
492  avf2->io_open = avf->io_open;
493  avf2->flags = avf->flags;
494 
495  for (i = 0; i < avf->nb_streams; ++i) {
496  AVStream *st = avformat_new_stream(avf2, NULL);
497  if (!st)
498  return AVERROR(ENOMEM);
499 
500  ret = ff_stream_encode_params_copy(st, avf->streams[i]);
501  if (ret < 0)
502  return ret;
503  }
504 
505  return 0;
506 }
507 
509 {
510  FifoContext *fifo = avf->priv_data;
511  ff_const59 AVOutputFormat *oformat;
512  int ret = 0;
513 
514  if (fifo->recovery_wait_streamtime && !fifo->drop_pkts_on_overflow) {
515  av_log(avf, AV_LOG_ERROR, "recovery_wait_streamtime can be turned on"
516  " only when drop_pkts_on_overflow is also turned on\n");
517  return AVERROR(EINVAL);
518  }
519  atomic_init(&fifo->queue_duration, 0);
521 
522  oformat = av_guess_format(fifo->format, avf->url, NULL);
523  if (!oformat) {
525  return ret;
526  }
527 
528  ret = fifo_mux_init(avf, oformat, avf->url);
529  if (ret < 0)
530  return ret;
531 
532  ret = av_thread_message_queue_alloc(&fifo->queue, (unsigned) fifo->queue_size,
533  sizeof(FifoMessage));
534  if (ret < 0)
535  return ret;
536 
538 
540  if (ret < 0)
541  return AVERROR(ret);
543 
544  return 0;
545 }
546 
548 {
549  FifoContext * fifo = avf->priv_data;
550  int ret;
551 
553  if (ret) {
554  av_log(avf, AV_LOG_ERROR, "Failed to start thread: %s\n",
555  av_err2str(AVERROR(ret)));
556  ret = AVERROR(ret);
557  }
558 
559  return ret;
560 }
561 
563 {
564  FifoContext *fifo = avf->priv_data;
566  int ret;
567 
568  if (pkt) {
569  ret = av_packet_ref(&msg.pkt,pkt);
570  if (ret < 0)
571  return ret;
572  }
573 
574  ret = av_thread_message_queue_send(fifo->queue, &msg,
575  fifo->drop_pkts_on_overflow ?
577  if (ret == AVERROR(EAGAIN)) {
578  uint8_t overflow_set = 0;
579 
580  /* Queue is full, set fifo->overflow_flag to 1
581  * to let consumer thread know the queue should
582  * be flushed. */
584  if (!fifo->overflow_flag)
585  fifo->overflow_flag = overflow_set = 1;
587 
588  if (overflow_set)
589  av_log(avf, AV_LOG_WARNING, "FIFO queue full\n");
590  ret = 0;
591  goto fail;
592  } else if (ret < 0) {
593  goto fail;
594  }
595 
596  if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
597  atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);
598 
599  return ret;
600 fail:
601  if (pkt)
602  av_packet_unref(&msg.pkt);
603  return ret;
604 }
605 
607 {
608  FifoContext *fifo= avf->priv_data;
609  int ret;
610 
612  if (fifo->timeshift) {
613  int64_t now = av_gettime_relative();
614  int64_t elapsed = 0;
615  FifoMessage msg = {FIFO_NOOP};
616  do {
617  int64_t delay = av_gettime_relative() - now;
618  if (delay < 0) { // Discontinuity?
619  delay = 10000;
620  now = av_gettime_relative();
621  } else {
622  now += delay;
623  }
624  atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
625  elapsed += delay;
626  if (elapsed > fifo->timeshift)
627  break;
628  av_usleep(10000);
630  } while (ret >= 0 || ret == AVERROR(EAGAIN));
631  atomic_store(&fifo->queue_duration, INT64_MAX);
632  }
633 
634  ret = pthread_join(fifo->writer_thread, NULL);
635  if (ret < 0) {
636  av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n",
637  av_err2str(AVERROR(ret)));
638  return AVERROR(ret);
639  }
640 
641  ret = fifo->write_trailer_ret;
642  return ret;
643 }
644 
646 {
647  FifoContext *fifo = avf->priv_data;
648 
649  avformat_free_context(fifo->avf);
653 }
654 
655 #define OFFSET(x) offsetof(FifoContext, x)
656 static const AVOption options[] = {
657  {"fifo_format", "Target muxer", OFFSET(format),
659 
660  {"queue_size", "Size of fifo queue", OFFSET(queue_size),
661  AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_QUEUE_SIZE}, 1, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
662 
663  {"format_opts", "Options to be passed to underlying muxer", OFFSET(format_options),
664  AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
665 
666  {"drop_pkts_on_overflow", "Drop packets on fifo queue overflow not to block encoder", OFFSET(drop_pkts_on_overflow),
667  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
668 
669  {"restart_with_keyframe", "Wait for keyframe when restarting output", OFFSET(restart_with_keyframe),
670  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
671 
672  {"attempt_recovery", "Attempt recovery in case of failure", OFFSET(attempt_recovery),
673  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
674 
675  {"max_recovery_attempts", "Maximal number of recovery attempts", OFFSET(max_recovery_attempts),
676  AV_OPT_TYPE_INT, {.i64 = FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS}, 0, INT_MAX, AV_OPT_FLAG_ENCODING_PARAM},
677 
678  {"recovery_wait_time", "Waiting time between recovery attempts", OFFSET(recovery_wait_time),
679  AV_OPT_TYPE_DURATION, {.i64 = FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
680 
681  {"recovery_wait_streamtime", "Use stream time instead of real time while waiting for recovery",
682  OFFSET(recovery_wait_streamtime), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
683 
684  {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
685  AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
686 
687  {"timeshift", "Delay fifo output", OFFSET(timeshift),
688  AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},
689 
690  {NULL},
691 };
692 
693 static const AVClass fifo_muxer_class = {
694  .class_name = "Fifo muxer",
695  .item_name = av_default_item_name,
696  .option = options,
697  .version = LIBAVUTIL_VERSION_INT,
698 };
699 
701  .name = "fifo",
702  .long_name = NULL_IF_CONFIG_SMALL("FIFO queue pseudo-muxer"),
703  .priv_data_size = sizeof(FifoContext),
704  .init = fifo_init,
708  .deinit = fifo_deinit,
709  .priv_class = &fifo_muxer_class,
711 };
static void write_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int unqueue)
Definition: ffmpeg.c:702
int(* io_open)(struct AVFormatContext *s, AVIOContext **pb, const char *url, int flags, AVDictionary **options)
A callback for opening new IO streams.
Definition: avformat.h:1933
#define NULL
Definition: coverity.c:32
AVFormatContext * avf
Definition: fifo.c:88
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
#define pthread_mutex_lock(a)
Definition: ffprobe.c:62
AVIOInterruptCB interrupt_callback
Custom interrupt callbacks for the I/O layer.
Definition: avformat.h:1629
#define atomic_store(object, desired)
Definition: stdatomic.h:85
AVOption.
Definition: opt.h:248
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
ptrdiff_t const GLvoid * data
Definition: opengl_enc.c:100
int av_write_frame(AVFormatContext *s, AVPacket *pkt)
Write a packet to an output media file.
Definition: mux.c:1190
static av_cold int init(AVFilterContext *ctx)
Definition: fifo.c:50
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:200
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
static const AVOption options[]
Definition: fifo.c:656
int av_dict_copy(AVDictionary **dst, const AVDictionary *src, int flags)
Copy entries from one AVDictionary struct into another.
Definition: dict.c:217
uint8_t drop_until_keyframe
Definition: fifo.c:100
int av_usleep(unsigned usec)
Sleep for a period of time.
Definition: time.c:84
static AVPacket pkt
static void free_message(void *msg)
Definition: fifo.c:274
static const AVClass fifo_muxer_class
Definition: fifo.c:693
#define AVFMT_ALLOW_FLUSH
Format allows flushing.
Definition: avformat.h:471
pthread_mutex_t overflow_flag_lock
Definition: fifo.c:77
char * format
Definition: fifo.c:40
uint8_t header_written
Definition: fifo.c:105
Format I/O context.
Definition: avformat.h:1351
int64_t cur_dts
Definition: avformat.h:1079
AVDictionary * format_options
Definition: fifo.c:41
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
AVOutputFormat ff_fifo_muxer
Definition: fifo.c:700
uint8_t
static void fifo_deinit(AVFormatContext *avf)
Definition: fifo.c:645
int max_recovery_attempts
Definition: fifo.c:57
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
AVFormatContext * avf
Definition: fifo.c:38
AVOptions.
static av_cold int end(AVCodecContext *avctx)
Definition: avrndec.c:92
int drop_pkts_on_overflow
Definition: fifo.c:71
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
void ff_format_io_close(AVFormatContext *s, AVIOContext **pb)
Definition: utils.c:5616
AVStream * avformat_new_stream(AVFormatContext *s, const AVCodec *c)
Add a new stream to a media file.
Definition: utils.c:4450
AVStream ** streams
A list of all streams in the file.
Definition: avformat.h:1419
int64_t duration
Definition: movenc.c:63
int64_t last_recovery_ts
Definition: fifo.c:93
AVDictionaryEntry * av_dict_get(const AVDictionary *m, const char *key, const AVDictionaryEntry *prev, int flags)
Get a dictionary entry with matching key.
Definition: dict.c:40
int flags
Flags modifying the (de)muxer behaviour.
Definition: avformat.h:1482
int ff_format_output_open(AVFormatContext *s, const char *url, AVDictionary **options)
Utility function to open IO stream of output format.
Definition: utils.c:5606
int ff_stream_encode_params_copy(AVStream *dst, const AVStream *src)
Copy encoding parameters from source to destination stream.
Definition: utils.c:4261
#define AVERROR_EOF
End of file.
Definition: error.h:55
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:210
FifoMessageType
Definition: fifo.c:110
#define av_log(a,...)
#define AV_OPT_FLAG_ENCODING_PARAM
a generic parameter which can be set by the user for muxing or encoding
Definition: opt.h:278
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: avpacket.c:615
int recovery_wait_streamtime
Definition: fifo.c:64
#define AV_PKT_FLAG_KEY
The packet contains a keyframe.
Definition: packet.h:401
static int fifo_thread_flush_output(FifoThreadContext *ctx)
Definition: fifo.c:161
static int fifo_thread_write_trailer(FifoThreadContext *ctx)
Definition: fifo.c:212
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
static int fifo_init(AVFormatContext *avf)
Definition: fifo.c:508
#define ff_const59
The ff_const59 define is not part of the public API and will be removed without further warning...
Definition: avformat.h:544
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:194
AVDictionary * metadata
Metadata that applies to the whole file.
Definition: avformat.h:1591
void av_packet_rescale_ts(AVPacket *pkt, AVRational src_tb, AVRational dst_tb)
Convert valid timing fields (timestamps / durations) in a packet from one timebase to another...
Definition: avpacket.c:713
static int fifo_thread_write_header(FifoThreadContext *ctx)
Definition: fifo.c:122
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
static int fifo_write_header(AVFormatContext *avf)
Definition: fifo.c:547
FifoMessageType type
Definition: fifo.c:118
int64_t recovery_wait_time
Definition: fifo.c:54
pthread_t writer_thread
Definition: fifo.c:46
#define NULL_IF_CONFIG_SMALL(x)
Return NULL if CONFIG_SMALL is true, otherwise the argument without modification. ...
Definition: internal.h:153
volatile uint8_t overflow_flag
Definition: fifo.c:80
char * url
input or output URL.
Definition: avformat.h:1447
#define OFFSET(x)
Definition: fifo.c:655
void av_dict_free(AVDictionary **pm)
Free all the memory allocated for an AVDictionary struct and all keys and values. ...
Definition: dict.c:203
simple assert() macros that are a bit more flexible than ISO C assert().
#define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS
Definition: fifo.c:33
int avformat_alloc_output_context2(AVFormatContext **ctx, ff_const59 AVOutputFormat *oformat, const char *format_name, const char *filename)
Allocate an AVFormatContext for an output format.
Definition: mux.c:135
#define FFMAX(a, b)
Definition: common.h:94
#define fail()
Definition: checkasm.h:123
int flags
A combination of AV_PKT_FLAG values.
Definition: packet.h:369
void * opaque
User data.
Definition: avformat.h:1857
AVPacket pkt
Definition: fifo.c:119
unsigned int nb_streams
Number of elements in AVFormatContext.streams.
Definition: avformat.h:1407
int queue_size
Definition: fifo.c:43
av_warn_unused_result int avformat_write_header(AVFormatContext *s, AVDictionary **options)
Allocate the stream private data and write the stream header to an output media file.
Definition: mux.c:505
#define FFMIN(a, b)
Definition: common.h:96
int recovery_nr
Definition: fifo.c:97
static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
Definition: fifo.c:179
static int write_trailer(AVFormatContext *s1)
Definition: v4l2enc.c:98
const char * name
Definition: avformat.h:500
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:119
AVFormatContext * ctx
Definition: movenc.c:48
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:96
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
atomic_int_least64_t queue_duration
Definition: fifo.c:82
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:66
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
#define FIFO_DEFAULT_QUEUE_SIZE
Definition: fifo.c:32
int restart_with_keyframe
Definition: fifo.c:75
Stream structure.
Definition: avformat.h:876
#define AVERROR_PATCHWELCOME
Not yet implemented in FFmpeg, patches welcome.
Definition: error.h:62
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
int64_t last_sent_dts
Definition: fifo.c:83
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
Definition: fifo.c:562
#define AV_LOG_INFO
Standard information.
Definition: log.h:205
#define atomic_fetch_sub_explicit(object, operand, order)
Definition: stdatomic.h:152
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:260
AVIOContext * pb
I/O context.
Definition: avformat.h:1393
static int fifo_write_trailer(AVFormatContext *avf)
Definition: fifo.c:606
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: avpacket.c:606
void(* io_close)(struct AVFormatContext *s, AVIOContext *pb)
A callback for closing the streams opened with AVFormatContext.io_open().
Definition: avformat.h:1939
Perform non-blocking operation.
Definition: threadmessage.h:31
Describe the class of an AVClass context structure.
Definition: log.h:67
ff_const59 AVOutputFormat * av_guess_format(const char *short_name, const char *filename, const char *mime_type)
Return the output format in the list of registered output formats which best matches the provided par...
Definition: format.c:51
static int fifo_thread_process_recovery_failure(FifoThreadContext *ctx, AVPacket *pkt, int err_no)
Definition: fifo.c:282
Rational number (pair of numerator and denominator).
Definition: rational.h:58
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
AVThreadMessageQueue * queue
Definition: fifo.c:44
int write_trailer_ret
Definition: fifo.c:49
static int fifo_thread_attempt_recovery(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:314
static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg)
Definition: fifo.c:228
void avformat_free_context(AVFormatContext *s)
Free an AVFormatContext and all its streams.
Definition: utils.c:4379
static int fifo_mux_init(AVFormatContext *avf, ff_const59 AVOutputFormat *oformat, const char *filename)
Definition: fifo.c:472
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
#define flags(name, subs,...)
Definition: cbs_av1.c:560
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
#define FIFO_DEFAULT_RECOVERY_WAIT_TIME_USEC
Definition: fifo.c:34
Main libavformat public API header.
static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
Definition: fifo.c:170
_fmutex pthread_mutex_t
Definition: os2threads.h:53
#define AVFMT_NOFILE
Demuxer will use avio_open, no opened file should be provided by the caller.
Definition: avformat.h:458
int attempt_recovery
Definition: fifo.c:60
char * key
Definition: dict.h:86
int overflow_flag_lock_initialized
Definition: fifo.c:78
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
void * priv_data
Format private data.
Definition: avformat.h:1379
static void write_header(FFV1Context *f)
Definition: ffv1enc.c:346
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed...
Definition: packet.h:362
static void * fifo_consumer_thread(void *data)
Definition: fifo.c:411
int av_write_trailer(AVFormatContext *s)
Write the stream trailer to an output media file and free the file private data.
Definition: mux.c:1251
#define atomic_init(obj, value)
Definition: stdatomic.h:33
int64_t last_received_dts
Definition: fifo.c:107
#define AV_DICT_IGNORE_SUFFIX
Return first entry in a dictionary whose first part corresponds to the search key, ignoring the suffix of the found key string.
Definition: dict.h:70
static int fifo_thread_recover(FifoThreadContext *ctx, FifoMessage *msg, int err_no)
Definition: fifo.c:385
#define AVERROR_MUXER_NOT_FOUND
Muxer not found.
Definition: error.h:60
int stream_index
Definition: packet.h:365
AVRational time_base
This is the fundamental unit of time (in seconds) in terms of which frame timestamps are represented...
Definition: avformat.h:905
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
static int is_recoverable(const FifoContext *fifo, int err_no)
Definition: fifo.c:255
#define AVFMT_TS_NEGATIVE
Format allows muxing negative timestamps.
Definition: avformat.h:477
This structure stores compressed data.
Definition: packet.h:340
int64_t timeshift
Definition: fifo.c:84
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:356
int i
Definition: input.c:407
int recover_any_error
Definition: fifo.c:68
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
#define tb
Definition: regdef.h:68