FFmpeg
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
udp.c
Go to the documentation of this file.
1 /*
2  * UDP prototype streaming system
3  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 /**
23  * @file
24  * UDP protocol
25  */
26 
27 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
28 
29 #include "avformat.h"
30 #include "avio_internal.h"
31 #include "libavutil/parseutils.h"
32 #include "libavutil/fifo.h"
33 #include "libavutil/intreadwrite.h"
34 #include "libavutil/avstring.h"
35 #include "libavutil/opt.h"
36 #include "libavutil/log.h"
37 #include "internal.h"
38 #include "network.h"
39 #include "os_support.h"
40 #include "url.h"
41 
42 #if HAVE_PTHREAD_CANCEL
43 #include <pthread.h>
44 #endif
45 
46 #ifndef HAVE_PTHREAD_CANCEL
47 #define HAVE_PTHREAD_CANCEL 0
48 #endif
49 
50 #ifndef IPV6_ADD_MEMBERSHIP
51 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
52 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
53 #endif
54 
55 #define UDP_TX_BUF_SIZE 32768
56 #define UDP_MAX_PKT_SIZE 65536
57 
/*
 * Private state of one UDP URLContext (stored in URLContext.priv_data and
 * described by udp_context_class / the options[] table).
 *
 * NOTE(review): this listing's own numbering jumps (61->67, 67->70, 71->75,
 * 76->78, 80->83, 83->85), so several member declarations are missing from
 * this extract. Members referenced elsewhere in the file but not visible
 * here include buffer_size, is_multicast, local_port, reuse_socket,
 * overrun_nonfatal, dest_addr_len, is_connected, circular_buffer_size,
 * fifo, circular_buffer_error, mutex, tmp and packet_size -- restore them
 * from the real source before compiling.
 */
58 typedef struct {
/* AVClass pointer, kept as the first member */
59  const AVClass *class;
/* socket descriptor; assigned at the end of a successful udp_open() */
60  int udp_fd;
/* value passed to udp_set_multicast_ttl() for multicast output */
61  int ttl;
/* destination address, filled in by ff_udp_set_remote_url() */
67  struct sockaddr_storage dest_addr;
70 
71  /* Circular Buffer variables for use in UDP receive code */
75 #if HAVE_PTHREAD_CANCEL
/* receiver thread running circular_buffer_task() */
76  pthread_t circular_buffer_thread;
/* signalled by the receiver thread, waited on in udp_read() */
78  pthread_cond_t cond;
/* set to 1 once pthread_create() succeeded; checked by udp_close() */
79  int thread_started;
80 #endif
/* "localaddr" option; used by udp_open() when the URL gives no localaddr */
83  char *local_addr;
/* "timeout" option; copied to URLContext.rw_timeout by udp_open() */
85  int timeout;
86 } UDPContext;
87 
88 #define OFFSET(x) offsetof(UDPContext, x)
89 #define D AV_OPT_FLAG_DECODING_PARAM
90 #define E AV_OPT_FLAG_ENCODING_PARAM
91 static const AVOption options[] = {
92 {"buffer_size", "Socket buffer size in bytes", OFFSET(buffer_size), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
93 {"localport", "Set local port to bind to", OFFSET(local_port), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
94 {"localaddr", "Choose local IP address", OFFSET(local_addr), AV_OPT_TYPE_STRING, {.str = ""}, 0, 0, D|E },
95 {"pkt_size", "Set size of UDP packets", OFFSET(packet_size), AV_OPT_TYPE_INT, {.i64 = 1472}, 0, INT_MAX, D|E },
96 {"reuse", "Explicitly allow or disallow reusing UDP sockets", OFFSET(reuse_socket), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
97 {"ttl", "Set the time to live value (for multicast only)", OFFSET(ttl), AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX, E },
98 {"connect", "Should connect() be called on socket", OFFSET(is_connected), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D|E },
99 /* TODO 'sources', 'block' option */
100 {"fifo_size", "Set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
101 {"overrun_nonfatal", "Survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1, D },
102 {"timeout", "In read mode: if no data arrived in more than this time interval, raise error", OFFSET(timeout), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D },
103 {NULL}
104 };
105 
106 static const AVClass udp_context_class = {
107  .class_name = "udp",
108  .item_name = av_default_item_name,
109  .option = options,
110  .version = LIBAVUTIL_VERSION_INT,
111 };
112 
/**
 * Log the current network error as "<prefix>: <error text>".
 *
 * @param ctx    logging context handed to av_log() (may be NULL)
 * @param level  av_log() level
 * @param prefix text placed before the error description
 */
static void log_net_error(void *ctx, int level, const char* prefix)
{
    char description[100];
    const int net_err = ff_neterrno();

    av_strerror(net_err, description, sizeof(description));
    av_log(ctx, level, "%s: %s\n", prefix, description);
}
119 
120 static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
121  struct sockaddr *addr)
122 {
123 #ifdef IP_MULTICAST_TTL
124  if (addr->sa_family == AF_INET) {
125  if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
126  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
127  return -1;
128  }
129  }
130 #endif
131 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
132  if (addr->sa_family == AF_INET6) {
133  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
134  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
135  return -1;
136  }
137  }
138 #endif
139  return 0;
140 }
141 
142 static int udp_join_multicast_group(int sockfd, struct sockaddr *addr)
143 {
144 #ifdef IP_ADD_MEMBERSHIP
145  if (addr->sa_family == AF_INET) {
146  struct ip_mreq mreq;
147 
148  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
149  mreq.imr_interface.s_addr= INADDR_ANY;
150  if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
151  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
152  return -1;
153  }
154  }
155 #endif
156 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
157  if (addr->sa_family == AF_INET6) {
158  struct ipv6_mreq mreq6;
159 
160  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
161  mreq6.ipv6mr_interface= 0;
162  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
163  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
164  return -1;
165  }
166  }
167 #endif
168  return 0;
169 }
170 
171 static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
172 {
173 #ifdef IP_DROP_MEMBERSHIP
174  if (addr->sa_family == AF_INET) {
175  struct ip_mreq mreq;
176 
177  mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
178  mreq.imr_interface.s_addr= INADDR_ANY;
179  if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
180  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
181  return -1;
182  }
183  }
184 #endif
185 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
186  if (addr->sa_family == AF_INET6) {
187  struct ipv6_mreq mreq6;
188 
189  memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
190  mreq6.ipv6mr_interface= 0;
191  if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
192  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
193  return -1;
194  }
195  }
196 #endif
197  return 0;
198 }
199 
/**
 * Resolve a host/port pair with getaddrinfo().
 *
 * @param hostname node name; NULL, "" or a string starting with '?' means
 *                 "no node" (NULL is passed to getaddrinfo())
 * @param port     port number; values <= 0 resolve service "0"
 * @param type     socket type for the hints (ai_socktype)
 * @param family   address family for the hints (ai_family)
 * @param flags    ai_flags for the hints
 * @return list to be released with freeaddrinfo(), or NULL on failure
 *         (the failure is logged)
 */
static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                         int type, int family, int flags)
{
    struct addrinfo hints = { 0 };
    struct addrinfo *result = 0;
    const char *service = "0";
    const char *node = 0;
    char portbuf[16];
    int gai_ret;

    if (port > 0) {
        snprintf(portbuf, sizeof(portbuf), "%d", port);
        service = portbuf;
    }
    if (hostname && hostname[0] != '\0' && hostname[0] != '?')
        node = hostname;

    hints.ai_flags    = flags;
    hints.ai_family   = family;
    hints.ai_socktype = type;

    gai_ret = getaddrinfo(node, service, &hints, &result);
    if (gai_ret) {
        result = NULL;
        av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(gai_ret));
    }

    return result;
}
225 
/*
 * Apply a list of multicast source filters to a socket.
 *
 * For each of the nb_sources strings in sources, the source is resolved
 * with udp_resolve_host() and either added as an allowed source
 * (include != 0: MCAST_JOIN_SOURCE_GROUP / IP_ADD_SOURCE_MEMBERSHIP) or
 * blocked (include == 0: MCAST_BLOCK_SOURCE / IP_BLOCK_SOURCE) for the
 * group address addr.
 *
 * Returns 0 on success, AVERROR(ENOENT) if a source cannot be resolved,
 * AVERROR(EINVAL) for an unsupported address family in the IPv4-only
 * variant, ff_neterrno() if setsockopt() fails, and AVERROR(ENOSYS) when
 * neither API is available at build time.
 *
 * NOTE(review): listing lines 239, 261 and 269 are missing from this
 * extract (see the gaps marked below), so two udp_resolve_host() calls and
 * one logging call are incomplete as shown.
 */
226 static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
227  int addr_len, char **sources,
228  int nb_sources, int include)
229 {
230 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
231  /* These ones are available in the microsoft SDK, but don't seem to work
232  * as on linux, so just prefer the v4-only approach there for now. */
233  int i;
234  for (i = 0; i < nb_sources; i++) {
235  struct group_source_req mreqs;
/* option level follows the group's address family */
236  int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
237  struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
238  SOCK_DGRAM, AF_UNSPEC,
/* NOTE(review): listing line 239 (last argument and end of the call) is missing here */
240  if (!sourceaddr)
241  return AVERROR(ENOENT);
242 
243  mreqs.gsr_interface = 0;
244  memcpy(&mreqs.gsr_group, addr, addr_len);
245  memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
/* the resolved address has been copied; the list is no longer needed */
246  freeaddrinfo(sourceaddr);
247 
248  if (setsockopt(sockfd, level,
249  include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
250  (const void *)&mreqs, sizeof(mreqs)) < 0) {
251  if (include)
252  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
253  else
254  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
255  return ff_neterrno();
256  }
257  }
258 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
259  int i;
/* this variant only handles IPv4 groups */
260  if (addr->sa_family != AF_INET) {
/* NOTE(review): listing line 261 (start of the logging call) is missing here */
262  "Setting multicast sources only supported for IPv4\n");
263  return AVERROR(EINVAL);
264  }
265  for (i = 0; i < nb_sources; i++) {
266  struct ip_mreq_source mreqs;
267  struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
268  SOCK_DGRAM, AF_UNSPEC,
/* NOTE(review): listing line 269 (last argument and end of the call) is missing here */
270  if (!sourceaddr)
271  return AVERROR(ENOENT);
272  if (sourceaddr->ai_addr->sa_family != AF_INET) {
273  freeaddrinfo(sourceaddr);
274  av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
275  sources[i]);
276  return AVERROR(EINVAL);
277  }
278 
279  mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
280  mreqs.imr_interface.s_addr = INADDR_ANY;
281  mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
282  freeaddrinfo(sourceaddr);
283 
284  if (setsockopt(sockfd, IPPROTO_IP,
285  include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
286  (const void *)&mreqs, sizeof(mreqs)) < 0) {
287  if (include)
288  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
289  else
290  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
291  return ff_neterrno();
292  }
293  }
294 #else
/* no source-filter API available in this build */
295  return AVERROR(ENOSYS);
296 #endif
297  return 0;
298 }
299 static int udp_set_url(struct sockaddr_storage *addr,
300  const char *hostname, int port)
301 {
302  struct addrinfo *res0;
303  int addr_len;
304 
305  res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
306  if (res0 == 0) return AVERROR(EIO);
307  memcpy(addr, res0->ai_addr, res0->ai_addrlen);
308  addr_len = res0->ai_addrlen;
309  freeaddrinfo(res0);
310 
311  return addr_len;
312 }
313 
314 static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
315  socklen_t *addr_len, const char *localaddr)
316 {
317  int udp_fd = -1;
318  struct addrinfo *res0 = NULL, *res = NULL;
319  int family = AF_UNSPEC;
320 
321  if (((struct sockaddr *) &s->dest_addr)->sa_family)
322  family = ((struct sockaddr *) &s->dest_addr)->sa_family;
323  res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
324  SOCK_DGRAM, family, AI_PASSIVE);
325  if (res0 == 0)
326  goto fail;
327  for (res = res0; res; res=res->ai_next) {
328  udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
329  if (udp_fd != -1) break;
330  log_net_error(NULL, AV_LOG_ERROR, "socket");
331  }
332 
333  if (udp_fd < 0)
334  goto fail;
335 
336  memcpy(addr, res->ai_addr, res->ai_addrlen);
337  *addr_len = res->ai_addrlen;
338 
339  freeaddrinfo(res0);
340 
341  return udp_fd;
342 
343  fail:
344  if (udp_fd >= 0)
345  closesocket(udp_fd);
346  if(res0)
347  freeaddrinfo(res0);
348  return -1;
349 }
350 
351 static int udp_port(struct sockaddr_storage *addr, int addr_len)
352 {
353  char sbuf[sizeof(int)*3+1];
354  int error;
355 
356  if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
357  av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
358  return -1;
359  }
360 
361  return strtol(sbuf, NULL, 10);
362 }
363 
364 
365 /**
366  * If no filename is given to av_open_input_file because you want to
367  * get the local port first, then you must call this function to set
368  * the remote server address.
369  *
370  * url syntax: udp://host:port[?option=val...]
371  * option: 'ttl=n' : set the ttl value (for multicast only)
372  * 'localport=n' : set the local port
373  * 'pkt_size=n' : set max packet size
374  * 'reuse=1' : enable reusing the socket
375  * 'overrun_nonfatal=1': survive in case of circular buffer overrun
376  *
377  * @param h media file context
378  * @param uri of the remote server
379  * @return zero if no error.
380  */
381 int ff_udp_set_remote_url(URLContext *h, const char *uri)
382 {
383  UDPContext *s = h->priv_data;
384  char hostname[256], buf[10];
385  int port;
386  const char *p;
387 
388  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
389 
390  /* set the destination address */
391  s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
392  if (s->dest_addr_len < 0) {
393  return AVERROR(EIO);
394  }
395  s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
396  p = strchr(uri, '?');
397  if (p) {
398  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
399  int was_connected = s->is_connected;
400  s->is_connected = strtol(buf, NULL, 10);
401  if (s->is_connected && !was_connected) {
402  if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
403  s->dest_addr_len)) {
404  s->is_connected = 0;
405  log_net_error(h, AV_LOG_ERROR, "connect");
406  return AVERROR(EIO);
407  }
408  }
409  }
410  }
411 
412  return 0;
413 }
414 
415 /**
416  * Return the local port used by the UDP connection
417  * @param h media file context
418  * @return the local port number
419  */
/* NOTE(review): the function's declaration line (listing line 420) is
 * missing from this extract; only the body is visible. The body returns
 * the local_port member of the UDPContext stored in h->priv_data. */
421 {
422  UDPContext *s = h->priv_data;
423  return s->local_port;
424 }
425 
426 /**
427  * Return the udp file handle for select() usage to wait for several RTP
428  * streams at the same time.
429  * @param h media file context
430  */
/* NOTE(review): the function's declaration line (listing line 431) is
 * missing from this extract; only the body is visible. The body returns
 * the udp_fd member of the UDPContext stored in h->priv_data. */
432 {
433  UDPContext *s = h->priv_data;
434  return s->udp_fd;
435 }
436 
#if HAVE_PTHREAD_CANCEL
/**
 * Receiver thread: blocks in recv() on the UDP socket and appends every
 * datagram to s->fifo as a 4-byte little-endian length followed by the
 * payload, signalling s->cond after each write.
 *
 * The mutex is held except while blocked in recv(). Cancellation is only
 * enabled around recv(), so the thread can never be cancelled while it
 * owns the mutex.
 *
 * On a fatal condition the error is stored in s->circular_buffer_error,
 * the condition is signalled so a waiting reader wakes up, and the thread
 * exits.
 *
 * @param _URLContext the URLContext owning the UDPContext
 * @return always NULL
 */
static void *circular_buffer_task( void *_URLContext)
{
    URLContext *h = _URLContext;
    UDPContext *s = h->priv_data;
    int old_cancelstate;

    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
    pthread_mutex_lock(&s->mutex);
    if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
        av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
        s->circular_buffer_error = AVERROR(EIO);
        goto end;
    }
    while(1) {
        int len;
        int recv_err;

        pthread_mutex_unlock(&s->mutex);
        /* Blocking operations are always cancellation points;
           see "General Information" / "Thread Cancelation Overview"
           in Single Unix. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
        len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
        /* capture the error code before any other library call can
         * overwrite it */
        recv_err = len < 0 ? ff_neterrno() : 0;
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
        pthread_mutex_lock(&s->mutex);
        if (len < 0) {
            if (recv_err != AVERROR(EAGAIN) && recv_err != AVERROR(EINTR)) {
                /* record the failure so the reader can report it instead
                 * of waiting for data that will never arrive */
                s->circular_buffer_error = recv_err;
                goto end;
            }
            continue;
        }
        AV_WL32(s->tmp, len);

        if(av_fifo_space(s->fifo) < len + 4) {
            /* No Space left */
            if (s->overrun_nonfatal) {
                av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
                        "Surviving due to overrun_nonfatal option\n");
                continue;
            } else {
                av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
                        "To avoid, increase fifo_size URL option. "
                        "To survive in such case, use overrun_nonfatal option\n");
                s->circular_buffer_error = AVERROR(EIO);
                goto end;
            }
        }
        av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
        pthread_cond_signal(&s->cond);
    }

end:
    /* wake a reader that may be waiting, then release the mutex */
    pthread_cond_signal(&s->cond);
    pthread_mutex_unlock(&s->mutex);
    return NULL;
}
#endif
495 
/*
 * Open a UDP URL.
 *
 * Steps, in order:
 *   1. parse the "?key=value" part of uri into the UDPContext (these
 *      values override what was set through AVOptions);
 *   2. resolve the destination with ff_udp_set_remote_url() (an empty
 *      host is only accepted when reading);
 *   3. create the socket, optionally set SO_REUSEADDR, and bind it;
 *   4. apply multicast TTL / group membership / source filters;
 *   5. set the send or receive buffer size, optionally connect();
 *   6. with pthread support, for input with a nonzero circular buffer
 *      size, start the receiver thread.
 *
 * Returns 0 on success and AVERROR(EIO) on any failure.
 *
 * NOTE(review): listing lines 535, 557, 588 and 698 are missing from this
 * extract (gaps are marked below), so the function is incomplete as shown.
 */
496 /* put it in UDP context */
497 /* return non zero if error */
498 static int udp_open(URLContext *h, const char *uri, int flags)
499 {
500  char hostname[1024], localaddr[1024] = "";
501  int port, udp_fd = -1, tmp, bind_ret = -1;
502  UDPContext *s = h->priv_data;
503  int is_output;
504  const char *p;
505  char buf[256];
506  struct sockaddr_storage my_addr;
507  socklen_t len;
508  int reuse_specified = 0;
509  int i, include = 0, num_sources = 0;
510  char *sources[32];
511 
512  h->is_streamed = 1;
513 
/* anything not opened for reading is treated as output */
514  is_output = !(flags & AVIO_FLAG_READ);
515  if (!s->buffer_size) /* if not set explicitly */
516  s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
517 
/* URL query options */
518  p = strchr(uri, '?');
519  if (p) {
520  if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
521  char *endptr = NULL;
522  s->reuse_socket = strtol(buf, &endptr, 10);
523  /* assume if no digits were found it is a request to enable it */
524  if (buf == endptr)
525  s->reuse_socket = 1;
526  reuse_specified = 1;
527  }
528  if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
529  char *endptr = NULL;
530  s->overrun_nonfatal = strtol(buf, &endptr, 10);
531  /* assume if no digits were found it is a request to enable it */
532  if (buf == endptr)
533  s->overrun_nonfatal = 1;
534  if (!HAVE_PTHREAD_CANCEL)
/* NOTE(review): listing line 535 (start of the logging call) is missing here */
536  "'overrun_nonfatal' option was set but it is not supported "
537  "on this build (pthread support is required)\n");
538  }
539  if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
540  s->ttl = strtol(buf, NULL, 10);
541  }
542  if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
543  s->local_port = strtol(buf, NULL, 10);
544  }
545  if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
546  s->packet_size = strtol(buf, NULL, 10);
547  }
548  if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
549  s->buffer_size = strtol(buf, NULL, 10);
550  }
551  if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
552  s->is_connected = strtol(buf, NULL, 10);
553  }
554  if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
555  s->circular_buffer_size = strtol(buf, NULL, 10);
556  if (!HAVE_PTHREAD_CANCEL)
/* NOTE(review): listing line 557 (start of the logging call) is missing here */
558  "'circular_buffer_size' option was set but it is not supported "
559  "on this build (pthread support is required)\n");
560  }
561  if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
562  av_strlcpy(localaddr, buf, sizeof(localaddr));
563  }
/* "sources" selects include mode; otherwise "block" lists sources to exclude.
 * Either way buf holds a comma-separated list that is split below. */
564  if (av_find_info_tag(buf, sizeof(buf), "sources", p))
565  include = 1;
566  if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) {
567  char *source_start;
568 
569  source_start = buf;
570  while (1) {
571  char *next = strchr(source_start, ',');
572  if (next)
573  *next = '\0';
574  sources[num_sources] = av_strdup(source_start);
575  if (!sources[num_sources])
576  goto fail;
/* when next is NULL this value is never used: the loop breaks just below */
577  source_start = next + 1;
578  num_sources++;
/* stop at the end of the list or when the sources[] array is full */
579  if (num_sources >= FF_ARRAY_ELEMS(sources) || !next)
580  break;
581  }
582  }
583  if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
584  s->timeout = strtol(buf, NULL, 10);
585  }
586  /* handling needed to support options picking from both AVOption and URL */
587  s->circular_buffer_size *= 188;
/* NOTE(review): listing line 588 is missing here */
589  h->rw_timeout = s->timeout;
590 
591  /* fill the dest addr */
592  av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
593 
594  /* XXX: fix av_url_split */
595  if (hostname[0] == '\0' || hostname[0] == '?') {
596  /* only accepts null hostname if input */
597  if (!(flags & AVIO_FLAG_READ))
598  goto fail;
599  } else {
600  if (ff_udp_set_remote_url(h, uri) < 0)
601  goto fail;
602  }
603 
/* when reading, the URL port becomes the local port for multicast or
 * when no local port was given */
604  if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
605  s->local_port = port;
/* a localaddr from the URL takes precedence over the option value */
606  udp_fd = udp_socket_create(s, &my_addr, &len, localaddr[0] ? localaddr : s->local_addr);
607  if (udp_fd < 0)
608  goto fail;
609 
610  /* Follow the requested reuse option, unless it's multicast in which
611  * case enable reuse unless explicitly disabled.
612  */
613  if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
614  s->reuse_socket = 1;
615  if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
616  goto fail;
617  }
618 
619  /* If multicast, try binding the multicast address first, to avoid
620  * receiving UDP packets from other sources aimed at the same UDP
621  * port. This fails on windows. This makes sending to the same address
622  * using sendto() fail, so only do it if we're opened in read-only mode. */
623  if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
624  bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
625  }
626  /* bind to the local address if not multicast or if the multicast
627  * bind failed */
628  /* the bind is needed to give a port to the socket now */
629  if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
630  log_net_error(h, AV_LOG_ERROR, "bind failed");
631  goto fail;
632  }
633 
/* read back the port actually bound; the getsockname() result is not checked */
634  len = sizeof(my_addr);
635  getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
636  s->local_port = udp_port(&my_addr, len);
637 
638  if (s->is_multicast) {
639  if (h->flags & AVIO_FLAG_WRITE) {
640  /* output */
641  if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
642  goto fail;
643  }
644  if (h->flags & AVIO_FLAG_READ) {
645  /* input */
/* no sources, or a block list: join the group, then block the listed sources */
646  if (num_sources == 0 || !include) {
647  if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
648  goto fail;
649 
650  if (num_sources) {
651  if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0)
652  goto fail;
653  }
/* include list: add each listed source instead of joining the group first */
654  } else if (include && num_sources) {
655  if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0)
656  goto fail;
657  } else {
658  av_log(NULL, AV_LOG_ERROR, "invalid udp settings: inclusive multicast but no sources given\n");
659  goto fail;
660  }
661  }
662  }
663 
664  if (is_output) {
665  /* limit the tx buf size to limit latency */
666  tmp = s->buffer_size;
667  if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
668  log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
669  goto fail;
670  }
671  } else {
672  /* set udp recv buffer size to the largest possible udp packet size to
673  * avoid losing data on OSes that set this too low by default. */
674  tmp = s->buffer_size;
/* a failure here is only logged as a warning, not treated as fatal */
675  if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
676  log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
677  }
678  /* make the socket non-blocking */
679  ff_socket_nonblock(udp_fd, 1);
680  }
681  if (s->is_connected) {
682  if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
683  log_net_error(h, AV_LOG_ERROR, "connect");
684  goto fail;
685  }
686  }
687 
/* the source strings are no longer needed once the filters are applied */
688  for (i = 0; i < num_sources; i++)
689  av_freep(&sources[i]);
690 
691  s->udp_fd = udp_fd;
692 
693 #if HAVE_PTHREAD_CANCEL
694  if (!is_output && s->circular_buffer_size) {
695  int ret;
696 
697  /* start the task going */
/* NOTE(review): listing line 698 is missing here */
699  ret = pthread_mutex_init(&s->mutex, NULL);
700  if (ret != 0) {
701  av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
702  goto fail;
703  }
704  ret = pthread_cond_init(&s->cond, NULL);
705  if (ret != 0) {
706  av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
707  goto cond_fail;
708  }
709  ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
710  if (ret != 0) {
711  av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
712  goto thread_fail;
713  }
714  s->thread_started = 1;
715  }
716 #endif
717 
718  return 0;
/* error unwinding: each label releases what was set up before the
 * corresponding failure, then falls through to the common cleanup */
719 #if HAVE_PTHREAD_CANCEL
720  thread_fail:
721  pthread_cond_destroy(&s->cond);
722  cond_fail:
723  pthread_mutex_destroy(&s->mutex);
724 #endif
725  fail:
726  if (udp_fd >= 0)
727  closesocket(udp_fd);
728  av_fifo_free(s->fifo);
729  for (i = 0; i < num_sources; i++)
730  av_freep(&sources[i]);
731  return AVERROR(EIO);
732 }
733 
734 static int udp_read(URLContext *h, uint8_t *buf, int size)
735 {
736  UDPContext *s = h->priv_data;
737  int ret;
738  int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
739 
740 #if HAVE_PTHREAD_CANCEL
741  if (s->fifo) {
742  pthread_mutex_lock(&s->mutex);
743  do {
744  avail = av_fifo_size(s->fifo);
745  if (avail) { // >=size) {
746  uint8_t tmp[4];
747 
748  av_fifo_generic_read(s->fifo, tmp, 4, NULL);
749  avail= AV_RL32(tmp);
750  if(avail > size){
751  av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
752  avail= size;
753  }
754 
755  av_fifo_generic_read(s->fifo, buf, avail, NULL);
756  av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
757  pthread_mutex_unlock(&s->mutex);
758  return avail;
759  } else if(s->circular_buffer_error){
760  int err = s->circular_buffer_error;
761  pthread_mutex_unlock(&s->mutex);
762  return err;
763  } else if(nonblock) {
764  pthread_mutex_unlock(&s->mutex);
765  return AVERROR(EAGAIN);
766  }
767  else {
768  /* FIXME: using the monotonic clock would be better,
769  but it does not exist on all supported platforms. */
770  int64_t t = av_gettime() + 100000;
771  struct timespec tv = { .tv_sec = t / 1000000,
772  .tv_nsec = (t % 1000000) * 1000 };
773  if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
774  pthread_mutex_unlock(&s->mutex);
775  return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
776  }
777  nonblock = 1;
778  }
779  } while( 1);
780  }
781 #endif
782 
783  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
784  ret = ff_network_wait_fd(s->udp_fd, 0);
785  if (ret < 0)
786  return ret;
787  }
788  ret = recv(s->udp_fd, buf, size, 0);
789 
790  return ret < 0 ? ff_neterrno() : ret;
791 }
792 
793 static int udp_write(URLContext *h, const uint8_t *buf, int size)
794 {
795  UDPContext *s = h->priv_data;
796  int ret;
797 
798  if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
799  ret = ff_network_wait_fd(s->udp_fd, 1);
800  if (ret < 0)
801  return ret;
802  }
803 
804  if (!s->is_connected) {
805  ret = sendto (s->udp_fd, buf, size, 0,
806  (struct sockaddr *) &s->dest_addr,
807  s->dest_addr_len);
808  } else
809  ret = send(s->udp_fd, buf, size, 0);
810 
811  return ret < 0 ? ff_neterrno() : ret;
812 }
813 
814 static int udp_close(URLContext *h)
815 {
816  UDPContext *s = h->priv_data;
817  int ret;
818 
819  if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
820  udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
821  closesocket(s->udp_fd);
822 #if HAVE_PTHREAD_CANCEL
823  if (s->thread_started) {
824  pthread_cancel(s->circular_buffer_thread);
825  ret = pthread_join(s->circular_buffer_thread, NULL);
826  if (ret != 0)
827  av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
828  pthread_mutex_destroy(&s->mutex);
829  pthread_cond_destroy(&s->cond);
830  }
831 #endif
832  av_fifo_free(s->fifo);
833  return 0;
834 }
835 
/*
 * Protocol descriptor for the "udp" scheme: wires up the open/read/write/
 * close callbacks and the file-handle accessor defined in this file, and
 * declares UDPContext (with udp_context_class) as the private data.
 *
 * NOTE(review): the declaration line that opens this initializer (listing
 * line 836) and listing line 845 are missing from this extract.
 */
837  .name = "udp",
838  .url_open = udp_open,
839  .url_read = udp_read,
840  .url_write = udp_write,
841  .url_close = udp_close,
842  .url_get_file_handle = udp_get_file_handle,
843  .priv_data_size = sizeof(UDPContext),
844  .priv_data_class = &udp_context_class,
846 };