FFmpeg
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "fifo.h"
22 #include "threadmessage.h"
23 #include "thread.h"
24 
/**
 * Bounded queue of fixed-size messages exchanged between threads.
 *
 * When thread support is compiled out the structure only carries a
 * placeholder member so that it is never empty.
 */
struct AVThreadMessageQueue {
#if HAVE_THREADS
    AVFifoBuffer *fifo;            /* byte FIFO storing the queued messages */
    pthread_mutex_t lock;          /* held around every FIFO / error-code access */
    pthread_cond_t cond_recv;      /* signalled when a message was written or err_recv changed */
    pthread_cond_t cond_send;      /* signalled when space was freed or err_send changed */
    int err_send;                  /* when non-zero, returned to senders */
    int err_recv;                  /* when non-zero, returned to receivers once no message is left */
    unsigned elsize;               /* size of one message in bytes */
    void (*free_func)(void *msg);  /* optional callback applied to messages discarded by a flush */
#else
    int dummy;
#endif
};
39 
41  unsigned nelem,
42  unsigned elsize)
43 {
44 #if HAVE_THREADS
46  int ret = 0;
47 
48  if (nelem > INT_MAX / elsize)
49  return AVERROR(EINVAL);
50  if (!(rmq = av_mallocz(sizeof(*rmq))))
51  return AVERROR(ENOMEM);
52  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
53  av_free(rmq);
54  return AVERROR(ret);
55  }
56  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
57  pthread_mutex_destroy(&rmq->lock);
58  av_free(rmq);
59  return AVERROR(ret);
60  }
61  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
62  pthread_cond_destroy(&rmq->cond_recv);
63  pthread_mutex_destroy(&rmq->lock);
64  av_free(rmq);
65  return AVERROR(ret);
66  }
67  if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
68  pthread_cond_destroy(&rmq->cond_send);
69  pthread_cond_destroy(&rmq->cond_recv);
70  pthread_mutex_destroy(&rmq->lock);
71  av_free(rmq);
72  return AVERROR(ENOMEM);
73  }
74  rmq->elsize = elsize;
75  *mq = rmq;
76  return 0;
77 #else
78  *mq = NULL;
79  return AVERROR(ENOSYS);
80 #endif /* HAVE_THREADS */
81 }
82 
84  void (*free_func)(void *msg))
85 {
86 #if HAVE_THREADS
87  mq->free_func = free_func;
88 #endif
89 }
90 
92 {
93 #if HAVE_THREADS
94  if (*mq) {
96  av_fifo_freep(&(*mq)->fifo);
97  pthread_cond_destroy(&(*mq)->cond_send);
98  pthread_cond_destroy(&(*mq)->cond_recv);
99  pthread_mutex_destroy(&(*mq)->lock);
100  av_freep(mq);
101  }
102 #endif
103 }
104 
106 {
107 #if HAVE_THREADS
108  int ret;
109  pthread_mutex_lock(&mq->lock);
110  ret = av_fifo_size(mq->fifo);
111  pthread_mutex_unlock(&mq->lock);
112  return ret / mq->elsize;
113 #else
114  return AVERROR(ENOSYS);
115 #endif
116 }
117 
118 #if HAVE_THREADS
119 
120 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
121  void *msg,
122  unsigned flags)
123 {
124  while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
126  return AVERROR(EAGAIN);
127  pthread_cond_wait(&mq->cond_send, &mq->lock);
128  }
129  if (mq->err_send)
130  return mq->err_send;
131  av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
132  /* one message is sent, signal one receiver */
133  pthread_cond_signal(&mq->cond_recv);
134  return 0;
135 }
136 
137 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
138  void *msg,
139  unsigned flags)
140 {
141  while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
143  return AVERROR(EAGAIN);
144  pthread_cond_wait(&mq->cond_recv, &mq->lock);
145  }
146  if (av_fifo_size(mq->fifo) < mq->elsize)
147  return mq->err_recv;
148  av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
149  /* one message space appeared, signal one sender */
150  pthread_cond_signal(&mq->cond_send);
151  return 0;
152 }
153 
154 #endif /* HAVE_THREADS */
155 
157  void *msg,
158  unsigned flags)
159 {
160 #if HAVE_THREADS
161  int ret;
162 
163  pthread_mutex_lock(&mq->lock);
164  ret = av_thread_message_queue_send_locked(mq, msg, flags);
165  pthread_mutex_unlock(&mq->lock);
166  return ret;
167 #else
168  return AVERROR(ENOSYS);
169 #endif /* HAVE_THREADS */
170 }
171 
173  void *msg,
174  unsigned flags)
175 {
176 #if HAVE_THREADS
177  int ret;
178 
179  pthread_mutex_lock(&mq->lock);
180  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
181  pthread_mutex_unlock(&mq->lock);
182  return ret;
183 #else
184  return AVERROR(ENOSYS);
185 #endif /* HAVE_THREADS */
186 }
187 
189  int err)
190 {
191 #if HAVE_THREADS
192  pthread_mutex_lock(&mq->lock);
193  mq->err_send = err;
194  pthread_cond_broadcast(&mq->cond_send);
195  pthread_mutex_unlock(&mq->lock);
196 #endif /* HAVE_THREADS */
197 }
198 
200  int err)
201 {
202 #if HAVE_THREADS
203  pthread_mutex_lock(&mq->lock);
204  mq->err_recv = err;
205  pthread_cond_broadcast(&mq->cond_recv);
206  pthread_mutex_unlock(&mq->lock);
207 #endif /* HAVE_THREADS */
208 }
209 
210 #if HAVE_THREADS
211 static void free_func_wrap(void *arg, void *msg, int size)
212 {
214  mq->free_func(msg);
215 }
216 #endif
217 
219 {
220 #if HAVE_THREADS
221  int used, off;
222  void *free_func = mq->free_func;
223 
224  pthread_mutex_lock(&mq->lock);
225  used = av_fifo_size(mq->fifo);
226  if (free_func)
227  for (off = 0; off < used; off += mq->elsize)
228  av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
229  av_fifo_drain(mq->fifo, used);
230  /* only the senders need to be notified since the queue is empty and there
231  * is nothing to read */
232  pthread_cond_broadcast(&mq->cond_send);
233  pthread_mutex_unlock(&mq->lock);
234 #endif /* HAVE_THREADS */
235 }
pthread_mutex_t
_fmutex pthread_mutex_t
Definition: os2threads.h:53
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
av_thread_message_queue_nb_elems
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
Definition: threadmessage.c:105
av_fifo_generic_write
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
Definition: fifo.c:122
thread.h
pthread_mutex_init
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
AV_THREAD_MESSAGE_NONBLOCK
@ AV_THREAD_MESSAGE_NONBLOCK
Perform non-blocking operation.
Definition: threadmessage.h:31
av_fifo_generic_read
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:213
av_fifo_drain
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
Definition: fifo.c:233
AVFifoBuffer
Definition: fifo.h:31
fifo.h
av_thread_message_queue_recv
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
Definition: threadmessage.c:172
av_fifo_space
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
Definition: fifo.c:82
av_thread_message_queue_send
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
Definition: threadmessage.c:156
av_thread_message_flush
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
Definition: threadmessage.c:218
AVThreadMessageQueue
Definition: threadmessage.c:25
pthread_cond_broadcast
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:162
arg
const char * arg
Definition: jacosubdec.c:67
NULL
#define NULL
Definition: coverity.c:32
pthread_mutex_unlock
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:68
threadmessage.h
size
int size
Definition: twinvq_data.h:10344
pthread_cond_destroy
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
AVThreadMessageQueue::dummy
int dummy
Definition: threadmessage.c:36
av_thread_message_queue_alloc
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
pthread_mutex_destroy
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
av_thread_message_queue_set_err_send
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
Definition: threadmessage.c:188
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:263
pthread_cond_t
Definition: os2threads.h:58
ret
ret
Definition: filter_design.txt:187
pthread_cond_signal
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
lock
static pthread_mutex_t lock
Definition: ffjni.c:37
pthread_cond_wait
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
av_fifo_size
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
Definition: fifo.c:77
av_fifo_generic_peek_at
int av_fifo_generic_peek_at(AVFifoBuffer *f, void *dest, int offset, int buf_size, void(*func)(void *, void *, int))
Feed data at specific position from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:151
av_fifo_freep
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
Definition: fifo.c:63
av_free
#define av_free(p)
Definition: tableprint_vlc.h:34
av_thread_message_queue_free
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
av_freep
#define av_freep(p)
Definition: tableprint_vlc.h:35
av_fifo_alloc
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
Definition: fifo.c:43
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:561
av_thread_message_queue_set_err_recv
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
Definition: threadmessage.c:199
av_thread_message_queue_set_free_func
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
pthread_cond_init
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
pthread_mutex_lock
#define pthread_mutex_lock(a)
Definition: ffprobe.c:64