FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "fifo.h"
22 #include "threadmessage.h"
23 #include "thread.h"
24 
26 #if HAVE_THREADS
27  AVFifoBuffer *fifo;
29  pthread_cond_t cond_recv;
30  pthread_cond_t cond_send;
31  int err_send;
32  int err_recv;
33  unsigned elsize;
34  void (*free_func)(void *msg);
35 #else
36  int dummy;
37 #endif
38 };
39 
41  unsigned nelem,
42  unsigned elsize)
43 {
44 #if HAVE_THREADS
46  int ret = 0;
47 
48  if (nelem > INT_MAX / elsize)
49  return AVERROR(EINVAL);
50  if (!(rmq = av_mallocz(sizeof(*rmq))))
51  return AVERROR(ENOMEM);
52  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
53  av_free(rmq);
54  return AVERROR(ret);
55  }
56  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
57  pthread_mutex_destroy(&rmq->lock);
58  av_free(rmq);
59  return AVERROR(ret);
60  }
61  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
62  pthread_cond_destroy(&rmq->cond_recv);
63  pthread_mutex_destroy(&rmq->lock);
64  av_free(rmq);
65  return AVERROR(ret);
66  }
67  if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
68  pthread_cond_destroy(&rmq->cond_send);
69  pthread_cond_destroy(&rmq->cond_recv);
70  pthread_mutex_destroy(&rmq->lock);
71  av_free(rmq);
72  return AVERROR(ENOMEM);
73  }
74  rmq->elsize = elsize;
75  *mq = rmq;
76  return 0;
77 #else
78  *mq = NULL;
79  return AVERROR(ENOSYS);
80 #endif /* HAVE_THREADS */
81 }
82 
84  void (*free_func)(void *msg))
85 {
86 #if HAVE_THREADS
87  mq->free_func = free_func;
88 #endif
89 }
90 
92 {
93 #if HAVE_THREADS
94  if (*mq) {
96  av_fifo_freep(&(*mq)->fifo);
97  pthread_cond_destroy(&(*mq)->cond_send);
98  pthread_cond_destroy(&(*mq)->cond_recv);
99  pthread_mutex_destroy(&(*mq)->lock);
100  av_freep(mq);
101  }
102 #endif
103 }
104 
106 {
107 #if HAVE_THREADS
108  int ret;
109  pthread_mutex_lock(&mq->lock);
110  ret = av_fifo_size(mq->fifo);
111  pthread_mutex_unlock(&mq->lock);
112  return ret / mq->elsize;
113 #else
114  return AVERROR(ENOSYS);
115 #endif
116 }
117 
118 #if HAVE_THREADS
119 
120 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
121  void *msg,
122  unsigned flags)
123 {
124  while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
125  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
126  return AVERROR(EAGAIN);
127  pthread_cond_wait(&mq->cond_send, &mq->lock);
128  }
129  if (mq->err_send)
130  return mq->err_send;
131  av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
132  /* one message is sent, signal one receiver */
133  pthread_cond_signal(&mq->cond_recv);
134  return 0;
135 }
136 
137 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
138  void *msg,
139  unsigned flags)
140 {
141  while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
142  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
143  return AVERROR(EAGAIN);
144  pthread_cond_wait(&mq->cond_recv, &mq->lock);
145  }
146  if (av_fifo_size(mq->fifo) < mq->elsize)
147  return mq->err_recv;
148  av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
149  /* one message space appeared, signal one sender */
150  pthread_cond_signal(&mq->cond_send);
151  return 0;
152 }
153 
154 #endif /* HAVE_THREADS */
155 
157  void *msg,
158  unsigned flags)
159 {
160 #if HAVE_THREADS
161  int ret;
162 
163  pthread_mutex_lock(&mq->lock);
164  ret = av_thread_message_queue_send_locked(mq, msg, flags);
165  pthread_mutex_unlock(&mq->lock);
166  return ret;
167 #else
168  return AVERROR(ENOSYS);
169 #endif /* HAVE_THREADS */
170 }
171 
173  void *msg,
174  unsigned flags)
175 {
176 #if HAVE_THREADS
177  int ret;
178 
179  pthread_mutex_lock(&mq->lock);
180  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
181  pthread_mutex_unlock(&mq->lock);
182  return ret;
183 #else
184  return AVERROR(ENOSYS);
185 #endif /* HAVE_THREADS */
186 }
187 
189  int err)
190 {
191 #if HAVE_THREADS
192  pthread_mutex_lock(&mq->lock);
193  mq->err_send = err;
194  pthread_cond_broadcast(&mq->cond_send);
195  pthread_mutex_unlock(&mq->lock);
196 #endif /* HAVE_THREADS */
197 }
198 
200  int err)
201 {
202 #if HAVE_THREADS
203  pthread_mutex_lock(&mq->lock);
204  mq->err_recv = err;
205  pthread_cond_broadcast(&mq->cond_recv);
206  pthread_mutex_unlock(&mq->lock);
207 #endif /* HAVE_THREADS */
208 }
209 
210 #if HAVE_THREADS
211 static void free_func_wrap(void *arg, void *msg, int size)
212 {
214  mq->free_func(msg);
215 }
216 #endif
217 
219 {
220 #if HAVE_THREADS
221  int used, off;
222  void *free_func = mq->free_func;
223 
224  pthread_mutex_lock(&mq->lock);
225  used = av_fifo_size(mq->fifo);
226  if (free_func)
227  for (off = 0; off < used; off += mq->elsize)
228  av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
229  av_fifo_drain(mq->fifo, used);
230  /* only the senders need to be notified since the queue is empty and there
231  * is nothing to read */
232  pthread_cond_broadcast(&mq->cond_send);
233  pthread_mutex_unlock(&mq->lock);
234 #endif /* HAVE_THREADS */
235 }
#define NULL
Definition: coverity.c:32
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:108
#define pthread_mutex_lock(a)
Definition: ffprobe.c:61
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:166
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:236
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:140
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
Definition: fifo.c:122
int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
Return the current number of messages in the queue.
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
Definition: fifo.c:82
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:148
ptrdiff_t size
Definition: opengl_enc.c:101
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
#define AVERROR(e)
Definition: error.h:43
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:213
const char * arg
Definition: jacosubdec.c:66
typedef void(APIENTRY *FF_PFNGLACTIVETEXTUREPROC)(GLenum texture)
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:100
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:65
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
Definition: fifo.c:77
int av_fifo_generic_peek_at(AVFifoBuffer *f, void *dest, int offset, int buf_size, void(*func)(void *, void *, int))
Feed data at specific position from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:151
a very simple circular buffer FIFO implementation
Perform non-blocking operation.
Definition: threadmessage.h:31
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
#define flags(name, subs,...)
Definition: cbs_av1.c:596
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
static pthread_mutex_t lock
Definition: ffjni.c:37
_fmutex pthread_mutex_t
Definition: os2threads.h:49
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:129
#define av_free(p)
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
Definition: fifo.c:43
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:158
#define av_freep(p)
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
Definition: fifo.c:63
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
Definition: fifo.c:233