FFmpeg
slicethread.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "cpu.h"
21 #include "slicethread.h"
22 #include "mem.h"
23 #include "thread.h"
24 #include "avassert.h"
25 
26 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
27 
28 typedef struct WorkerContext {
32  pthread_t thread;
33  int done;
34 } WorkerContext;
35 
36 struct AVSliceThread {
37  WorkerContext *workers;
38  int nb_threads;
39  int nb_active_threads;
40  int nb_jobs;
41 
42  atomic_uint first_job;
43  atomic_uint current_job;
44  pthread_mutex_t done_mutex;
45  pthread_cond_t done_cond;
46  int done;
47  int finished;
48 
49  void *priv;
50  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
51  void (*main_func)(void *priv);
52 };
53 
54 static int run_jobs(AVSliceThread *ctx)
55 {
56  unsigned nb_jobs = ctx->nb_jobs;
57  unsigned nb_active_threads = ctx->nb_active_threads;
58  unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
59  unsigned current_job = first_job;
60 
61  do {
62  ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
63  } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
64 
65  return current_job == nb_jobs + nb_active_threads - 1;
66 }
67 
68 static void *attribute_align_arg thread_worker(void *v)
69 {
70  WorkerContext *w = v;
71  AVSliceThread *ctx = w->ctx;
72 
73  pthread_mutex_lock(&w->mutex);
74  pthread_cond_signal(&w->cond);
75 
76  while (1) {
77  w->done = 1;
78  while (w->done)
79  pthread_cond_wait(&w->cond, &w->mutex);
80 
81  if (ctx->finished) {
82  pthread_mutex_unlock(&w->mutex);
83  return NULL;
84  }
85 
86  if (run_jobs(ctx)) {
87  pthread_mutex_lock(&ctx->done_mutex);
88  ctx->done = 1;
89  pthread_cond_signal(&ctx->done_cond);
90  pthread_mutex_unlock(&ctx->done_mutex);
91  }
92  }
93 }
94 
95 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
96  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
97  void (*main_func)(void *priv),
98  int nb_threads)
99 {
101  int nb_workers, i;
102 
103  av_assert0(nb_threads >= 0);
104  if (!nb_threads) {
105  int nb_cpus = av_cpu_count();
106  if (nb_cpus > 1)
107  nb_threads = nb_cpus + 1;
108  else
109  nb_threads = 1;
110  }
111 
112  nb_workers = nb_threads;
113  if (!main_func)
114  nb_workers--;
115 
116  *pctx = ctx = av_mallocz(sizeof(*ctx));
117  if (!ctx)
118  return AVERROR(ENOMEM);
119 
120  if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
121  av_freep(pctx);
122  return AVERROR(ENOMEM);
123  }
124 
125  ctx->priv = priv;
126  ctx->worker_func = worker_func;
127  ctx->main_func = main_func;
128  ctx->nb_threads = nb_threads;
129  ctx->nb_active_threads = 0;
130  ctx->nb_jobs = 0;
131  ctx->finished = 0;
132 
133  atomic_init(&ctx->first_job, 0);
134  atomic_init(&ctx->current_job, 0);
135  pthread_mutex_init(&ctx->done_mutex, NULL);
136  pthread_cond_init(&ctx->done_cond, NULL);
137  ctx->done = 0;
138 
139  for (i = 0; i < nb_workers; i++) {
140  WorkerContext *w = &ctx->workers[i];
141  int ret;
142  w->ctx = ctx;
143  pthread_mutex_init(&w->mutex, NULL);
144  pthread_cond_init(&w->cond, NULL);
145  pthread_mutex_lock(&w->mutex);
146  w->done = 0;
147 
148  if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
149  ctx->nb_threads = main_func ? i : i + 1;
150  pthread_mutex_unlock(&w->mutex);
151  pthread_cond_destroy(&w->cond);
152  pthread_mutex_destroy(&w->mutex);
154  return AVERROR(ret);
155  }
156 
157  while (!w->done)
158  pthread_cond_wait(&w->cond, &w->mutex);
159  pthread_mutex_unlock(&w->mutex);
160  }
161 
162  return nb_threads;
163 }
164 
165 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
166 {
167  int nb_workers, i, is_last = 0;
168 
169  av_assert0(nb_jobs > 0);
170  ctx->nb_jobs = nb_jobs;
171  ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
172  atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
173  atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
174  nb_workers = ctx->nb_active_threads;
175  if (!ctx->main_func || !execute_main)
176  nb_workers--;
177 
178  for (i = 0; i < nb_workers; i++) {
179  WorkerContext *w = &ctx->workers[i];
180  pthread_mutex_lock(&w->mutex);
181  w->done = 0;
182  pthread_cond_signal(&w->cond);
183  pthread_mutex_unlock(&w->mutex);
184  }
185 
186  if (ctx->main_func && execute_main)
187  ctx->main_func(ctx->priv);
188  else
189  is_last = run_jobs(ctx);
190 
191  if (!is_last) {
192  pthread_mutex_lock(&ctx->done_mutex);
193  while (!ctx->done)
194  pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
195  ctx->done = 0;
196  pthread_mutex_unlock(&ctx->done_mutex);
197  }
198 }
199 
201 {
203  int nb_workers, i;
204 
205  if (!pctx || !*pctx)
206  return;
207 
208  ctx = *pctx;
209  nb_workers = ctx->nb_threads;
210  if (!ctx->main_func)
211  nb_workers--;
212 
213  ctx->finished = 1;
214  for (i = 0; i < nb_workers; i++) {
215  WorkerContext *w = &ctx->workers[i];
216  pthread_mutex_lock(&w->mutex);
217  w->done = 0;
218  pthread_cond_signal(&w->cond);
219  pthread_mutex_unlock(&w->mutex);
220  }
221 
222  for (i = 0; i < nb_workers; i++) {
223  WorkerContext *w = &ctx->workers[i];
224  pthread_join(w->thread, NULL);
225  pthread_cond_destroy(&w->cond);
226  pthread_mutex_destroy(&w->mutex);
227  }
228 
229  pthread_cond_destroy(&ctx->done_cond);
230  pthread_mutex_destroy(&ctx->done_mutex);
231  av_freep(&ctx->workers);
232  av_freep(pctx);
233 }
234 
235 #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
236 
238  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
239  void (*main_func)(void *priv),
240  int nb_threads)
241 {
242  *pctx = NULL;
243  return AVERROR(ENOSYS);
244 }
245 
246 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
247 {
248  av_assert0(0);
249 }
250 
252 {
253  av_assert0(!pctx || !*pctx);
254 }
255 
256 #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS32THREADS */
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
pthread_join
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism toagain later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
avpriv_slicethread_execute
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
Execute slice threading.
Definition: slicethread.c:246
w
uint8_t w
Definition: llviddspenc.c:38
AVSliceThread
struct AVSliceThread AVSliceThread
Definition: slicethread.h:22
avpriv_slicethread_create
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void(*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void(*main_func)(void *priv), int nb_threads)
Create slice threading context.
Definition: slicethread.c:237
avassert.h
av_assert0
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
ctx
AVFormatContext * ctx
Definition: movenc.c:48
pthread_create
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
NULL
#define NULL
Definition: coverity.c:32
worker_func
static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
Definition: pthread_slice.c:65
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:68
av_cpu_count
int av_cpu_count(void)
Definition: cpu.c:191
cpu.h
atomic_fetch_add_explicit
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
pthread_t
Definition: os2threads.h:44
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
slicethread.h
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:271
atomic_store_explicit
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
FFMIN
#define FFMIN(a, b)
Definition: macros.h:49
main_func
int() main_func(AVCodecContext *c)
Definition: pthread_slice.c:41
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:263
pthread_cond_t
Definition: os2threads.h:58
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:271
ret
ret
Definition: filter_design.txt:187
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
atomic_uint
intptr_t atomic_uint
Definition: stdatomic.h:56
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
mem.h
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
avpriv_slicethread_free
void avpriv_slicethread_free(AVSliceThread **pctx)
Destroy slice threading context.
Definition: slicethread.c:251
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
atomic_init
#define atomic_init(obj, value)
Definition: stdatomic.h:33
cond
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28
mutex
static AVMutex mutex
Definition: log.c:44
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:64