FFmpeg
ffmpeg_sched.c
Go to the documentation of this file.
1 /*
2  * Inter-thread scheduling/synchronization.
3  * Copyright (c) 2023 Anton Khirnov
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <stdatomic.h>
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include "cmdutils.h"
27 #include "ffmpeg_sched.h"
28 #include "ffmpeg_utils.h"
29 #include "sync_queue.h"
30 #include "thread_queue.h"
31 
32 #include "libavcodec/packet.h"
33 
34 #include "libavutil/avassert.h"
35 #include "libavutil/error.h"
36 #include "libavutil/fifo.h"
37 #include "libavutil/frame.h"
38 #include "libavutil/mem.h"
39 #include "libavutil/thread.h"
41 #include "libavutil/time.h"
42 
43 // 100 ms
44 // FIXME: some other value? make this dynamic?
45 #define SCHEDULE_TOLERANCE (100 * 1000)
46 
47 enum QueueType {
50 };
51 
52 typedef struct SchWaiter {
56 
57  // the following are internal state of schedule_update_locked() and must not
58  // be accessed outside of it
61 } SchWaiter;
62 
63 typedef struct SchTask {
66 
68  void *func_arg;
69 
72 } SchTask;
73 
74 typedef struct SchDecOutput {
76  uint8_t *dst_finished;
77  unsigned nb_dst;
78 } SchDecOutput;
79 
80 typedef struct SchDec {
81  const AVClass *class;
82 
84 
86  unsigned nb_outputs;
87 
89  // Queue for receiving input packets, one stream.
91 
92  // Queue for sending post-flush end timestamps back to the source
95 
96  // temporary storage used by sch_dec_send()
98 } SchDec;
99 
100 typedef struct SchSyncQueue {
104 
105  unsigned *enc_idx;
106  unsigned nb_enc_idx;
107 } SchSyncQueue;
108 
109 typedef struct SchEnc {
110  const AVClass *class;
111 
114  uint8_t *dst_finished;
115  unsigned nb_dst;
116 
117  // [0] - index of the sync queue in Scheduler.sq_enc,
118  // [1] - index of this encoder in the sq
119  int sq_idx[2];
120 
121  /* Opening encoders is somewhat nontrivial due to their interaction with
122  * sync queues, which are (among other things) responsible for maintaining
123  * constant audio frame size, when it is required by the encoder.
124  *
125  * Opening the encoder requires stream parameters, obtained from the first
126  * frame. However, that frame cannot be properly chunked by the sync queue
127  * without knowing the required frame size, which is only available after
128  * opening the encoder.
129  *
130  * This apparent circular dependency is resolved in the following way:
131  * - the caller creating the encoder gives us a callback which opens the
132  * encoder and returns the required frame size (if any)
133  * - when the first frame is sent to the encoder, the sending thread
134  * - calls this callback, opening the encoder
135  * - passes the returned frame size to the sync queue
136  */
137  int (*open_cb)(void *opaque, const AVFrame *frame);
138  int opened;
139 
141  // Queue for receiving input frames, one stream.
143  // tq_send() to queue returned EOF
145 
146  // temporary storage used by sch_enc_send()
148 } SchEnc;
149 
150 typedef struct SchDemuxStream {
152  uint8_t *dst_finished;
153  unsigned nb_dst;
155 
156 typedef struct SchDemux {
157  const AVClass *class;
158 
160  unsigned nb_streams;
161 
164 
165  // temporary storage used by sch_demux_send()
167 
168  // protected by schedule_lock
170 } SchDemux;
171 
172 typedef struct PreMuxQueue {
173  /**
174  * Queue for buffering the packets before the muxer task can be started.
175  */
177  /**
178  * Maximum number of packets in fifo.
179  */
181  /*
182  * The size of the AVPackets' buffers in queue.
183  * Updated when a packet is either pushed or pulled from the queue.
184  */
185  size_t data_size;
186  /* Threshold after which max_packets will be in effect */
188 } PreMuxQueue;
189 
190 typedef struct SchMuxStream {
193 
194  unsigned *sub_heartbeat_dst;
196 
198 
199  // an EOF was generated while flushing the pre-mux queue
200  int init_eof;
201 
202  ////////////////////////////////////////////////////////////
203  // The following are protected by Scheduler.schedule_lock //
204 
205  /* dts+duration of the last packet sent to this stream
206  in AV_TIME_BASE_Q */
208  // this stream no longer accepts input
210  ////////////////////////////////////////////////////////////
211 } SchMuxStream;
212 
213 typedef struct SchMux {
214  const AVClass *class;
215 
217  unsigned nb_streams;
219 
220  int (*init)(void *arg);
221 
223  /**
224  * Set to 1 after starting the muxer task and flushing the
225  * pre-muxing queues.
226  * Set either before any tasks have started, or with
227  * Scheduler.mux_ready_lock held.
228  */
231  unsigned queue_size;
232 
234 } SchMux;
235 
236 typedef struct SchFilterIn {
241 } SchFilterIn;
242 
243 typedef struct SchFilterOut {
245 } SchFilterOut;
246 
247 typedef struct SchFilterGraph {
248  const AVClass *class;
249 
251  unsigned nb_inputs;
254 
256  unsigned nb_outputs;
257 
259  // input queue, nb_inputs+1 streams
260  // last stream is control
263 
264  // protected by schedule_lock
265  unsigned best_input;
268 
273 };
274 
275 struct Scheduler {
276  const AVClass *class;
277 
279  unsigned nb_demux;
280 
282  unsigned nb_mux;
283 
284  unsigned nb_mux_ready;
286 
287  unsigned nb_mux_done;
288  unsigned task_failed;
291 
292 
294  unsigned nb_dec;
295 
297  unsigned nb_enc;
298 
300  unsigned nb_sq_enc;
301 
303  unsigned nb_filters;
304 
306  int sdp_auto;
307 
310 
312 
314 };
315 
316 /**
317  * Wait until this task is allowed to proceed.
318  *
319  * @retval 0 the caller should proceed
320  * @retval 1 the caller should terminate
321  */
322 static int waiter_wait(Scheduler *sch, SchWaiter *w)
323 {
324  int terminate;
325 
326  if (!atomic_load(&w->choked))
327  return 0;
328 
329  pthread_mutex_lock(&w->lock);
330 
331  while (atomic_load(&w->choked) && !atomic_load(&sch->terminate))
332  pthread_cond_wait(&w->cond, &w->lock);
333 
334  terminate = atomic_load(&sch->terminate);
335 
336  pthread_mutex_unlock(&w->lock);
337 
338  return terminate;
339 }
340 
341 static void waiter_set(SchWaiter *w, int choked)
342 {
343  pthread_mutex_lock(&w->lock);
344 
345  atomic_store(&w->choked, choked);
346  pthread_cond_signal(&w->cond);
347 
348  pthread_mutex_unlock(&w->lock);
349 }
350 
351 static int waiter_init(SchWaiter *w)
352 {
353  int ret;
354 
355  atomic_init(&w->choked, 0);
356 
357  ret = pthread_mutex_init(&w->lock, NULL);
358  if (ret)
359  return AVERROR(ret);
360 
361  ret = pthread_cond_init(&w->cond, NULL);
362  if (ret)
363  return AVERROR(ret);
364 
365  return 0;
366 }
367 
368 static void waiter_uninit(SchWaiter *w)
369 {
370  pthread_mutex_destroy(&w->lock);
371  pthread_cond_destroy(&w->cond);
372 }
373 
/*
 * Allocate a ThreadQueue and store it in *ptq.
 *
 * ptq        - receives the new queue on success
 * nb_streams - number of streams, passed through to tq_alloc()
 * queue_size - queue length; a value of 0 selects a default that depends on
 *              the queue type
 * type       - kind of objects carried by the queue
 *
 * Returns 0 on success, AVERROR(ENOMEM) when either the object pool or the
 * queue cannot be allocated.
 *
 * NOTE(review): several statements of this function are missing from this
 * listing (the non-frame default size, the body of the QUEUE_FRAMES branch,
 * the object pool allocation and the tail of the tq_alloc() call) - consult
 * the real source before relying on the details.
 */
374 static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size,
375  enum QueueType type)
376 {
377  ThreadQueue *tq;
378  ObjPool *op;
379 
// queue_size is unsigned, so this only matches 0, i.e. "use the default"
380  if (queue_size <= 0) {
381  if (type == QUEUE_FRAMES)
382  queue_size = DEFAULT_FRAME_THREAD_QUEUE_SIZE;
383  else
385  }
386 
387  if (type == QUEUE_FRAMES) {
388  // This queue length is used in the decoder code to ensure that
389  // there are enough entries in fixed-size frame pools to account
390  // for frames held in queues inside the ffmpeg utility. If this
391  // can ever dynamically change then the corresponding decode
392  // code needs to be updated as well.
394  }
395 
398  if (!op)
399  return AVERROR(ENOMEM);
400 
401  tq = tq_alloc(nb_streams, queue_size, op,
403  if (!tq) {
// the pool was not taken over by a queue, release it ourselves
404  objpool_free(&op);
405  return AVERROR(ENOMEM);
406  }
407 
408  *ptq = tq;
409  return 0;
410 }
411 
412 static void *task_wrapper(void *arg);
413 
414 static int task_start(SchTask *task)
415 {
416  int ret;
417 
418  av_log(task->func_arg, AV_LOG_VERBOSE, "Starting thread...\n");
419 
420  av_assert0(!task->thread_running);
421 
422  ret = pthread_create(&task->thread, NULL, task_wrapper, task);
423  if (ret) {
424  av_log(task->func_arg, AV_LOG_ERROR, "pthread_create() failed: %s\n",
425  strerror(ret));
426  return AVERROR(ret);
427  }
428 
429  task->thread_running = 1;
430  return 0;
431 }
432 
433 static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx,
434  SchThreadFunc func, void *func_arg)
435 {
436  task->parent = sch;
437 
438  task->node.type = type;
439  task->node.idx = idx;
440 
441  task->func = func;
442  task->func_arg = func_arg;
443 }
444 
445 static int64_t trailing_dts(const Scheduler *sch, int count_finished)
446 {
447  int64_t min_dts = INT64_MAX;
448 
449  for (unsigned i = 0; i < sch->nb_mux; i++) {
450  const SchMux *mux = &sch->mux[i];
451 
452  for (unsigned j = 0; j < mux->nb_streams; j++) {
453  const SchMuxStream *ms = &mux->streams[j];
454 
455  if (ms->source_finished && !count_finished)
456  continue;
457  if (ms->last_dts == AV_NOPTS_VALUE)
458  return AV_NOPTS_VALUE;
459 
460  min_dts = FFMIN(min_dts, ms->last_dts);
461  }
462  }
463 
464  return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
465 }
466 
/*
 * Stop the scheduler and release everything it owns, then free the
 * Scheduler itself and reset *psch through av_freep().
 *
 * Does nothing when *psch is NULL.
 *
 * NOTE(review): a number of cleanup statements are missing from this
 * listing (gaps in the embedded numbering, e.g. after the demuxer streams,
 * inside the pre-mux queue drain loop, and before the final av_freep()) -
 * check the real source for the full list of released resources.
 */
467 void sch_free(Scheduler **psch)
468 {
469  Scheduler *sch = *psch;
470 
471  if (!sch)
472  return;
473 
// stop the scheduler before tearing down its data
474  sch_stop(sch, NULL);
475 
// demuxers: per-stream destination arrays, the stream array, the waiter
476  for (unsigned i = 0; i < sch->nb_demux; i++) {
477  SchDemux *d = &sch->demux[i];
478 
479  for (unsigned j = 0; j < d->nb_streams; j++) {
480  SchDemuxStream *ds = &d->streams[j];
481  av_freep(&ds->dst);
482  av_freep(&ds->dst_finished);
483  }
484  av_freep(&d->streams);
485 
487 
488  waiter_uninit(&d->waiter);
489  }
490  av_freep(&sch->demux);
491 
// muxers: drain the pre-mux queues, then free the streams and the queue
492  for (unsigned i = 0; i < sch->nb_mux; i++) {
493  SchMux *mux = &sch->mux[i];
494 
495  for (unsigned j = 0; j < mux->nb_streams; j++) {
496  SchMuxStream *ms = &mux->streams[j];
497 
// the fifo may be NULL if its allocation failed in sch_add_mux_stream()
498  if (ms->pre_mux_queue.fifo) {
499  AVPacket *pkt;
500  while (av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1) >= 0)
503  }
504 
506  }
507  av_freep(&mux->streams);
508 
510 
511  tq_free(&mux->queue);
512  }
513  av_freep(&sch->mux);
514 
// decoders: input queue, per-output destination arrays, send_frame
515  for (unsigned i = 0; i < sch->nb_dec; i++) {
516  SchDec *dec = &sch->dec[i];
517 
518  tq_free(&dec->queue);
519 
521 
522  for (unsigned j = 0; j < dec->nb_outputs; j++) {
523  SchDecOutput *o = &dec->outputs[j];
524 
525  av_freep(&o->dst);
526  av_freep(&o->dst_finished);
527  }
528 
529  av_freep(&dec->outputs);
530 
531  av_frame_free(&dec->send_frame);
532  }
533  av_freep(&sch->dec);
534 
// encoders: input queue, send_pkt, destination arrays
535  for (unsigned i = 0; i < sch->nb_enc; i++) {
536  SchEnc *enc = &sch->enc[i];
537 
538  tq_free(&enc->queue);
539 
540  av_packet_free(&enc->send_pkt);
541 
542  av_freep(&enc->dst);
543  av_freep(&enc->dst_finished);
544  }
545  av_freep(&sch->enc);
546 
// encoder sync queues
547  for (unsigned i = 0; i < sch->nb_sq_enc; i++) {
548  SchSyncQueue *sq = &sch->sq_enc[i];
549  sq_free(&sq->sq);
550  av_frame_free(&sq->frame);
552  av_freep(&sq->enc_idx);
553  }
554  av_freep(&sch->sq_enc);
555 
// filtergraphs: input queue, input/output arrays, the waiter
556  for (unsigned i = 0; i < sch->nb_filters; i++) {
557  SchFilterGraph *fg = &sch->filters[i];
558 
559  tq_free(&fg->queue);
560 
561  av_freep(&fg->inputs);
562  av_freep(&fg->outputs);
563 
564  waiter_uninit(&fg->waiter);
565  }
566  av_freep(&sch->filters);
567 
568  av_freep(&sch->sdp_filename);
569 
571 
573 
576 
// frees the Scheduler and sets the caller's pointer to NULL
577  av_freep(psch);
578 }
579 
580 static const AVClass scheduler_class = {
581  .class_name = "Scheduler",
582  .version = LIBAVUTIL_VERSION_INT,
583 };
584 
586 {
587  Scheduler *sch;
588  int ret;
589 
590  sch = av_mallocz(sizeof(*sch));
591  if (!sch)
592  return NULL;
593 
594  sch->class = &scheduler_class;
595  sch->sdp_auto = 1;
596 
598  if (ret)
599  goto fail;
600 
602  if (ret)
603  goto fail;
604 
606  if (ret)
607  goto fail;
608 
610  if (ret)
611  goto fail;
612 
613  return sch;
614 fail:
615  sch_free(&sch);
616  return NULL;
617 }
618 
619 int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
620 {
621  av_freep(&sch->sdp_filename);
622  sch->sdp_filename = av_strdup(sdp_filename);
623  return sch->sdp_filename ? 0 : AVERROR(ENOMEM);
624 }
625 
626 static const AVClass sch_mux_class = {
627  .class_name = "SchMux",
628  .version = LIBAVUTIL_VERSION_INT,
629  .parent_log_context_offset = offsetof(SchMux, task.func_arg),
630 };
631 
632 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
633  void *arg, int sdp_auto, unsigned thread_queue_size)
634 {
635  const unsigned idx = sch->nb_mux;
636 
637  SchMux *mux;
638  int ret;
639 
640  ret = GROW_ARRAY(sch->mux, sch->nb_mux);
641  if (ret < 0)
642  return ret;
643 
644  mux = &sch->mux[idx];
645  mux->class = &sch_mux_class;
646  mux->init = init;
647  mux->queue_size = thread_queue_size;
648 
649  task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
650 
651  sch->sdp_auto &= sdp_auto;
652 
653  return idx;
654 }
655 
656 int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
657 {
658  SchMux *mux;
659  SchMuxStream *ms;
660  unsigned stream_idx;
661  int ret;
662 
663  av_assert0(mux_idx < sch->nb_mux);
664  mux = &sch->mux[mux_idx];
665 
666  ret = GROW_ARRAY(mux->streams, mux->nb_streams);
667  if (ret < 0)
668  return ret;
669  stream_idx = mux->nb_streams - 1;
670 
671  ms = &mux->streams[stream_idx];
672 
673  ms->pre_mux_queue.fifo = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
674  if (!ms->pre_mux_queue.fifo)
675  return AVERROR(ENOMEM);
676 
677  ms->last_dts = AV_NOPTS_VALUE;
678 
679  return stream_idx;
680 }
681 
682 static const AVClass sch_demux_class = {
683  .class_name = "SchDemux",
684  .version = LIBAVUTIL_VERSION_INT,
685  .parent_log_context_offset = offsetof(SchDemux, task.func_arg),
686 };
687 
689 {
690  const unsigned idx = sch->nb_demux;
691 
692  SchDemux *d;
693  int ret;
694 
695  ret = GROW_ARRAY(sch->demux, sch->nb_demux);
696  if (ret < 0)
697  return ret;
698 
699  d = &sch->demux[idx];
700 
701  task_init(sch, &d->task, SCH_NODE_TYPE_DEMUX, idx, func, ctx);
702 
703  d->class = &sch_demux_class;
704  d->send_pkt = av_packet_alloc();
705  if (!d->send_pkt)
706  return AVERROR(ENOMEM);
707 
708  ret = waiter_init(&d->waiter);
709  if (ret < 0)
710  return ret;
711 
712  return idx;
713 }
714 
715 int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
716 {
717  SchDemux *d;
718  int ret;
719 
720  av_assert0(demux_idx < sch->nb_demux);
721  d = &sch->demux[demux_idx];
722 
723  ret = GROW_ARRAY(d->streams, d->nb_streams);
724  return ret < 0 ? ret : d->nb_streams - 1;
725 }
726 
727 int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
728 {
729  SchDec *dec;
730  int ret;
731 
732  av_assert0(dec_idx < sch->nb_dec);
733  dec = &sch->dec[dec_idx];
734 
735  ret = GROW_ARRAY(dec->outputs, dec->nb_outputs);
736  if (ret < 0)
737  return ret;
738 
739  return dec->nb_outputs - 1;
740 }
741 
742 static const AVClass sch_dec_class = {
743  .class_name = "SchDec",
744  .version = LIBAVUTIL_VERSION_INT,
745  .parent_log_context_offset = offsetof(SchDec, task.func_arg),
746 };
747 
/*
 * Append a new decoder to the scheduler.
 *
 * sch         - scheduler to add the decoder to
 * func, ctx   - thread function and its argument, stored in the decoder's
 *               task via task_init()
 * send_end_ts - when nonzero an extra initialization step is performed
 *               (see the note below)
 *
 * Returns the index of the new decoder on success, a negative error code
 * otherwise. On failure the decoder has already been appended to sch->dec.
 *
 * NOTE(review): the statement guarded by send_end_ts is missing from this
 * listing; judging by the SchDec comments it presumably allocates the queue
 * for post-flush end timestamps - confirm against the real source.
 */
748 int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
749 {
// index the new element will get once the array has been grown
750  const unsigned idx = sch->nb_dec;
751 
752  SchDec *dec;
753  int ret;
754 
755  ret = GROW_ARRAY(sch->dec, sch->nb_dec);
756  if (ret < 0)
757  return ret;
758 
759  dec = &sch->dec[idx];
760 
761  task_init(sch, &dec->task, SCH_NODE_TYPE_DEC, idx, func, ctx);
762 
763  dec->class = &sch_dec_class;
764  dec->send_frame = av_frame_alloc();
765  if (!dec->send_frame)
766  return AVERROR(ENOMEM);
767 
// every decoder starts out with one output
768  ret = sch_add_dec_output(sch, idx);
769  if (ret < 0)
770  return ret;
771 
// single-stream packet queue; size 0 selects the default in queue_alloc()
772  ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
773  if (ret < 0)
774  return ret;
775 
776  if (send_end_ts) {
778  if (ret < 0)
779  return ret;
780  }
781 
782  return idx;
783 }
784 
785 static const AVClass sch_enc_class = {
786  .class_name = "SchEnc",
787  .version = LIBAVUTIL_VERSION_INT,
788  .parent_log_context_offset = offsetof(SchEnc, task.func_arg),
789 };
790 
792  int (*open_cb)(void *opaque, const AVFrame *frame))
793 {
794  const unsigned idx = sch->nb_enc;
795 
796  SchEnc *enc;
797  int ret;
798 
799  ret = GROW_ARRAY(sch->enc, sch->nb_enc);
800  if (ret < 0)
801  return ret;
802 
803  enc = &sch->enc[idx];
804 
805  enc->class = &sch_enc_class;
806  enc->open_cb = open_cb;
807  enc->sq_idx[0] = -1;
808  enc->sq_idx[1] = -1;
809 
810  task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
811 
812  enc->send_pkt = av_packet_alloc();
813  if (!enc->send_pkt)
814  return AVERROR(ENOMEM);
815 
816  ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
817  if (ret < 0)
818  return ret;
819 
820  return idx;
821 }
822 
823 static const AVClass sch_fg_class = {
824  .class_name = "SchFilterGraph",
825  .version = LIBAVUTIL_VERSION_INT,
826  .parent_log_context_offset = offsetof(SchFilterGraph, task.func_arg),
827 };
828 
829 int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
830  SchThreadFunc func, void *ctx)
831 {
832  const unsigned idx = sch->nb_filters;
833 
834  SchFilterGraph *fg;
835  int ret;
836 
837  ret = GROW_ARRAY(sch->filters, sch->nb_filters);
838  if (ret < 0)
839  return ret;
840  fg = &sch->filters[idx];
841 
842  fg->class = &sch_fg_class;
843 
844  task_init(sch, &fg->task, SCH_NODE_TYPE_FILTER_IN, idx, func, ctx);
845 
846  if (nb_inputs) {
847  fg->inputs = av_calloc(nb_inputs, sizeof(*fg->inputs));
848  if (!fg->inputs)
849  return AVERROR(ENOMEM);
850  fg->nb_inputs = nb_inputs;
851  }
852 
853  if (nb_outputs) {
854  fg->outputs = av_calloc(nb_outputs, sizeof(*fg->outputs));
855  if (!fg->outputs)
856  return AVERROR(ENOMEM);
857  fg->nb_outputs = nb_outputs;
858  }
859 
860  ret = waiter_init(&fg->waiter);
861  if (ret < 0)
862  return ret;
863 
864  ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
865  if (ret < 0)
866  return ret;
867 
868  return idx;
869 }
870 
871 int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
872 {
873  SchSyncQueue *sq;
874  int ret;
875 
876  ret = GROW_ARRAY(sch->sq_enc, sch->nb_sq_enc);
877  if (ret < 0)
878  return ret;
879  sq = &sch->sq_enc[sch->nb_sq_enc - 1];
880 
881  sq->sq = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, logctx);
882  if (!sq->sq)
883  return AVERROR(ENOMEM);
884 
885  sq->frame = av_frame_alloc();
886  if (!sq->frame)
887  return AVERROR(ENOMEM);
888 
889  ret = pthread_mutex_init(&sq->lock, NULL);
890  if (ret)
891  return AVERROR(ret);
892 
893  return sq - sch->sq_enc;
894 }
895 
896 int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
897  int limiting, uint64_t max_frames)
898 {
899  SchSyncQueue *sq;
900  SchEnc *enc;
901  int ret;
902 
903  av_assert0(sq_idx < sch->nb_sq_enc);
904  sq = &sch->sq_enc[sq_idx];
905 
906  av_assert0(enc_idx < sch->nb_enc);
907  enc = &sch->enc[enc_idx];
908 
909  ret = GROW_ARRAY(sq->enc_idx, sq->nb_enc_idx);
910  if (ret < 0)
911  return ret;
912  sq->enc_idx[sq->nb_enc_idx - 1] = enc_idx;
913 
914  ret = sq_add_stream(sq->sq, limiting);
915  if (ret < 0)
916  return ret;
917 
918  enc->sq_idx[0] = sq_idx;
919  enc->sq_idx[1] = ret;
920 
921  if (max_frames != INT64_MAX)
922  sq_limit_frames(sq->sq, enc->sq_idx[1], max_frames);
923 
924  return 0;
925 }
926 
928 {
929  int ret;
930 
931  switch (src.type) {
932  case SCH_NODE_TYPE_DEMUX: {
933  SchDemuxStream *ds;
934 
935  av_assert0(src.idx < sch->nb_demux &&
936  src.idx_stream < sch->demux[src.idx].nb_streams);
937  ds = &sch->demux[src.idx].streams[src.idx_stream];
938 
939  ret = GROW_ARRAY(ds->dst, ds->nb_dst);
940  if (ret < 0)
941  return ret;
942 
943  ds->dst[ds->nb_dst - 1] = dst;
944 
945  // demuxed packets go to decoding or streamcopy
946  switch (dst.type) {
947  case SCH_NODE_TYPE_DEC: {
948  SchDec *dec;
949 
950  av_assert0(dst.idx < sch->nb_dec);
951  dec = &sch->dec[dst.idx];
952 
953  av_assert0(!dec->src.type);
954  dec->src = src;
955  break;
956  }
957  case SCH_NODE_TYPE_MUX: {
958  SchMuxStream *ms;
959 
960  av_assert0(dst.idx < sch->nb_mux &&
961  dst.idx_stream < sch->mux[dst.idx].nb_streams);
962  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
963 
964  av_assert0(!ms->src.type);
965  ms->src = src;
966 
967  break;
968  }
969  default: av_assert0(0);
970  }
971 
972  break;
973  }
974  case SCH_NODE_TYPE_DEC: {
975  SchDec *dec;
976  SchDecOutput *o;
977 
978  av_assert0(src.idx < sch->nb_dec);
979  dec = &sch->dec[src.idx];
980 
981  av_assert0(src.idx_stream < dec->nb_outputs);
982  o = &dec->outputs[src.idx_stream];
983 
984  ret = GROW_ARRAY(o->dst, o->nb_dst);
985  if (ret < 0)
986  return ret;
987 
988  o->dst[o->nb_dst - 1] = dst;
989 
990  // decoded frames go to filters or encoding
991  switch (dst.type) {
993  SchFilterIn *fi;
994 
995  av_assert0(dst.idx < sch->nb_filters &&
996  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
997  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
998 
999  av_assert0(!fi->src.type);
1000  fi->src = src;
1001  break;
1002  }
1003  case SCH_NODE_TYPE_ENC: {
1004  SchEnc *enc;
1005 
1006  av_assert0(dst.idx < sch->nb_enc);
1007  enc = &sch->enc[dst.idx];
1008 
1009  av_assert0(!enc->src.type);
1010  enc->src = src;
1011  break;
1012  }
1013  default: av_assert0(0);
1014  }
1015 
1016  break;
1017  }
1018  case SCH_NODE_TYPE_FILTER_OUT: {
1019  SchFilterOut *fo;
1020 
1021  av_assert0(src.idx < sch->nb_filters &&
1022  src.idx_stream < sch->filters[src.idx].nb_outputs);
1023  fo = &sch->filters[src.idx].outputs[src.idx_stream];
1024 
1025  av_assert0(!fo->dst.type);
1026  fo->dst = dst;
1027 
1028  // filtered frames go to encoding or another filtergraph
1029  switch (dst.type) {
1030  case SCH_NODE_TYPE_ENC: {
1031  SchEnc *enc;
1032 
1033  av_assert0(dst.idx < sch->nb_enc);
1034  enc = &sch->enc[dst.idx];
1035 
1036  av_assert0(!enc->src.type);
1037  enc->src = src;
1038  break;
1039  }
1040  case SCH_NODE_TYPE_FILTER_IN: {
1041  SchFilterIn *fi;
1042 
1043  av_assert0(dst.idx < sch->nb_filters &&
1044  dst.idx_stream < sch->filters[dst.idx].nb_inputs);
1045  fi = &sch->filters[dst.idx].inputs[dst.idx_stream];
1046 
1047  av_assert0(!fi->src.type);
1048  fi->src = src;
1049  break;
1050  }
1051  default: av_assert0(0);
1052  }
1053 
1054 
1055  break;
1056  }
1057  case SCH_NODE_TYPE_ENC: {
1058  SchEnc *enc;
1059 
1060  av_assert0(src.idx < sch->nb_enc);
1061  enc = &sch->enc[src.idx];
1062 
1063  ret = GROW_ARRAY(enc->dst, enc->nb_dst);
1064  if (ret < 0)
1065  return ret;
1066 
1067  enc->dst[enc->nb_dst - 1] = dst;
1068 
1069  // encoding packets go to muxing or decoding
1070  switch (dst.type) {
1071  case SCH_NODE_TYPE_MUX: {
1072  SchMuxStream *ms;
1073 
1074  av_assert0(dst.idx < sch->nb_mux &&
1075  dst.idx_stream < sch->mux[dst.idx].nb_streams);
1076  ms = &sch->mux[dst.idx].streams[dst.idx_stream];
1077 
1078  av_assert0(!ms->src.type);
1079  ms->src = src;
1080 
1081  break;
1082  }
1083  case SCH_NODE_TYPE_DEC: {
1084  SchDec *dec;
1085 
1086  av_assert0(dst.idx < sch->nb_dec);
1087  dec = &sch->dec[dst.idx];
1088 
1089  av_assert0(!dec->src.type);
1090  dec->src = src;
1091 
1092  break;
1093  }
1094  default: av_assert0(0);
1095  }
1096 
1097  break;
1098  }
1099  default: av_assert0(0);
1100  }
1101 
1102  return 0;
1103 }
1104 
1105 static int mux_task_start(SchMux *mux)
1106 {
1107  int ret = 0;
1108 
1109  ret = task_start(&mux->task);
1110  if (ret < 0)
1111  return ret;
1112 
1113  /* flush the pre-muxing queues */
1114  while (1) {
1115  int min_stream = -1;
1116  Timestamp min_ts = { .ts = AV_NOPTS_VALUE };
1117 
1118  AVPacket *pkt;
1119 
1120  // find the stream with the earliest dts or EOF in pre-muxing queue
1121  for (unsigned i = 0; i < mux->nb_streams; i++) {
1122  SchMuxStream *ms = &mux->streams[i];
1123 
1124  if (av_fifo_peek(ms->pre_mux_queue.fifo, &pkt, 1, 0) < 0)
1125  continue;
1126 
1127  if (!pkt || pkt->dts == AV_NOPTS_VALUE) {
1128  min_stream = i;
1129  break;
1130  }
1131 
1132  if (min_ts.ts == AV_NOPTS_VALUE ||
1133  av_compare_ts(min_ts.ts, min_ts.tb, pkt->dts, pkt->time_base) > 0) {
1134  min_stream = i;
1135  min_ts = (Timestamp){ .ts = pkt->dts, .tb = pkt->time_base };
1136  }
1137  }
1138 
1139  if (min_stream >= 0) {
1140  SchMuxStream *ms = &mux->streams[min_stream];
1141 
1142  ret = av_fifo_read(ms->pre_mux_queue.fifo, &pkt, 1);
1143  av_assert0(ret >= 0);
1144 
1145  if (pkt) {
1146  if (!ms->init_eof)
1147  ret = tq_send(mux->queue, min_stream, pkt);
1148  av_packet_free(&pkt);
1149  if (ret == AVERROR_EOF)
1150  ms->init_eof = 1;
1151  else if (ret < 0)
1152  return ret;
1153  } else
1154  tq_send_finish(mux->queue, min_stream);
1155 
1156  continue;
1157  }
1158 
1159  break;
1160  }
1161 
1162  atomic_store(&mux->mux_started, 1);
1163 
1164  return 0;
1165 }
1166 
1167 int print_sdp(const char *filename);
1168 
1169 static int mux_init(Scheduler *sch, SchMux *mux)
1170 {
1171  int ret;
1172 
1173  ret = mux->init(mux->task.func_arg);
1174  if (ret < 0)
1175  return ret;
1176 
1177  sch->nb_mux_ready++;
1178 
1179  if (sch->sdp_filename || sch->sdp_auto) {
1180  if (sch->nb_mux_ready < sch->nb_mux)
1181  return 0;
1182 
1183  ret = print_sdp(sch->sdp_filename);
1184  if (ret < 0) {
1185  av_log(sch, AV_LOG_ERROR, "Error writing the SDP.\n");
1186  return ret;
1187  }
1188 
1189  /* SDP is written only after all the muxers are ready, so now we
1190  * start ALL the threads */
1191  for (unsigned i = 0; i < sch->nb_mux; i++) {
1192  ret = mux_task_start(&sch->mux[i]);
1193  if (ret < 0)
1194  return ret;
1195  }
1196  } else {
1197  ret = mux_task_start(mux);
1198  if (ret < 0)
1199  return ret;
1200  }
1201 
1202  return 0;
1203 }
1204 
1205 void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1206  size_t data_threshold, int max_packets)
1207 {
1208  SchMux *mux;
1209  SchMuxStream *ms;
1210 
1211  av_assert0(mux_idx < sch->nb_mux);
1212  mux = &sch->mux[mux_idx];
1213 
1214  av_assert0(stream_idx < mux->nb_streams);
1215  ms = &mux->streams[stream_idx];
1216 
1217  ms->pre_mux_queue.max_packets = max_packets;
1218  ms->pre_mux_queue.data_threshold = data_threshold;
1219 }
1220 
/*
 * Mark one stream of a muxer as ready.
 *
 * Increments the muxer's count of ready streams; when that count reaches
 * the number of streams and the scheduler state is at least
 * SCH_STATE_STARTED, the muxer is initialized through mux_init().
 *
 * mux_idx and stream_idx must be valid (asserted).
 *
 * Returns 0, or the return value of mux_init() when it was called.
 *
 * NOTE(review): two statements are missing from this listing, one before
 * the nb_streams_ready assertion and one before the return. The SchMux
 * comments mention Scheduler.mux_ready_lock, so they are presumably the
 * matching lock/unlock pair - confirm against the real source.
 */
1221 int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
1222 {
1223  SchMux *mux;
1224  int ret = 0;
1225 
1226  av_assert0(mux_idx < sch->nb_mux);
1227  mux = &sch->mux[mux_idx];
1228 
1229  av_assert0(stream_idx < mux->nb_streams);
1230 
1232 
// there must be at least one stream that has not been reported ready yet
1233  av_assert0(mux->nb_streams_ready < mux->nb_streams);
1234 
1235  // this may be called during initialization - do not start
1236  // threads before sch_start() is called
1237  if (++mux->nb_streams_ready == mux->nb_streams &&
1238  sch->state >= SCH_STATE_STARTED)
1239  ret = mux_init(sch, mux);
1240 
1242 
1243  return ret;
1244 }
1245 
/*
 * Register a decoder as a subtitle-heartbeat destination of a muxed stream.
 *
 * mux_idx, stream_idx and dec_idx must be valid (asserted). The decoder
 * index is stored in the last slot of the stream's sub_heartbeat_dst array,
 * and the muxer's sub_heartbeat_pkt is set up on first use.
 *
 * Returns 0 on success, a negative error code otherwise.
 *
 * NOTE(review): two statements are missing from this listing - the one
 * that sets ret before the first error check (presumably growing
 * sub_heartbeat_dst) and the one that allocates sub_heartbeat_pkt - confirm
 * against the real source.
 */
1246 int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
1247  unsigned dec_idx)
1248 {
1249  SchMux *mux;
1250  SchMuxStream *ms;
1251  int ret = 0;
1252 
1253  av_assert0(mux_idx < sch->nb_mux);
1254  mux = &sch->mux[mux_idx];
1255 
1256  av_assert0(stream_idx < mux->nb_streams);
1257  ms = &mux->streams[stream_idx];
1258 
1260  if (ret < 0)
1261  return ret;
1262 
1263  av_assert0(dec_idx < sch->nb_dec);
// the new destination goes into the last slot of the array
1264  ms->sub_heartbeat_dst[ms->nb_sub_heartbeat_dst - 1] = dec_idx;
1265 
// the packet is created only once per muxer
1266  if (!mux->sub_heartbeat_pkt) {
1268  if (!mux->sub_heartbeat_pkt)
1269  return AVERROR(ENOMEM);
1270  }
1271 
1272  return 0;
1273 }
1274 
1276 {
1277  while (1) {
1278  SchFilterGraph *fg;
1279 
1280  // fed directly by a demuxer (i.e. not through a filtergraph)
1281  if (src.type == SCH_NODE_TYPE_DEMUX) {
1282  sch->demux[src.idx].waiter.choked_next = 0;
1283  return;
1284  }
1285 
1287  fg = &sch->filters[src.idx];
1288 
1289  // the filtergraph contains internal sources and
1290  // requested to be scheduled directly
1291  if (fg->best_input == fg->nb_inputs) {
1292  fg->waiter.choked_next = 0;
1293  return;
1294  }
1295 
1296  src = fg->inputs[fg->best_input].src_sched;
1297  }
1298 }
1299 
1301 {
1302  int64_t dts;
1303  int have_unchoked = 0;
1304 
1305  // on termination request all waiters are choked,
1306  // we are not to unchoke them
1307  if (atomic_load(&sch->terminate))
1308  return;
1309 
1310  dts = trailing_dts(sch, 0);
1311 
1312  atomic_store(&sch->last_dts, dts);
1313 
1314  // initialize our internal state
1315  for (unsigned type = 0; type < 2; type++)
1316  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1317  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1318  w->choked_prev = atomic_load(&w->choked);
1319  w->choked_next = 1;
1320  }
1321 
1322  // figure out the sources that are allowed to proceed
1323  for (unsigned i = 0; i < sch->nb_mux; i++) {
1324  SchMux *mux = &sch->mux[i];
1325 
1326  for (unsigned j = 0; j < mux->nb_streams; j++) {
1327  SchMuxStream *ms = &mux->streams[j];
1328 
1329  // unblock sources for output streams that are not finished
1330  // and not too far ahead of the trailing stream
1331  if (ms->source_finished)
1332  continue;
1333  if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
1334  continue;
1335  if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE)
1336  continue;
1337 
1338  // resolve the source to unchoke
1339  unchoke_for_stream(sch, ms->src_sched);
1340  have_unchoked = 1;
1341  }
1342  }
1343 
1344  // make sure to unchoke at least one source, if still available
1345  for (unsigned type = 0; !have_unchoked && type < 2; type++)
1346  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1347  int exited = type ? sch->filters[i].task_exited : sch->demux[i].task_exited;
1348  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1349  if (!exited) {
1350  w->choked_next = 0;
1351  have_unchoked = 1;
1352  break;
1353  }
1354  }
1355 
1356 
1357  for (unsigned type = 0; type < 2; type++)
1358  for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
1359  SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
1360  if (w->choked_prev != w->choked_next)
1361  waiter_set(w, w->choked_next);
1362  }
1363 
1364 }
1365 
1366 enum {
1370 };
1371 
1372 static int
1374  uint8_t *filters_visited, SchedulerNode *filters_stack)
1375 {
1376  unsigned nb_filters_stack = 0;
1377 
1378  memset(filters_visited, 0, sch->nb_filters * sizeof(*filters_visited));
1379 
1380  while (1) {
1381  const SchFilterGraph *fg = &sch->filters[src.idx];
1382 
1383  filters_visited[src.idx] = CYCLE_NODE_STARTED;
1384 
1385  // descend into every input, depth first
1386  if (src.idx_stream < fg->nb_inputs) {
1387  const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
1388 
1389  // connected to demuxer, no cycles possible
1390  if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
1391  continue;
1392 
1393  // otherwise connected to another filtergraph
1395 
1396  // found a cycle
1397  if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
1398  return AVERROR(EINVAL);
1399 
1400  // place current position on stack and descend
1401  av_assert0(nb_filters_stack < sch->nb_filters);
1402  filters_stack[nb_filters_stack++] = src;
1403  src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
1404  continue;
1405  }
1406 
1407  filters_visited[src.idx] = CYCLE_NODE_DONE;
1408 
1409  // previous search finished,
1410  if (nb_filters_stack) {
1411  src = filters_stack[--nb_filters_stack];
1412  continue;
1413  }
1414  return 0;
1415  }
1416 }
1417 
1418 static int check_acyclic(Scheduler *sch)
1419 {
1420  uint8_t *filters_visited = NULL;
1421  SchedulerNode *filters_stack = NULL;
1422 
1423  int ret = 0;
1424 
1425  if (!sch->nb_filters)
1426  return 0;
1427 
1428  filters_visited = av_malloc_array(sch->nb_filters, sizeof(*filters_visited));
1429  if (!filters_visited)
1430  return AVERROR(ENOMEM);
1431 
1432  filters_stack = av_malloc_array(sch->nb_filters, sizeof(*filters_stack));
1433  if (!filters_stack) {
1434  ret = AVERROR(ENOMEM);
1435  goto fail;
1436  }
1437 
1438  // trace the transcoding graph upstream from every filtegraph
1439  for (unsigned i = 0; i < sch->nb_filters; i++) {
1440  ret = check_acyclic_for_output(sch, (SchedulerNode){ .idx = i },
1441  filters_visited, filters_stack);
1442  if (ret < 0) {
1443  av_log(&sch->filters[i], AV_LOG_ERROR, "Transcoding graph has a cycle\n");
1444  goto fail;
1445  }
1446  }
1447 
1448 fail:
1449  av_freep(&filters_visited);
1450  av_freep(&filters_stack);
1451  return ret;
1452 }
1453 
1454 static int start_prepare(Scheduler *sch)
1455 {
1456  int ret;
1457 
1458  for (unsigned i = 0; i < sch->nb_demux; i++) {
1459  SchDemux *d = &sch->demux[i];
1460 
1461  for (unsigned j = 0; j < d->nb_streams; j++) {
1462  SchDemuxStream *ds = &d->streams[j];
1463 
1464  if (!ds->nb_dst) {
1465  av_log(d, AV_LOG_ERROR,
1466  "Demuxer stream %u not connected to any sink\n", j);
1467  return AVERROR(EINVAL);
1468  }
1469 
1470  ds->dst_finished = av_calloc(ds->nb_dst, sizeof(*ds->dst_finished));
1471  if (!ds->dst_finished)
1472  return AVERROR(ENOMEM);
1473  }
1474  }
1475 
1476  for (unsigned i = 0; i < sch->nb_dec; i++) {
1477  SchDec *dec = &sch->dec[i];
1478 
1479  if (!dec->src.type) {
1480  av_log(dec, AV_LOG_ERROR,
1481  "Decoder not connected to a source\n");
1482  return AVERROR(EINVAL);
1483  }
1484 
1485  for (unsigned j = 0; j < dec->nb_outputs; j++) {
1486  SchDecOutput *o = &dec->outputs[j];
1487 
1488  if (!o->nb_dst) {
1489  av_log(dec, AV_LOG_ERROR,
1490  "Decoder output %u not connected to any sink\n", j);
1491  return AVERROR(EINVAL);
1492  }
1493 
1494  o->dst_finished = av_calloc(o->nb_dst, sizeof(*o->dst_finished));
1495  if (!o->dst_finished)
1496  return AVERROR(ENOMEM);
1497  }
1498  }
1499 
1500  for (unsigned i = 0; i < sch->nb_enc; i++) {
1501  SchEnc *enc = &sch->enc[i];
1502 
1503  if (!enc->src.type) {
1504  av_log(enc, AV_LOG_ERROR,
1505  "Encoder not connected to a source\n");
1506  return AVERROR(EINVAL);
1507  }
1508  if (!enc->nb_dst) {
1509  av_log(enc, AV_LOG_ERROR,
1510  "Encoder not connected to any sink\n");
1511  return AVERROR(EINVAL);
1512  }
1513 
1514  enc->dst_finished = av_calloc(enc->nb_dst, sizeof(*enc->dst_finished));
1515  if (!enc->dst_finished)
1516  return AVERROR(ENOMEM);
1517  }
1518 
1519  for (unsigned i = 0; i < sch->nb_mux; i++) {
1520  SchMux *mux = &sch->mux[i];
1521 
1522  for (unsigned j = 0; j < mux->nb_streams; j++) {
1523  SchMuxStream *ms = &mux->streams[j];
1524 
1525  switch (ms->src.type) {
1526  case SCH_NODE_TYPE_ENC: {
1527  SchEnc *enc = &sch->enc[ms->src.idx];
1528  if (enc->src.type == SCH_NODE_TYPE_DEC) {
1529  ms->src_sched = sch->dec[enc->src.idx].src;
1531  } else {
1532  ms->src_sched = enc->src;
1534  }
1535  break;
1536  }
1537  case SCH_NODE_TYPE_DEMUX:
1538  ms->src_sched = ms->src;
1539  break;
1540  default:
1541  av_log(mux, AV_LOG_ERROR,
1542  "Muxer stream #%u not connected to a source\n", j);
1543  return AVERROR(EINVAL);
1544  }
1545  }
1546 
1547  ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
1548  QUEUE_PACKETS);
1549  if (ret < 0)
1550  return ret;
1551  }
1552 
1553  for (unsigned i = 0; i < sch->nb_filters; i++) {
1554  SchFilterGraph *fg = &sch->filters[i];
1555 
1556  for (unsigned j = 0; j < fg->nb_inputs; j++) {
1557  SchFilterIn *fi = &fg->inputs[j];
1558  SchDec *dec;
1559 
1560  if (!fi->src.type) {
1561  av_log(fg, AV_LOG_ERROR,
1562  "Filtergraph input %u not connected to a source\n", j);
1563  return AVERROR(EINVAL);
1564  }
1565 
1566  if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
1567  fi->src_sched = fi->src;
1568  else {
1570  dec = &sch->dec[fi->src.idx];
1571 
1572  switch (dec->src.type) {
1573  case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src; break;
1574  case SCH_NODE_TYPE_ENC: fi->src_sched = sch->enc[dec->src.idx].src; break;
1575  default: av_assert0(0);
1576  }
1577  }
1578  }
1579 
1580  for (unsigned j = 0; j < fg->nb_outputs; j++) {
1581  SchFilterOut *fo = &fg->outputs[j];
1582 
1583  if (!fo->dst.type) {
1584  av_log(fg, AV_LOG_ERROR,
1585  "Filtergraph %u output %u not connected to a sink\n", i, j);
1586  return AVERROR(EINVAL);
1587  }
1588  }
1589  }
1590 
1591  // Check that the transcoding graph has no cycles.
1592  ret = check_acyclic(sch);
1593  if (ret < 0)
1594  return ret;
1595 
1596  return 0;
1597 }
1598 
1600 {
1601  int ret;
1602 
1603  ret = start_prepare(sch);
1604  if (ret < 0)
1605  return ret;
1606 
1608  sch->state = SCH_STATE_STARTED;
1609 
1610  for (unsigned i = 0; i < sch->nb_mux; i++) {
1611  SchMux *mux = &sch->mux[i];
1612 
1613  if (mux->nb_streams_ready == mux->nb_streams) {
1614  ret = mux_init(sch, mux);
1615  if (ret < 0)
1616  goto fail;
1617  }
1618  }
1619 
1620  for (unsigned i = 0; i < sch->nb_enc; i++) {
1621  SchEnc *enc = &sch->enc[i];
1622 
1623  ret = task_start(&enc->task);
1624  if (ret < 0)
1625  goto fail;
1626  }
1627 
1628  for (unsigned i = 0; i < sch->nb_filters; i++) {
1629  SchFilterGraph *fg = &sch->filters[i];
1630 
1631  ret = task_start(&fg->task);
1632  if (ret < 0)
1633  goto fail;
1634  }
1635 
1636  for (unsigned i = 0; i < sch->nb_dec; i++) {
1637  SchDec *dec = &sch->dec[i];
1638 
1639  ret = task_start(&dec->task);
1640  if (ret < 0)
1641  goto fail;
1642  }
1643 
1644  for (unsigned i = 0; i < sch->nb_demux; i++) {
1645  SchDemux *d = &sch->demux[i];
1646 
1647  if (!d->nb_streams)
1648  continue;
1649 
1650  ret = task_start(&d->task);
1651  if (ret < 0)
1652  goto fail;
1653  }
1654 
1658 
1659  return 0;
1660 fail:
1661  sch_stop(sch, NULL);
1662  return ret;
1663 }
1664 
1665 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
1666 {
1667  int ret;
1668 
1669  // convert delay to absolute timestamp
1670  timeout_us += av_gettime();
1671 
1673 
1674  if (sch->nb_mux_done < sch->nb_mux) {
1675  struct timespec tv = { .tv_sec = timeout_us / 1000000,
1676  .tv_nsec = (timeout_us % 1000000) * 1000 };
1677  pthread_cond_timedwait(&sch->finish_cond, &sch->finish_lock, &tv);
1678  }
1679 
1680  // abort transcoding if any task failed
1681  ret = sch->nb_mux_done == sch->nb_mux || sch->task_failed;
1682 
1684 
1685  *transcode_ts = atomic_load(&sch->last_dts);
1686 
1687  return ret;
1688 }
1689 
1690 static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
1691 {
1692  int ret;
1693 
1694  ret = enc->open_cb(enc->task.func_arg, frame);
1695  if (ret < 0)
1696  return ret;
1697 
1698  // ret>0 signals audio frame size, which means sync queue must
1699  // have been enabled during encoder creation
1700  if (ret > 0) {
1701  SchSyncQueue *sq;
1702 
1703  av_assert0(enc->sq_idx[0] >= 0);
1704  sq = &sch->sq_enc[enc->sq_idx[0]];
1705 
1706  pthread_mutex_lock(&sq->lock);
1707 
1708  sq_frame_samples(sq->sq, enc->sq_idx[1], ret);
1709 
1710  pthread_mutex_unlock(&sq->lock);
1711  }
1712 
1713  return 0;
1714 }
1715 
1717 {
1718  int ret;
1719 
1720  if (!frame) {
1721  tq_send_finish(enc->queue, 0);
1722  return 0;
1723  }
1724 
1725  if (enc->in_finished)
1726  return AVERROR_EOF;
1727 
1728  ret = tq_send(enc->queue, 0, frame);
1729  if (ret < 0)
1730  enc->in_finished = 1;
1731 
1732  return ret;
1733 }
1734 
1735 static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1736 {
1737  SchSyncQueue *sq = &sch->sq_enc[enc->sq_idx[0]];
1738  int ret = 0;
1739 
1740  // inform the scheduling code that no more input will arrive along this path;
1741  // this is necessary because the sync queue may not send an EOF downstream
1742  // until other streams finish
1743  // TODO: consider a cleaner way of passing this information through
1744  // the pipeline
1745  if (!frame) {
1746  for (unsigned i = 0; i < enc->nb_dst; i++) {
1747  SchMux *mux;
1748  SchMuxStream *ms;
1749 
1750  if (enc->dst[i].type != SCH_NODE_TYPE_MUX)
1751  continue;
1752 
1753  mux = &sch->mux[enc->dst[i].idx];
1754  ms = &mux->streams[enc->dst[i].idx_stream];
1755 
1757 
1758  ms->source_finished = 1;
1760 
1762  }
1763  }
1764 
1765  pthread_mutex_lock(&sq->lock);
1766 
1767  ret = sq_send(sq->sq, enc->sq_idx[1], SQFRAME(frame));
1768  if (ret < 0)
1769  goto finish;
1770 
1771  while (1) {
1772  SchEnc *enc;
1773 
1774  // TODO: the SQ API should be extended to allow returning EOF
1775  // for individual streams
1776  ret = sq_receive(sq->sq, -1, SQFRAME(sq->frame));
1777  if (ret < 0) {
1778  ret = (ret == AVERROR(EAGAIN)) ? 0 : ret;
1779  break;
1780  }
1781 
1782  enc = &sch->enc[sq->enc_idx[ret]];
1783  ret = send_to_enc_thread(sch, enc, sq->frame);
1784  if (ret < 0) {
1785  av_frame_unref(sq->frame);
1786  if (ret != AVERROR_EOF)
1787  break;
1788 
1789  sq_send(sq->sq, enc->sq_idx[1], SQFRAME(NULL));
1790  continue;
1791  }
1792  }
1793 
1794  if (ret < 0) {
1795  // close all encoders fed from this sync queue
1796  for (unsigned i = 0; i < sq->nb_enc_idx; i++) {
1797  int err = send_to_enc_thread(sch, &sch->enc[sq->enc_idx[i]], NULL);
1798 
1799  // if the sync queue error is EOF and closing the encoder
1800  // produces a more serious error, make sure to pick the latter
1801  ret = err_merge((ret == AVERROR_EOF && err < 0) ? 0 : ret, err);
1802  }
1803  }
1804 
1805 finish:
1806  pthread_mutex_unlock(&sq->lock);
1807 
1808  return ret;
1809 }
1810 
1811 static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
1812 {
1813  if (enc->open_cb && frame && !enc->opened) {
1814  int ret = enc_open(sch, enc, frame);
1815  if (ret < 0)
1816  return ret;
1817  enc->opened = 1;
1818 
1819  // discard empty frames that only carry encoder init parameters
1820  if (!frame->buf[0]) {
1822  return 0;
1823  }
1824  }
1825 
1826  return (enc->sq_idx[0] >= 0) ?
1827  send_to_enc_sq (sch, enc, frame) :
1828  send_to_enc_thread(sch, enc, frame);
1829 }
1830 
1832 {
1833  PreMuxQueue *q = &ms->pre_mux_queue;
1834  AVPacket *tmp_pkt = NULL;
1835  int ret;
1836 
1837  if (!av_fifo_can_write(q->fifo)) {
1838  size_t packets = av_fifo_can_read(q->fifo);
1839  size_t pkt_size = pkt ? pkt->size : 0;
1840  int thresh_reached = (q->data_size + pkt_size) > q->data_threshold;
1841  size_t max_packets = thresh_reached ? q->max_packets : SIZE_MAX;
1842  size_t new_size = FFMIN(2 * packets, max_packets);
1843 
1844  if (new_size <= packets) {
1845  av_log(mux, AV_LOG_ERROR,
1846  "Too many packets buffered for output stream.\n");
1847  return AVERROR(ENOSPC);
1848  }
1849  ret = av_fifo_grow2(q->fifo, new_size - packets);
1850  if (ret < 0)
1851  return ret;
1852  }
1853 
1854  if (pkt) {
1855  tmp_pkt = av_packet_alloc();
1856  if (!tmp_pkt)
1857  return AVERROR(ENOMEM);
1858 
1859  av_packet_move_ref(tmp_pkt, pkt);
1860  q->data_size += tmp_pkt->size;
1861  }
1862  av_fifo_write(q->fifo, &tmp_pkt, 1);
1863 
1864  return 0;
1865 }
1866 
1867 static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx,
1868  AVPacket *pkt)
1869 {
1870  SchMuxStream *ms = &mux->streams[stream_idx];
1871  int64_t dts = (pkt && pkt->dts != AV_NOPTS_VALUE) ?
1874 
1875  // queue the packet if the muxer cannot be started yet
1876  if (!atomic_load(&mux->mux_started)) {
1877  int queued = 0;
1878 
1879  // the muxer could have started between the above atomic check and
1880  // locking the mutex, then this block falls through to normal send path
1882 
1883  if (!atomic_load(&mux->mux_started)) {
1884  int ret = mux_queue_packet(mux, ms, pkt);
1885  queued = ret < 0 ? ret : 1;
1886  }
1887 
1889 
1890  if (queued < 0)
1891  return queued;
1892  else if (queued)
1893  goto update_schedule;
1894  }
1895 
1896  if (pkt) {
1897  int ret;
1898 
1899  if (ms->init_eof)
1900  return AVERROR_EOF;
1901 
1902  ret = tq_send(mux->queue, stream_idx, pkt);
1903  if (ret < 0)
1904  return ret;
1905  } else
1906  tq_send_finish(mux->queue, stream_idx);
1907 
1908 update_schedule:
1909  // TODO: use atomics to check whether this changes trailing dts
1910  // to avoid locking unnecesarily
1911  if (dts != AV_NOPTS_VALUE || !pkt) {
1913 
1914  if (pkt) ms->last_dts = dts;
1915  else ms->source_finished = 1;
1916 
1918 
1920  }
1921 
1922  return 0;
1923 }
1924 
1925 static int
1927  uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
1928 {
1929  int ret;
1930 
1931  if (*dst_finished)
1932  return AVERROR_EOF;
1933 
1934  if (pkt && dst.type == SCH_NODE_TYPE_MUX &&
1937  pkt = NULL;
1938  }
1939 
1940  if (!pkt)
1941  goto finish;
1942 
1943  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
1944  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
1945  tq_send(sch->dec[dst.idx].queue, 0, pkt);
1946  if (ret == AVERROR_EOF)
1947  goto finish;
1948 
1949  return ret;
1950 
1951 finish:
1952  if (dst.type == SCH_NODE_TYPE_MUX)
1953  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
1954  else
1955  tq_send_finish(sch->dec[dst.idx].queue, 0);
1956 
1957  *dst_finished = 1;
1958  return AVERROR_EOF;
1959 }
1960 
1962  AVPacket *pkt, unsigned flags)
1963 {
1964  unsigned nb_done = 0;
1965 
1966  for (unsigned i = 0; i < ds->nb_dst; i++) {
1967  AVPacket *to_send = pkt;
1968  uint8_t *finished = &ds->dst_finished[i];
1969 
1970  int ret;
1971 
1972  // sending a packet consumes it, so make a temporary reference if needed
1973  if (pkt && i < ds->nb_dst - 1) {
1974  to_send = d->send_pkt;
1975 
1976  ret = av_packet_ref(to_send, pkt);
1977  if (ret < 0)
1978  return ret;
1979  }
1980 
1981  ret = demux_stream_send_to_dst(sch, ds->dst[i], finished, to_send, flags);
1982  if (to_send)
1983  av_packet_unref(to_send);
1984  if (ret == AVERROR_EOF)
1985  nb_done++;
1986  else if (ret < 0)
1987  return ret;
1988  }
1989 
1990  return (nb_done == ds->nb_dst) ? AVERROR_EOF : 0;
1991 }
1992 
1994 {
1995  Timestamp max_end_ts = (Timestamp){ .ts = AV_NOPTS_VALUE };
1996 
1997  av_assert0(!pkt->buf && !pkt->data && !pkt->side_data_elems);
1998 
1999  for (unsigned i = 0; i < d->nb_streams; i++) {
2000  SchDemuxStream *ds = &d->streams[i];
2001 
2002  for (unsigned j = 0; j < ds->nb_dst; j++) {
2003  const SchedulerNode *dst = &ds->dst[j];
2004  SchDec *dec;
2005  int ret;
2006 
2007  if (ds->dst_finished[j] || dst->type != SCH_NODE_TYPE_DEC)
2008  continue;
2009 
2010  dec = &sch->dec[dst->idx];
2011 
2012  ret = tq_send(dec->queue, 0, pkt);
2013  if (ret < 0)
2014  return ret;
2015 
2016  if (dec->queue_end_ts) {
2017  Timestamp ts;
2019  if (ret < 0)
2020  return ret;
2021 
2022  if (max_end_ts.ts == AV_NOPTS_VALUE ||
2023  (ts.ts != AV_NOPTS_VALUE &&
2024  av_compare_ts(max_end_ts.ts, max_end_ts.tb, ts.ts, ts.tb) < 0))
2025  max_end_ts = ts;
2026 
2027  }
2028  }
2029  }
2030 
2031  pkt->pts = max_end_ts.ts;
2032  pkt->time_base = max_end_ts.tb;
2033 
2034  return 0;
2035 }
2036 
2037 int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt,
2038  unsigned flags)
2039 {
2040  SchDemux *d;
2041  int terminate;
2042 
2043  av_assert0(demux_idx < sch->nb_demux);
2044  d = &sch->demux[demux_idx];
2045 
2046  terminate = waiter_wait(sch, &d->waiter);
2047  if (terminate)
2048  return AVERROR_EXIT;
2049 
2050  // flush the downstreams after seek
2051  if (pkt->stream_index == -1)
2052  return demux_flush(sch, d, pkt);
2053 
2055 
2056  return demux_send_for_stream(sch, d, &d->streams[pkt->stream_index], pkt, flags);
2057 }
2058 
2059 static int demux_done(Scheduler *sch, unsigned demux_idx)
2060 {
2061  SchDemux *d = &sch->demux[demux_idx];
2062  int ret = 0;
2063 
2064  for (unsigned i = 0; i < d->nb_streams; i++) {
2065  int err = demux_send_for_stream(sch, d, &d->streams[i], NULL, 0);
2066  if (err != AVERROR_EOF)
2067  ret = err_merge(ret, err);
2068  }
2069 
2071 
2072  d->task_exited = 1;
2073 
2075 
2077 
2078  return ret;
2079 }
2080 
2081 int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
2082 {
2083  SchMux *mux;
2084  int ret, stream_idx;
2085 
2086  av_assert0(mux_idx < sch->nb_mux);
2087  mux = &sch->mux[mux_idx];
2088 
2089  ret = tq_receive(mux->queue, &stream_idx, pkt);
2090  pkt->stream_index = stream_idx;
2091  return ret;
2092 }
2093 
2094 void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
2095 {
2096  SchMux *mux;
2097 
2098  av_assert0(mux_idx < sch->nb_mux);
2099  mux = &sch->mux[mux_idx];
2100 
2101  av_assert0(stream_idx < mux->nb_streams);
2102  tq_receive_finish(mux->queue, stream_idx);
2103 
2105  mux->streams[stream_idx].source_finished = 1;
2106 
2108 
2110 }
2111 
2112 int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
2113  const AVPacket *pkt)
2114 {
2115  SchMux *mux;
2116  SchMuxStream *ms;
2117 
2118  av_assert0(mux_idx < sch->nb_mux);
2119  mux = &sch->mux[mux_idx];
2120 
2121  av_assert0(stream_idx < mux->nb_streams);
2122  ms = &mux->streams[stream_idx];
2123 
2124  for (unsigned i = 0; i < ms->nb_sub_heartbeat_dst; i++) {
2125  SchDec *dst = &sch->dec[ms->sub_heartbeat_dst[i]];
2126  int ret;
2127 
2129  if (ret < 0)
2130  return ret;
2131 
2132  tq_send(dst->queue, 0, mux->sub_heartbeat_pkt);
2133  }
2134 
2135  return 0;
2136 }
2137 
2138 static int mux_done(Scheduler *sch, unsigned mux_idx)
2139 {
2140  SchMux *mux = &sch->mux[mux_idx];
2141 
2143 
2144  for (unsigned i = 0; i < mux->nb_streams; i++) {
2145  tq_receive_finish(mux->queue, i);
2146  mux->streams[i].source_finished = 1;
2147  }
2148 
2150 
2152 
2154 
2155  av_assert0(sch->nb_mux_done < sch->nb_mux);
2156  sch->nb_mux_done++;
2157 
2159 
2161 
2162  return 0;
2163 }
2164 
2165 int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
2166 {
2167  SchDec *dec;
2168  int ret, dummy;
2169 
2170  av_assert0(dec_idx < sch->nb_dec);
2171  dec = &sch->dec[dec_idx];
2172 
2173  // the decoder should have given us post-flush end timestamp in pkt
2174  if (dec->expect_end_ts) {
2175  Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
2177  if (ret < 0)
2178  return ret;
2179 
2180  dec->expect_end_ts = 0;
2181  }
2182 
2183  ret = tq_receive(dec->queue, &dummy, pkt);
2184  av_assert0(dummy <= 0);
2185 
2186  // got a flush packet, on the next call to this function the decoder
2187  // will give us post-flush end timestamp
2188  if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
2189  dec->expect_end_ts = 1;
2190 
2191  return ret;
2192 }
2193 
2195  unsigned in_idx, AVFrame *frame)
2196 {
2197  if (frame)
2198  return tq_send(fg->queue, in_idx, frame);
2199 
2200  if (!fg->inputs[in_idx].send_finished) {
2201  fg->inputs[in_idx].send_finished = 1;
2202  tq_send_finish(fg->queue, in_idx);
2203 
2204  // close the control stream when all actual inputs are done
2205  if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
2206  tq_send_finish(fg->queue, fg->nb_inputs);
2207  }
2208  return 0;
2209 }
2210 
2212  uint8_t *dst_finished, AVFrame *frame)
2213 {
2214  int ret;
2215 
2216  if (*dst_finished)
2217  return AVERROR_EOF;
2218 
2219  if (!frame)
2220  goto finish;
2221 
2222  ret = (dst.type == SCH_NODE_TYPE_FILTER_IN) ?
2223  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame) :
2224  send_to_enc(sch, &sch->enc[dst.idx], frame);
2225  if (ret == AVERROR_EOF)
2226  goto finish;
2227 
2228  return ret;
2229 
2230 finish:
2231  if (dst.type == SCH_NODE_TYPE_FILTER_IN)
2232  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2233  else
2234  send_to_enc(sch, &sch->enc[dst.idx], NULL);
2235 
2236  *dst_finished = 1;
2237 
2238  return AVERROR_EOF;
2239 }
2240 
2241 int sch_dec_send(Scheduler *sch, unsigned dec_idx,
2242  unsigned out_idx, AVFrame *frame)
2243 {
2244  SchDec *dec;
2245  SchDecOutput *o;
2246  int ret;
2247  unsigned nb_done = 0;
2248 
2249  av_assert0(dec_idx < sch->nb_dec);
2250  dec = &sch->dec[dec_idx];
2251 
2252  av_assert0(out_idx < dec->nb_outputs);
2253  o = &dec->outputs[out_idx];
2254 
2255  for (unsigned i = 0; i < o->nb_dst; i++) {
2256  uint8_t *finished = &o->dst_finished[i];
2257  AVFrame *to_send = frame;
2258 
2259  // sending a frame consumes it, so make a temporary reference if needed
2260  if (i < o->nb_dst - 1) {
2261  to_send = dec->send_frame;
2262 
2263  // frame may sometimes contain props only,
2264  // e.g. to signal EOF timestamp
2265  ret = frame->buf[0] ? av_frame_ref(to_send, frame) :
2266  av_frame_copy_props(to_send, frame);
2267  if (ret < 0)
2268  return ret;
2269  }
2270 
2271  ret = dec_send_to_dst(sch, o->dst[i], finished, to_send);
2272  if (ret < 0) {
2273  av_frame_unref(to_send);
2274  if (ret == AVERROR_EOF) {
2275  nb_done++;
2276  continue;
2277  }
2278  return ret;
2279  }
2280  }
2281 
2282  return (nb_done == o->nb_dst) ? AVERROR_EOF : 0;
2283 }
2284 
2285 static int dec_done(Scheduler *sch, unsigned dec_idx)
2286 {
2287  SchDec *dec = &sch->dec[dec_idx];
2288  int ret = 0;
2289 
2290  tq_receive_finish(dec->queue, 0);
2291 
2292  // make sure our source does not get stuck waiting for end timestamps
2293  // that will never arrive
2294  if (dec->queue_end_ts)
2296 
2297  for (unsigned i = 0; i < dec->nb_outputs; i++) {
2298  SchDecOutput *o = &dec->outputs[i];
2299 
2300  for (unsigned j = 0; j < o->nb_dst; j++) {
2301  int err = dec_send_to_dst(sch, o->dst[j], &o->dst_finished[j], NULL);
2302  if (err < 0 && err != AVERROR_EOF)
2303  ret = err_merge(ret, err);
2304  }
2305  }
2306 
2307  return ret;
2308 }
2309 
2310 int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
2311 {
2312  SchEnc *enc;
2313  int ret, dummy;
2314 
2315  av_assert0(enc_idx < sch->nb_enc);
2316  enc = &sch->enc[enc_idx];
2317 
2318  ret = tq_receive(enc->queue, &dummy, frame);
2319  av_assert0(dummy <= 0);
2320 
2321  return ret;
2322 }
2323 
2325  uint8_t *dst_finished, AVPacket *pkt)
2326 {
2327  int ret;
2328 
2329  if (*dst_finished)
2330  return AVERROR_EOF;
2331 
2332  if (!pkt)
2333  goto finish;
2334 
2335  ret = (dst.type == SCH_NODE_TYPE_MUX) ?
2336  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, pkt) :
2337  tq_send(sch->dec[dst.idx].queue, 0, pkt);
2338  if (ret == AVERROR_EOF)
2339  goto finish;
2340 
2341  return ret;
2342 
2343 finish:
2344  if (dst.type == SCH_NODE_TYPE_MUX)
2345  send_to_mux(sch, &sch->mux[dst.idx], dst.idx_stream, NULL);
2346  else
2347  tq_send_finish(sch->dec[dst.idx].queue, 0);
2348 
2349  *dst_finished = 1;
2350 
2351  return AVERROR_EOF;
2352 }
2353 
2354 int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
2355 {
2356  SchEnc *enc;
2357  int ret;
2358 
2359  av_assert0(enc_idx < sch->nb_enc);
2360  enc = &sch->enc[enc_idx];
2361 
2362  for (unsigned i = 0; i < enc->nb_dst; i++) {
2363  uint8_t *finished = &enc->dst_finished[i];
2364  AVPacket *to_send = pkt;
2365 
2366  // sending a packet consumes it, so make a temporary reference if needed
2367  if (i < enc->nb_dst - 1) {
2368  to_send = enc->send_pkt;
2369 
2370  ret = av_packet_ref(to_send, pkt);
2371  if (ret < 0)
2372  return ret;
2373  }
2374 
2375  ret = enc_send_to_dst(sch, enc->dst[i], finished, to_send);
2376  if (ret < 0) {
2377  av_packet_unref(to_send);
2378  if (ret == AVERROR_EOF)
2379  continue;
2380  return ret;
2381  }
2382  }
2383 
2384  return 0;
2385 }
2386 
2387 static int enc_done(Scheduler *sch, unsigned enc_idx)
2388 {
2389  SchEnc *enc = &sch->enc[enc_idx];
2390  int ret = 0;
2391 
2392  tq_receive_finish(enc->queue, 0);
2393 
2394  for (unsigned i = 0; i < enc->nb_dst; i++) {
2395  int err = enc_send_to_dst(sch, enc->dst[i], &enc->dst_finished[i], NULL);
2396  if (err < 0 && err != AVERROR_EOF)
2397  ret = err_merge(ret, err);
2398  }
2399 
2400  return ret;
2401 }
2402 
2403 int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
2404  unsigned *in_idx, AVFrame *frame)
2405 {
2406  SchFilterGraph *fg;
2407 
2408  av_assert0(fg_idx < sch->nb_filters);
2409  fg = &sch->filters[fg_idx];
2410 
2411  av_assert0(*in_idx <= fg->nb_inputs);
2412 
2413  // update scheduling to account for desired input stream, if it changed
2414  //
2415  // this check needs no locking because only the filtering thread
2416  // updates this value
2417  if (*in_idx != fg->best_input) {
2419 
2420  fg->best_input = *in_idx;
2422 
2424  }
2425 
2426  if (*in_idx == fg->nb_inputs) {
2427  int terminate = waiter_wait(sch, &fg->waiter);
2428  return terminate ? AVERROR_EOF : AVERROR(EAGAIN);
2429  }
2430 
2431  while (1) {
2432  int ret, idx;
2433 
2434  ret = tq_receive(fg->queue, &idx, frame);
2435  if (idx < 0)
2436  return AVERROR_EOF;
2437  else if (ret >= 0) {
2438  *in_idx = idx;
2439  return 0;
2440  }
2441 
2442  // disregard EOFs for specific streams - they should always be
2443  // preceded by an EOF frame
2444  }
2445 }
2446 
2447 void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
2448 {
2449  SchFilterGraph *fg;
2450  SchFilterIn *fi;
2451 
2452  av_assert0(fg_idx < sch->nb_filters);
2453  fg = &sch->filters[fg_idx];
2454 
2455  av_assert0(in_idx < fg->nb_inputs);
2456  fi = &fg->inputs[in_idx];
2457 
2458  if (!fi->receive_finished) {
2459  fi->receive_finished = 1;
2460  tq_receive_finish(fg->queue, in_idx);
2461 
2462  // close the control stream when all actual inputs are done
2463  if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
2464  tq_receive_finish(fg->queue, fg->nb_inputs);
2465  }
2466 }
2467 
2468 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
2469 {
2470  SchFilterGraph *fg;
2472 
2473  av_assert0(fg_idx < sch->nb_filters);
2474  fg = &sch->filters[fg_idx];
2475 
2476  av_assert0(out_idx < fg->nb_outputs);
2477  dst = fg->outputs[out_idx].dst;
2478 
2479  return (dst.type == SCH_NODE_TYPE_ENC) ?
2480  send_to_enc (sch, &sch->enc[dst.idx], frame) :
2481  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
2482 }
2483 
2484 static int filter_done(Scheduler *sch, unsigned fg_idx)
2485 {
2486  SchFilterGraph *fg = &sch->filters[fg_idx];
2487  int ret = 0;
2488 
2489  for (unsigned i = 0; i <= fg->nb_inputs; i++)
2490  tq_receive_finish(fg->queue, i);
2491 
2492  for (unsigned i = 0; i < fg->nb_outputs; i++) {
2493  SchedulerNode dst = fg->outputs[i].dst;
2494  int err = (dst.type == SCH_NODE_TYPE_ENC) ?
2495  send_to_enc (sch, &sch->enc[dst.idx], NULL) :
2496  send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
2497 
2498  if (err < 0 && err != AVERROR_EOF)
2499  ret = err_merge(ret, err);
2500  }
2501 
2503 
2504  fg->task_exited = 1;
2505 
2507 
2509 
2510  return ret;
2511 }
2512 
2513 int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
2514 {
2515  SchFilterGraph *fg;
2516 
2517  av_assert0(fg_idx < sch->nb_filters);
2518  fg = &sch->filters[fg_idx];
2519 
2520  return send_to_filter(sch, fg, fg->nb_inputs, frame);
2521 }
2522 
2523 static int task_cleanup(Scheduler *sch, SchedulerNode node)
2524 {
2525  switch (node.type) {
2526  case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
2527  case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
2528  case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
2529  case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
2530  case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
2531  default: av_assert0(0);
2532  }
2533 }
2534 
2535 static void *task_wrapper(void *arg)
2536 {
2537  SchTask *task = arg;
2538  Scheduler *sch = task->parent;
2539  int ret;
2540  int err = 0;
2541 
2542  ret = task->func(task->func_arg);
2543  if (ret < 0)
2544  av_log(task->func_arg, AV_LOG_ERROR,
2545  "Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
2546 
2547  err = task_cleanup(sch, task->node);
2548  ret = err_merge(ret, err);
2549 
2550  // EOF is considered normal termination
2551  if (ret == AVERROR_EOF)
2552  ret = 0;
2553  if (ret < 0) {
2555  sch->task_failed = 1;
2558  }
2559 
2561  "Terminating thread with return code %d (%s)\n", ret,
2562  ret < 0 ? av_err2str(ret) : "success");
2563 
2564  return (void*)(intptr_t)ret;
2565 }
2566 
2567 static int task_stop(Scheduler *sch, SchTask *task)
2568 {
2569  int ret;
2570  void *thread_ret;
2571 
2572  if (!task->thread_running)
2573  return task_cleanup(sch, task->node);
2574 
2575  ret = pthread_join(task->thread, &thread_ret);
2576  av_assert0(ret == 0);
2577 
2578  task->thread_running = 0;
2579 
2580  return (intptr_t)thread_ret;
2581 }
2582 
2583 int sch_stop(Scheduler *sch, int64_t *finish_ts)
2584 {
2585  int ret = 0, err;
2586 
2587  if (sch->state != SCH_STATE_STARTED)
2588  return 0;
2589 
2590  atomic_store(&sch->terminate, 1);
2591 
2592  for (unsigned type = 0; type < 2; type++)
2593  for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
2594  SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
2595  waiter_set(w, 1);
2596  }
2597 
2598  for (unsigned i = 0; i < sch->nb_demux; i++) {
2599  SchDemux *d = &sch->demux[i];
2600 
2601  err = task_stop(sch, &d->task);
2602  ret = err_merge(ret, err);
2603  }
2604 
2605  for (unsigned i = 0; i < sch->nb_dec; i++) {
2606  SchDec *dec = &sch->dec[i];
2607 
2608  err = task_stop(sch, &dec->task);
2609  ret = err_merge(ret, err);
2610  }
2611 
2612  for (unsigned i = 0; i < sch->nb_filters; i++) {
2613  SchFilterGraph *fg = &sch->filters[i];
2614 
2615  err = task_stop(sch, &fg->task);
2616  ret = err_merge(ret, err);
2617  }
2618 
2619  for (unsigned i = 0; i < sch->nb_enc; i++) {
2620  SchEnc *enc = &sch->enc[i];
2621 
2622  err = task_stop(sch, &enc->task);
2623  ret = err_merge(ret, err);
2624  }
2625 
2626  for (unsigned i = 0; i < sch->nb_mux; i++) {
2627  SchMux *mux = &sch->mux[i];
2628 
2629  err = task_stop(sch, &mux->task);
2630  ret = err_merge(ret, err);
2631  }
2632 
2633  if (finish_ts)
2634  *finish_ts = trailing_dts(sch, 1);
2635 
2636  sch->state = SCH_STATE_STOPPED;
2637 
2638  return ret;
2639 }
Scheduler::sq_enc
SchSyncQueue * sq_enc
Definition: ffmpeg_sched.c:299
func
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:68
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
SchWaiter
Definition: ffmpeg_sched.c:52
av_packet_unref
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
Definition: packet.c:429
mux_task_start
static int mux_task_start(SchMux *mux)
Definition: ffmpeg_sched.c:1105
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
SchedulerNode::idx_stream
unsigned idx_stream
Definition: ffmpeg_sched.h:106
SchDecOutput::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:76
waiter_init
static int waiter_init(SchWaiter *w)
Definition: ffmpeg_sched.c:351
av_fifo_can_write
size_t av_fifo_can_write(const AVFifo *f)
Definition: fifo.c:94
Scheduler::finish_lock
pthread_mutex_t finish_lock
Definition: ffmpeg_sched.c:289
atomic_store
#define atomic_store(object, desired)
Definition: stdatomic.h:85
sch_filter_send
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
Definition: ffmpeg_sched.c:2468
err_merge
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Definition: ffmpeg_utils.h:39
SchDec::task
SchTask task
Definition: ffmpeg_sched.c:88
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
Scheduler::enc
SchEnc * enc
Definition: ffmpeg_sched.c:296
Scheduler::nb_mux_done
unsigned nb_mux_done
Definition: ffmpeg_sched.c:287
av_compare_ts
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
Definition: mathematics.c:147
SchedulerState
SchedulerState
Definition: ffmpeg_sched.c:269
sch_mux_receive_finish
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
Definition: ffmpeg_sched.c:2094
SCH_NODE_TYPE_ENC
@ SCH_NODE_TYPE_ENC
Definition: ffmpeg_sched.h:98
SchSyncQueue::sq
SyncQueue * sq
Definition: ffmpeg_sched.c:101
SchTask::thread
pthread_t thread
Definition: ffmpeg_sched.c:70
atomic_fetch_add
#define atomic_fetch_add(object, operand)
Definition: stdatomic.h:137
demux_flush
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
Definition: ffmpeg_sched.c:1993
thread.h
AVERROR_EOF
#define AVERROR_EOF
End of file.
Definition: error.h:57
Scheduler::nb_sq_enc
unsigned nb_sq_enc
Definition: ffmpeg_sched.c:300
SchMux::sub_heartbeat_pkt
AVPacket * sub_heartbeat_pkt
Definition: ffmpeg_sched.c:233
sq_limit_frames
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
Definition: sync_queue.c:649
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
SchEnc::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:147
SCHEDULE_TOLERANCE
#define SCHEDULE_TOLERANCE
Definition: ffmpeg_sched.c:45
sch_add_demux
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
Definition: ffmpeg_sched.c:688
PreMuxQueue::data_size
size_t data_size
Definition: ffmpeg_sched.c:185
AV_TIME_BASE_Q
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
Definition: avutil.h:264
int64_t
long long int64_t
Definition: coverity.c:34
CYCLE_NODE_STARTED
@ CYCLE_NODE_STARTED
Definition: ffmpeg_sched.c:1368
SchTask::func
SchThreadFunc func
Definition: ffmpeg_sched.c:67
mux_done
static int mux_done(Scheduler *sch, unsigned mux_idx)
Definition: ffmpeg_sched.c:2138
Scheduler::nb_enc
unsigned nb_enc
Definition: ffmpeg_sched.c:297
av_frame_free
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g. extended_data.
Definition: frame.c:162
SQFRAME
#define SQFRAME(frame)
Definition: sync_queue.h:38
av_fifo_peek
int av_fifo_peek(const AVFifo *f, void *buf, size_t nb_elems, size_t offset)
Read data from a FIFO without modifying FIFO state.
Definition: fifo.c:255
check_acyclic_for_output
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
Definition: ffmpeg_sched.c:1373
AVFrame
This structure describes decoded (raw) audio or video data.
Definition: frame.h:389
w
uint8_t w
Definition: llviddspenc.c:38
task_cleanup
static int task_cleanup(Scheduler *sch, SchedulerNode node)
Definition: ffmpeg_sched.c:2523
frame_move
static void frame_move(void *dst, void *src)
Definition: ffmpeg_utils.h:52
sync_queue.h
AVPacket::data
uint8_t * data
Definition: packet.h:539
SchMux::nb_streams_ready
unsigned nb_streams_ready
Definition: ffmpeg_sched.c:218
SchDemux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:160
send_to_mux
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
Definition: ffmpeg_sched.c:1867
Scheduler::nb_mux_ready
unsigned nb_mux_ready
Definition: ffmpeg_sched.c:284
atomic_int
intptr_t atomic_int
Definition: stdatomic.h:55
objpool_free
void objpool_free(ObjPool **pop)
Definition: objpool.c:54
enc_done
static int enc_done(Scheduler *sch, unsigned enc_idx)
Definition: ffmpeg_sched.c:2387
SCH_NODE_TYPE_MUX
@ SCH_NODE_TYPE_MUX
Definition: ffmpeg_sched.h:96
AV_LOG_VERBOSE
#define AV_LOG_VERBOSE
Detailed information.
Definition: log.h:225
AVPacket::duration
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
Definition: packet.h:557
SchWaiter::choked_prev
int choked_prev
Definition: ffmpeg_sched.c:59
QUEUE_PACKETS
@ QUEUE_PACKETS
Definition: ffmpeg_sched.c:48
SchMux
Definition: ffmpeg_sched.c:213
Scheduler::class
const AVClass * class
Definition: ffmpeg_sched.c:276
objpool_alloc_packets
ObjPool * objpool_alloc_packets(void)
Definition: objpool.c:124
SchFilterOut
Definition: ffmpeg_sched.c:243
SCH_STATE_UNINIT
@ SCH_STATE_UNINIT
Definition: ffmpeg_sched.c:270
Timestamp::ts
int64_t ts
Definition: ffmpeg_utils.h:31
PreMuxQueue::fifo
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
Definition: ffmpeg_sched.c:176
SchMuxStream::last_dts
int64_t last_dts
Definition: ffmpeg_sched.c:207
av_packet_free
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
Definition: packet.c:74
DEFAULT_FRAME_THREAD_QUEUE_SIZE
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
Definition: ffmpeg_sched.h:260
Scheduler::last_dts
atomic_int_least64_t last_dts
Definition: ffmpeg_sched.c:313
SchDemux::send_pkt
AVPacket * send_pkt
Definition: ffmpeg_sched.c:166
sch_mux_stream_ready
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
Definition: ffmpeg_sched.c:1221
send_to_enc_thread
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1716
task_stop
static int task_stop(Scheduler *sch, SchTask *task)
Definition: ffmpeg_sched.c:2567
SchFilterIn::receive_finished
int receive_finished
Definition: ffmpeg_sched.c:240
SchedulerNode::type
enum SchedulerNodeType type
Definition: ffmpeg_sched.h:104
fifo.h
finish
static void finish(void)
Definition: movenc.c:374
sch_stop
int sch_stop(Scheduler *sch, int64_t *finish_ts)
Definition: ffmpeg_sched.c:2583
SchEnc::sq_idx
int sq_idx[2]
Definition: ffmpeg_sched.c:119
fail
#define fail()
Definition: checkasm.h:189
av_fifo_write
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
Definition: fifo.c:188
SchThreadFunc
int(* SchThreadFunc)(void *arg)
Definition: ffmpeg_sched.h:109
SchFilterOut::dst
SchedulerNode dst
Definition: ffmpeg_sched.c:244
SchDec::outputs
SchDecOutput * outputs
Definition: ffmpeg_sched.c:85
SchEnc
Definition: ffmpeg_sched.c:109
dummy
int dummy
Definition: motion.c:66
av_fifo_grow2
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
Definition: fifo.c:99
SchDec::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:90
sch_add_mux_stream
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
Definition: ffmpeg_sched.c:656
SchMux::class
const AVClass * class
Definition: ffmpeg_sched.c:214
SchFilterGraph::nb_inputs_finished_send
atomic_uint nb_inputs_finished_send
Definition: ffmpeg_sched.c:252
SCH_STATE_STOPPED
@ SCH_STATE_STOPPED
Definition: ffmpeg_sched.c:272
sq_receive
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
Definition: sync_queue.c:608
type
It's the only field you need to keep, assuming you have a context. There is some magic you don't need to care about around this field; just let it be (vf type).
Definition: writing_filters.txt:86
Scheduler::nb_dec
unsigned nb_dec
Definition: ffmpeg_sched.c:294
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:177
sch_add_filtergraph
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
Definition: ffmpeg_sched.c:829
av_frame_alloc
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
Definition: frame.c:150
SchFilterGraph
Definition: ffmpeg_sched.c:247
SchMuxStream::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:192
avassert.h
pkt
AVPacket * pkt
Definition: movenc.c:60
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:209
sch_free
void sch_free(Scheduler **psch)
Definition: ffmpeg_sched.c:467
Scheduler::state
enum SchedulerState state
Definition: ffmpeg_sched.c:308
SchMux::streams
SchMuxStream * streams
Definition: ffmpeg_sched.c:216
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:161
av_fifo_read
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
Definition: fifo.c:240
SchMuxStream
Definition: ffmpeg_sched.c:190
SchDecOutput
Definition: ffmpeg_sched.c:74
sch_add_mux
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
Definition: ffmpeg_sched.c:632
waiter_set
static void waiter_set(SchWaiter *w, int choked)
Definition: ffmpeg_sched.c:341
SchFilterGraph::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:256
SchDec::expect_end_ts
int expect_end_ts
Definition: ffmpeg_sched.c:94
enc_open
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Definition: ffmpeg_sched.c:1690
sch_alloc
Scheduler * sch_alloc(void)
Definition: ffmpeg_sched.c:585
dec_send_to_dst
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
Definition: ffmpeg_sched.c:2211
task_init
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
Definition: ffmpeg_sched.c:433
SchMuxStream::nb_sub_heartbeat_dst
unsigned nb_sub_heartbeat_dst
Definition: ffmpeg_sched.c:195
op
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
Definition: anm.c:76
SchEnc::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:113
sch_add_demux_stream
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
Definition: ffmpeg_sched.c:715
SchMuxStream::src
SchedulerNode src
Definition: ffmpeg_sched.c:191
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:40
SchedulerNodeType
SchedulerNodeType
Definition: ffmpeg_sched.h:93
sch_dec_send
int sch_dec_send(Scheduler *sch, unsigned dec_idx, unsigned out_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
Definition: ffmpeg_sched.c:2241
ctx
AVFormatContext * ctx
Definition: movenc.c:49
nb_streams
static int nb_streams
Definition: ffprobe.c:384
SchMuxStream::source_finished
int source_finished
Definition: ffmpeg_sched.c:209
av_rescale_q
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
Definition: mathematics.c:142
ffmpeg_utils.h
filter_done
static int filter_done(Scheduler *sch, unsigned fg_idx)
Definition: ffmpeg_sched.c:2484
SchFilterGraph::class
const AVClass * class
Definition: ffmpeg_sched.c:248
sch_enc_receive
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
Definition: ffmpeg_sched.c:2310
AVThreadMessageQueue
Definition: threadmessage.c:30
atomic_load
#define atomic_load(object)
Definition: stdatomic.h:93
objpool_alloc_frames
ObjPool * objpool_alloc_frames(void)
Definition: objpool.c:128
sch_sdp_filename
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
Definition: ffmpeg_sched.c:619
SchEnc::class
const AVClass * class
Definition: ffmpeg_sched.c:110
demux_send_for_stream
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1961
arg
const char * arg
Definition: jacosubdec.c:67
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
SchMuxStream::pre_mux_queue
PreMuxQueue pre_mux_queue
Definition: ffmpeg_sched.c:197
sq_add_stream
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
Definition: sync_queue.c:620
SchMux::mux_started
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
Definition: ffmpeg_sched.c:229
Scheduler::finish_cond
pthread_cond_t finish_cond
Definition: ffmpeg_sched.c:290
SCH_NODE_TYPE_DEMUX
@ SCH_NODE_TYPE_DEMUX
Definition: ffmpeg_sched.h:95
Scheduler::demux
SchDemux * demux
Definition: ffmpeg_sched.c:278
pkt_move
static void pkt_move(void *dst, void *src)
Definition: ffmpeg_utils.h:47
AVPacket::buf
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
Definition: packet.h:522
tq_free
void tq_free(ThreadQueue **ptq)
Definition: thread_queue.c:55
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
waiter_uninit
static void waiter_uninit(SchWaiter *w)
Definition: ffmpeg_sched.c:368
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:75
Scheduler::sdp_filename
char * sdp_filename
Definition: ffmpeg_sched.c:305
send_to_enc_sq
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1735
NULL
#define NULL
Definition: coverity.c:32
SchEnc::open_cb
int(* open_cb)(void *opaque, const AVFrame *frame)
Definition: ffmpeg_sched.c:137
av_frame_copy_props
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
Definition: frame.c:725
SchFilterGraph::nb_inputs
unsigned nb_inputs
Definition: ffmpeg_sched.c:251
Scheduler::mux
SchMux * mux
Definition: ffmpeg_sched.c:281
schedule_update_locked
static void schedule_update_locked(Scheduler *sch)
Definition: ffmpeg_sched.c:1300
tq_receive_finish
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
Definition: thread_queue.c:241
sch_add_enc
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
Definition: ffmpeg_sched.c:791
SchSyncQueue::enc_idx
unsigned * enc_idx
Definition: ffmpeg_sched.c:105
SCH_STATE_STARTED
@ SCH_STATE_STARTED
Definition: ffmpeg_sched.c:271
dec_done
static int dec_done(Scheduler *sch, unsigned dec_idx)
Definition: ffmpeg_sched.c:2285
SchFilterGraph::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:261
SchDemux::class
const AVClass * class
Definition: ffmpeg_sched.c:157
av_fifo_can_read
size_t av_fifo_can_read(const AVFifo *f)
Definition: fifo.c:87
SchEnc::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:114
sch_add_dec
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
Definition: ffmpeg_sched.c:748
SchWaiter::choked
atomic_int choked
Definition: ffmpeg_sched.c:55
SchWaiter::cond
pthread_cond_t cond
Definition: ffmpeg_sched.c:54
CYCLE_NODE_NEW
@ CYCLE_NODE_NEW
Definition: ffmpeg_sched.c:1367
time.h
DEMUX_SEND_STREAMCOPY_EOF
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations; send normally to other types.
Definition: ffmpeg_sched.h:338
sch_fg_class
static const AVClass sch_fg_class
Definition: ffmpeg_sched.c:823
QUEUE_FRAMES
@ QUEUE_FRAMES
Definition: ffmpeg_sched.c:49
av_packet_ref
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
Definition: packet.c:437
av_packet_move_ref
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
Definition: packet.c:486
SchTask::thread_running
int thread_running
Definition: ffmpeg_sched.c:71
sch_enc_send
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
Definition: ffmpeg_sched.c:2354
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:82
error.h
Scheduler
Definition: ffmpeg_sched.c:275
SchMux::nb_streams
unsigned nb_streams
Definition: ffmpeg_sched.c:217
SchSyncQueue::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:103
SchMuxStream::sub_heartbeat_dst
unsigned * sub_heartbeat_dst
Definition: ffmpeg_sched.c:194
SchDec::class
const AVClass * class
Definition: ffmpeg_sched.c:81
sq_frame_samples
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
Definition: sync_queue.c:661
SchEnc::in_finished
int in_finished
Definition: ffmpeg_sched.c:144
SchDemux::task
SchTask task
Definition: ffmpeg_sched.c:162
SchDemuxStream::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:153
SchFilterGraph::nb_inputs_finished_receive
unsigned nb_inputs_finished_receive
Definition: ffmpeg_sched.c:253
tq_send
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
Definition: thread_queue.c:120
init
int(* init)(AVBSFContext *ctx)
Definition: dts2pts.c:368
AVPacket::size
int size
Definition: packet.h:540
AVFifo
Definition: fifo.c:35
SchSyncQueue::nb_enc_idx
unsigned nb_enc_idx
Definition: ffmpeg_sched.c:106
SchFilterGraph::task_exited
int task_exited
Definition: ffmpeg_sched.c:266
av_frame_ref
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
Definition: frame.c:400
threadmessage.h
dst
uint8_t ptrdiff_t const uint8_t ptrdiff_t int intptr_t intptr_t int int16_t * dst
Definition: dsp.h:83
PreMuxQueue::max_packets
int max_packets
Maximum number of packets in fifo.
Definition: ffmpeg_sched.c:180
SchFilterGraph::task
SchTask task
Definition: ffmpeg_sched.c:258
av_err2str
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
Definition: error.h:122
SchWaiter::lock
pthread_mutex_t lock
Definition: ffmpeg_sched.c:53
sq_send
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
Definition: sync_queue.c:343
PreMuxQueue::data_threshold
size_t data_threshold
Definition: ffmpeg_sched.c:187
sq_free
void sq_free(SyncQueue **psq)
Definition: sync_queue.c:699
AV_NOPTS_VALUE
#define AV_NOPTS_VALUE
Undefined timestamp value.
Definition: avutil.h:248
sch_dec_class
static const AVClass sch_dec_class
Definition: ffmpeg_sched.c:742
SchFilterGraph::inputs
SchFilterIn * inputs
Definition: ffmpeg_sched.c:250
Scheduler::schedule_lock
pthread_mutex_t schedule_lock
Definition: ffmpeg_sched.c:311
frame.h
SchTask::func_arg
void * func_arg
Definition: ffmpeg_sched.c:68
SCH_NODE_TYPE_FILTER_OUT
@ SCH_NODE_TYPE_FILTER_OUT
Definition: ffmpeg_sched.h:100
AVPacket::dts
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
Definition: packet.h:538
ObjPool
Definition: objpool.c:30
enc_send_to_dst
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
Definition: ffmpeg_sched.c:2324
CYCLE_NODE_DONE
@ CYCLE_NODE_DONE
Definition: ffmpeg_sched.c:1369
sch_mux_class
static const AVClass sch_mux_class
Definition: ffmpeg_sched.c:626
SchFilterIn
Definition: ffmpeg_sched.c:236
sch_filter_receive
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
Definition: ffmpeg_sched.c:2403
Scheduler::nb_mux
unsigned nb_mux
Definition: ffmpeg_sched.c:282
av_packet_alloc
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
Definition: packet.c:63
tq_alloc
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
Definition: thread_queue.c:79
SchEnc::opened
int opened
Definition: ffmpeg_sched.c:138
scheduler_class
static const AVClass scheduler_class
Definition: ffmpeg_sched.c:580
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
Scheduler::nb_demux
unsigned nb_demux
Definition: ffmpeg_sched.c:279
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:45
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_packet_copy_props
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
Definition: packet.c:392
SchDemuxStream::dst_finished
uint8_t * dst_finished
Definition: ffmpeg_sched.c:152
SchDemux::task_exited
int task_exited
Definition: ffmpeg_sched.c:169
SCH_NODE_TYPE_FILTER_IN
@ SCH_NODE_TYPE_FILTER_IN
Definition: ffmpeg_sched.h:99
task_start
static int task_start(SchTask *task)
Definition: ffmpeg_sched.c:414
Scheduler::filters
SchFilterGraph * filters
Definition: ffmpeg_sched.c:302
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
AVPacket::pts
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
Definition: packet.h:532
demux_done
static int demux_done(Scheduler *sch, unsigned demux_idx)
Definition: ffmpeg_sched.c:2059
packet.h
SchWaiter::choked_next
int choked_next
Definition: ffmpeg_sched.c:60
SchFilterGraph::best_input
unsigned best_input
Definition: ffmpeg_sched.c:265
av_malloc_array
#define av_malloc_array(a, b)
Definition: tableprint_vlc.h:31
Scheduler::mux_ready_lock
pthread_mutex_t mux_ready_lock
Definition: ffmpeg_sched.c:285
Scheduler::terminate
atomic_int terminate
Definition: ffmpeg_sched.c:309
SchDec
Definition: ffmpeg_sched.c:80
DEFAULT_PACKET_THREAD_QUEUE_SIZE
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
Definition: ffmpeg_sched.h:255
QueueType
QueueType
Definition: ffmpeg_sched.c:47
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
SchDec::nb_outputs
unsigned nb_outputs
Definition: ffmpeg_sched.c:86
av_frame_unref
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
Definition: frame.c:622
trailing_dts
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
Definition: ffmpeg_sched.c:445
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
SchFilterGraph::outputs
SchFilterOut * outputs
Definition: ffmpeg_sched.c:255
sch_enc_class
static const AVClass sch_enc_class
Definition: ffmpeg_sched.c:785
SchedulerNode
Definition: ffmpeg_sched.h:103
SCH_NODE_TYPE_DEC
@ SCH_NODE_TYPE_DEC
Definition: ffmpeg_sched.h:97
pthread_cond_t
Definition: os2threads.h:58
SchTask
Definition: ffmpeg_sched.c:63
mux_init
static int mux_init(Scheduler *sch, SchMux *mux)
Definition: ffmpeg_sched.c:1169
send_to_filter
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2194
tq_receive
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
Definition: thread_queue.c:196
SchDemuxStream::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:151
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
sch_connect
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
Definition: ffmpeg_sched.c:927
send_to_enc
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
Definition: ffmpeg_sched.c:1811
sch_filter_command
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
Definition: ffmpeg_sched.c:2513
SchDemuxStream
Definition: ffmpeg_sched.c:150
Timestamp::tb
AVRational tb
Definition: ffmpeg_utils.h:32
atomic_int_least64_t
intptr_t atomic_int_least64_t
Definition: stdatomic.h:68
SchFilterIn::src_sched
SchedulerNode src_sched
Definition: ffmpeg_sched.c:238
ret
ret
Definition: filter_design.txt:187
sch_dec_receive
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
Definition: ffmpeg_sched.c:2165
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:80
frame
These buffered frames must be flushed immediately if a new input produces new output; the filter must not call request_frame to get more. It must just process the frame or queue it. The task of requesting more frames is left to the filter's request_frame method or the application. If a filter has several inputs, the filter must be ready for frames arriving randomly on any input; any filter with several inputs will most likely require some kind of queuing mechanism. It is perfectly acceptable to have a limited queue and to drop frames when the inputs are too unbalanced. request_frame: for filters that do not use the activate callback, this method is called when a frame is wanted on an output. For a source, it should directly call filter_frame on the corresponding output. For a filter, if there are queued frames already ready, one of these frames should be pushed. If not, the filter should request a frame on one of its inputs, repeatedly, until at least one frame has been pushed. Return values: success if request_frame could produce a frame, or at least make progress towards producing a frame.
Definition: filter_design.txt:264
SchMuxStream::init_eof
int init_eof
Definition: ffmpeg_sched.c:200
mux_queue_packet
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
Definition: ffmpeg_sched.c:1831
SchMux::init
int(* init)(void *arg)
Definition: ffmpeg_sched.c:220
sch_demux_class
static const AVClass sch_demux_class
Definition: ffmpeg_sched.c:682
ThreadQueue
Definition: thread_queue.c:42
av_fifo_alloc2
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
Definition: fifo.c:47
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
task_wrapper
static void * task_wrapper(void *arg)
Definition: ffmpeg_sched.c:2535
SchMux::task
SchTask task
Definition: ffmpeg_sched.c:222
SyncQueue
A sync queue provides timestamp synchronization between multiple streams.
Definition: sync_queue.c:88
sch_demux_send
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
Definition: ffmpeg_sched.c:2037
SchDemux
Definition: ffmpeg_sched.c:156
Scheduler::dec
SchDec * dec
Definition: ffmpeg_sched.c:293
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
SchDec::queue_end_ts
AVThreadMessageQueue * queue_end_ts
Definition: ffmpeg_sched.c:93
demux_stream_send_to_dst
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
Definition: ffmpeg_sched.c:1926
SchDec::src
SchedulerNode src
Definition: ffmpeg_sched.c:83
thread_queue.h
AVPacket::stream_index
int stream_index
Definition: packet.h:541
GROW_ARRAY
#define GROW_ARRAY(array, nb_elems)
Definition: cmdutils.h:532
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
SchMux::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:230
SchDemux::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:163
av_gettime
int64_t av_gettime(void)
Get the current time in microseconds.
Definition: time.c:39
waiter_wait
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
Definition: ffmpeg_sched.c:322
av_strdup
char * av_strdup(const char *s)
Duplicate a string.
Definition: mem.c:272
SchSyncQueue::frame
AVFrame * frame
Definition: ffmpeg_sched.c:102
Scheduler::task_failed
unsigned task_failed
Definition: ffmpeg_sched.c:288
SchTask::node
SchedulerNode node
Definition: ffmpeg_sched.c:65
sch_sq_add_enc
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
Definition: ffmpeg_sched.c:896
print_sdp
int print_sdp(const char *filename)
Definition: ffmpeg_mux.c:507
mem.h
start_prepare
static int start_prepare(Scheduler *sch)
Definition: ffmpeg_sched.c:1454
sch_mux_sub_heartbeat_add
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
Definition: ffmpeg_sched.c:1246
SchedulerNode::idx
unsigned idx
Definition: ffmpeg_sched.h:105
sch_filter_receive_finish
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
Definition: ffmpeg_sched.c:2447
ffmpeg_sched.h
sch_add_dec_output
int sch_add_dec_output(Scheduler *sch, unsigned dec_idx)
Add another output to decoder (e.g. for multiview video).
Definition: ffmpeg_sched.c:727
SchEnc::src
SchedulerNode src
Definition: ffmpeg_sched.c:112
sch_wait
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
Definition: ffmpeg_sched.c:1665
AVPacket
This structure stores compressed data.
Definition: packet.h:516
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:96
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:34
cmdutils.h
SchSyncQueue
Definition: ffmpeg_sched.c:100
SchMux::queue_size
unsigned queue_size
Definition: ffmpeg_sched.c:231
SchTask::parent
Scheduler * parent
Definition: ffmpeg_sched.c:64
SchDec::send_frame
AVFrame * send_frame
Definition: ffmpeg_sched.c:97
queue_alloc
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
Definition: ffmpeg_sched.c:374
sch_start
int sch_start(Scheduler *sch)
Definition: ffmpeg_sched.c:1599
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:482
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:204
pthread_cond_timedwait
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
Definition: os2threads.h:170
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:27
sch_mux_sub_heartbeat
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
Definition: ffmpeg_sched.c:2112
av_fifo_freep2
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
Definition: fifo.c:286
SchDecOutput::dst
SchedulerNode * dst
Definition: ffmpeg_sched.c:75
SchEnc::queue
ThreadQueue * queue
Definition: ffmpeg_sched.c:142
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:58
SYNC_QUEUE_FRAMES
@ SYNC_QUEUE_FRAMES
Definition: sync_queue.h:30
sq_alloc
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
Definition: sync_queue.c:675
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
SchEnc::task
SchTask task
Definition: ffmpeg_sched.c:140
Timestamp
Definition: ffmpeg_utils.h:30
SchFilterIn::src
SchedulerNode src
Definition: ffmpeg_sched.c:237
sch_mux_stream_buffering
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
Definition: ffmpeg_sched.c:1205
check_acyclic
static int check_acyclic(Scheduler *sch)
Definition: ffmpeg_sched.c:1418
SchDemux::streams
SchDemuxStream * streams
Definition: ffmpeg_sched.c:159
PreMuxQueue
Definition: ffmpeg_sched.c:172
Scheduler::sdp_auto
int sdp_auto
Definition: ffmpeg_sched.c:306
src
#define src
Definition: vp8dsp.c:248
SchFilterIn::send_finished
int send_finished
Definition: ffmpeg_sched.c:239
SchFilterGraph::waiter
SchWaiter waiter
Definition: ffmpeg_sched.c:262
AVPacket::time_base
AVRational time_base
Time base of the packet's timestamps.
Definition: packet.h:583
unchoke_for_stream
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
Definition: ffmpeg_sched.c:1275
AVPacket::side_data_elems
int side_data_elems
Definition: packet.h:551
sch_mux_receive
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
Definition: ffmpeg_sched.c:2081
sch_add_sq_enc
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
Definition: ffmpeg_sched.c:871
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:78
SchDecOutput::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:77
SchEnc::nb_dst
unsigned nb_dst
Definition: ffmpeg_sched.c:115
tq_send_finish
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.
Definition: thread_queue.c:226
Scheduler::nb_filters
unsigned nb_filters
Definition: ffmpeg_sched.c:303