udp: Support IGMPv3 source specific multicast and source blocking

Message ID 1340278096-69012-1-git-send-email-martin@martin.st
State Committed
Headers show

Commit Message

Martin Storsjö June 21, 2012, 11:28 a.m.
Based on an original patch by Stephen D'Angelo <SDAngelo@evertz.com>.
---
This is finally a version that should be good to go, if Aviad tests
it and finds it to be working.

The syntax is slightly updated, now you specify sources=1.2.3.4 or
block=1.2.3.4 instead of listing sources=...&include=0/1.

The code is updated to use protocol family independent methods for
setting this, if available. The code is also updated to avoid storing
the source list in the context, since it's only used during the init.

 configure          |    6 +++
 doc/protocols.texi |    8 ++++
 libavformat/udp.c  |  112 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 125 insertions(+), 1 deletion(-)

Comments

aviad rozenhek June 21, 2012, 1:15 p.m. | #1
On Thu, Jun 21, 2012 at 2:28 PM, Martin Storsjö <martin@martin.st> wrote:

> Based on an original patch by Stephen D'Angelo <SDAngelo@evertz.com>.
> ---
> This is finally a version that should be good to go, if Aviad tests
> it and finds it to be working.
>
> The syntax is slightly updated, now you specify sources=1.2.3.4 or
> block=1.2.3.4 instead of listing sources=...&include=0/1.
>
> The code is updated to use protocol family independent methods for
> setting this, if available. The code is also updated to avoid storing
> the source list in the context, since it's only used during the init.
>
>  configure          |    6 +++
>  doc/protocols.texi |    8 ++++
>  libavformat/udp.c  |  112
> +++++++++++++++++++++++++++++++++++++++++++++++++++-
>  3 files changed, 125 insertions(+), 1 deletion(-)
>
> diff --git a/configure b/configure
> index 07608c4..67e851c 100755
> --- a/configure
> +++ b/configure
> @@ -1130,6 +1130,8 @@ HAVE_LIST="
>     strptime
>     strtok_r
>     struct_addrinfo
> +    struct_group_source_req
> +    struct_ip_mreq_source
>     struct_ipv6_mreq
>     struct_rusage_ru_maxrss
>     struct_sockaddr_in6
> @@ -2801,6 +2803,8 @@ fi
>  if enabled network; then
>     check_type "sys/types.h sys/socket.h" socklen_t
>     check_type netdb.h "struct addrinfo"
> +    check_type netinet/in.h "struct group_source_req" -D_BSD_SOURCE
> +    check_type netinet/in.h "struct ip_mreq_source" -D_BSD_SOURCE
>     check_type netinet/in.h "struct ipv6_mreq" -D_DARWIN_C_SOURCE
>     check_type netinet/in.h "struct sockaddr_in6"
>     check_type "sys/types.h sys/socket.h" "struct sockaddr_storage"
> @@ -2816,6 +2820,8 @@ if enabled network; then
>             network_extralibs="-lws2_32"; }
>         check_type ws2tcpip.h socklen_t
>         check_type ws2tcpip.h "struct addrinfo"
> +        check_type ws2tcpip.h "struct group_source_req"
> +        check_type ws2tcpip.h "struct ip_mreq_source"
>         check_type ws2tcpip.h "struct ipv6_mreq"
>         check_type ws2tcpip.h "struct sockaddr_in6"
>         check_type ws2tcpip.h "struct sockaddr_storage"
> diff --git a/doc/protocols.texi b/doc/protocols.texi
> index e90d1b4..e75f108 100644
> --- a/doc/protocols.texi
> +++ b/doc/protocols.texi
> @@ -536,6 +536,14 @@ and makes writes return with AVERROR(ECONNREFUSED) if
> "destination
>  unreachable" is received.
>  For receiving, this gives the benefit of only receiving packets from
>  the specified peer address/port.
> +
> +@item sources=@var{address}[,@var{address}]
> +Only receive packets sent to the multicast group from one of the
> +specified sender IP addresses.
> +
> +@item block=@var{address}[,@var{address}]
> +Ignore packets sent to the multicast group from the specified
> +sender IP addresses.
>  @end table
>
>  Some usage examples of the udp protocol with @command{avconv} follow.
> diff --git a/libavformat/udp.c b/libavformat/udp.c
> index 39db263..1320206 100644
> --- a/libavformat/udp.c
> +++ b/libavformat/udp.c
> @@ -169,6 +169,77 @@ static struct addrinfo* udp_resolve_host(const char
> *hostname, int port,
>     return res;
>  }
>
> +static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
> +                                     int addr_len, char **sources,
> +                                     int nb_sources, int include)
> +{
> +#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
> +    int i;
> +    for (i = 0; i < nb_sources; i++) {
> +        struct group_source_req mreqs;
> +        int level = addr->sa_family == AF_INET ? SOL_IP : SOL_IPV6;
> +        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
> +                                                       SOCK_DGRAM,
> AF_UNSPEC,
> +                                                       AI_NUMERICHOST);
> +        if (!sourceaddr)
> +            return AVERROR(ENOENT);
> +
> +        mreqs.gsr_interface = 0;
> +        memcpy(&mreqs.gsr_group, addr, addr_len);
> +        memcpy(&mreqs.gsr_source, sourceaddr->ai_addr,
> sourceaddr->ai_addrlen);
> +        freeaddrinfo(sourceaddr);
> +
> +        if (setsockopt(sockfd, level,
> +                       include ? MCAST_JOIN_SOURCE_GROUP :
> MCAST_BLOCK_SOURCE,
> +                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
> +            if (include)
> +                log_net_error(NULL, AV_LOG_ERROR,
> "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
> +            else
> +                log_net_error(NULL, AV_LOG_ERROR,
> "setsockopt(MCAST_BLOCK_SOURCE)");
> +            return ff_neterrno();
> +        }
> +    }
> +#elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
> +    int i;
> +    if (addr->sa_family != AF_INET) {
> +        av_log(NULL, AV_LOG_ERROR,
> +               "Setting multicast sources only supported for IPv4\n");
> +        return AVERROR(EINVAL);
> +    }
> +    for (i = 0; i < nb_sources; i++) {
> +        struct ip_mreq_source mreqs;
> +        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
> +                                                       SOCK_DGRAM,
> AF_UNSPEC,
> +                                                       AI_NUMERICHOST);
> +        if (!sourceaddr)
> +            return AVERROR(ENOENT);
> +        if (sourceaddr->ai_addr->sa_family != AF_INET) {
> +            freeaddrinfo(sourceaddr);
> +            av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol
> family\n",
> +                   sources[i]);
> +            return AVERROR(EINVAL);
> +        }
> +
> +        mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in
> *)addr)->sin_addr.s_addr;
> +        mreqs.imr_interface.s_addr = INADDR_ANY;
> +        mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in
> *)sourceaddr->ai_addr)->sin_addr.s_addr;
> +        freeaddrinfo(sourceaddr);
> +
> +        if (setsockopt(sockfd, IPPROTO_IP,
> +                       include ? IP_ADD_SOURCE_MEMBERSHIP :
> IP_BLOCK_SOURCE,
> +                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
> +            if (include)
> +                log_net_error(NULL, AV_LOG_ERROR,
> "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
> +            else
> +                log_net_error(NULL, AV_LOG_ERROR,
> "setsockopt(IP_BLOCK_SOURCE)");
> +            return ff_neterrno();
> +        }
> +    }
> +#else
> +    return AVERROR(ENOSYS);
> +#endif
> +    return 0;
> +}
>  static int udp_set_url(struct sockaddr_storage *addr,
>                        const char *hostname, int port)
>  {
> @@ -319,6 +390,8 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>     struct sockaddr_storage my_addr;
>     int len;
>     int reuse_specified = 0;
> +    int i, include = 0, num_sources = 0;
> +    char *sources[32];
>
>     h->is_streamed = 1;
>     h->max_packet_size = 1472;
> @@ -356,6 +429,25 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
>             av_strlcpy(localaddr, buf, sizeof(localaddr));
>         }
> +        if (av_find_info_tag(buf, sizeof(buf), "sources", p))
> +            include = 1;
> +        if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) {
> +            char *source_start;
> +
> +            source_start = buf;
> +            while (1) {
> +                char *next = strchr(source_start, ',');
> +                if (next)
> +                    *next = '\0';
> +                sources[num_sources] = av_strdup(source_start);
> +                if (!sources[num_sources])
> +                    goto fail;
> +                source_start = next + 1;
> +                num_sources++;
> +                if (num_sources >= FF_ARRAY_ELEMS(sources) || !next)
> +                    break;
> +            }
> +        }
>     }
>
>     /* fill the dest addr */
> @@ -413,8 +505,21 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>         }
>         if (h->flags & AVIO_FLAG_READ) {
>             /* input */
> -            if (udp_join_multicast_group(udp_fd, (struct sockaddr
> *)&s->dest_addr) < 0)
> +            if (num_sources == 0 || !include) {
> +                if (udp_join_multicast_group(udp_fd, (struct sockaddr
> *)&s->dest_addr) < 0)
> +                    goto fail;
> +
> +                if (num_sources) {
> +                    if (udp_set_multicast_sources(udp_fd, (struct
> sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0)
> +                        goto fail;
> +                }
> +            } else if (include && num_sources) {
> +                if (udp_set_multicast_sources(udp_fd, (struct sockaddr
> *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0)
> +                    goto fail;
> +            } else {
> +                av_log(NULL, AV_LOG_ERROR, "invalid udp settings:
> inclusive multicast but no sources given\n");
>                 goto fail;
> +            }
>         }
>     }
>
> @@ -442,11 +547,16 @@ static int udp_open(URLContext *h, const char *uri,
> int flags)
>         }
>     }
>
> +    for (i = 0; i < num_sources; i++)
> +        av_free(sources[i]);
> +
>     s->udp_fd = udp_fd;
>     return 0;
>  fail:
>     if (udp_fd >= 0)
>         closesocket(udp_fd);
> +    for (i = 0; i < num_sources; i++)
> +        av_free(sources[i]);
>     return AVERROR(EIO);
>  }
>
> --
> 1.7.9.4
>
>
thanks!

Patch

diff --git a/configure b/configure
index 07608c4..67e851c 100755
--- a/configure
+++ b/configure
@@ -1130,6 +1130,8 @@  HAVE_LIST="
     strptime
     strtok_r
     struct_addrinfo
+    struct_group_source_req
+    struct_ip_mreq_source
     struct_ipv6_mreq
     struct_rusage_ru_maxrss
     struct_sockaddr_in6
@@ -2801,6 +2803,8 @@  fi
 if enabled network; then
     check_type "sys/types.h sys/socket.h" socklen_t
     check_type netdb.h "struct addrinfo"
+    check_type netinet/in.h "struct group_source_req" -D_BSD_SOURCE
+    check_type netinet/in.h "struct ip_mreq_source" -D_BSD_SOURCE
     check_type netinet/in.h "struct ipv6_mreq" -D_DARWIN_C_SOURCE
     check_type netinet/in.h "struct sockaddr_in6"
     check_type "sys/types.h sys/socket.h" "struct sockaddr_storage"
@@ -2816,6 +2820,8 @@  if enabled network; then
             network_extralibs="-lws2_32"; }
         check_type ws2tcpip.h socklen_t
         check_type ws2tcpip.h "struct addrinfo"
+        check_type ws2tcpip.h "struct group_source_req"
+        check_type ws2tcpip.h "struct ip_mreq_source"
         check_type ws2tcpip.h "struct ipv6_mreq"
         check_type ws2tcpip.h "struct sockaddr_in6"
         check_type ws2tcpip.h "struct sockaddr_storage"
diff --git a/doc/protocols.texi b/doc/protocols.texi
index e90d1b4..e75f108 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -536,6 +536,14 @@  and makes writes return with AVERROR(ECONNREFUSED) if "destination
 unreachable" is received.
 For receiving, this gives the benefit of only receiving packets from
 the specified peer address/port.
+
+@item sources=@var{address}[,@var{address}]
+Only receive packets sent to the multicast group from one of the
+specified sender IP addresses.
+
+@item block=@var{address}[,@var{address}]
+Ignore packets sent to the multicast group from the specified
+sender IP addresses.
 @end table
 
 Some usage examples of the udp protocol with @command{avconv} follow.
diff --git a/libavformat/udp.c b/libavformat/udp.c
index 39db263..1320206 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -169,6 +169,77 @@  static struct addrinfo* udp_resolve_host(const char *hostname, int port,
     return res;
 }
 
+static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
+                                     int addr_len, char **sources,
+                                     int nb_sources, int include)
+{
+#if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
+    int i;
+    for (i = 0; i < nb_sources; i++) {
+        struct group_source_req mreqs;
+        int level = addr->sa_family == AF_INET ? SOL_IP : SOL_IPV6;
+        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
+                                                       SOCK_DGRAM, AF_UNSPEC,
+                                                       AI_NUMERICHOST);
+        if (!sourceaddr)
+            return AVERROR(ENOENT);
+
+        mreqs.gsr_interface = 0;
+        memcpy(&mreqs.gsr_group, addr, addr_len);
+        memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
+        freeaddrinfo(sourceaddr);
+
+        if (setsockopt(sockfd, level,
+                       include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
+                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
+            if (include)
+                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
+            else
+                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
+            return ff_neterrno();
+        }
+    }
+#elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
+    int i;
+    if (addr->sa_family != AF_INET) {
+        av_log(NULL, AV_LOG_ERROR,
+               "Setting multicast sources only supported for IPv4\n");
+        return AVERROR(EINVAL);
+    }
+    for (i = 0; i < nb_sources; i++) {
+        struct ip_mreq_source mreqs;
+        struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
+                                                       SOCK_DGRAM, AF_UNSPEC,
+                                                       AI_NUMERICHOST);
+        if (!sourceaddr)
+            return AVERROR(ENOENT);
+        if (sourceaddr->ai_addr->sa_family != AF_INET) {
+            freeaddrinfo(sourceaddr);
+            av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
+                   sources[i]);
+            return AVERROR(EINVAL);
+        }
+
+        mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
+        mreqs.imr_interface.s_addr = INADDR_ANY;
+        mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
+        freeaddrinfo(sourceaddr);
+
+        if (setsockopt(sockfd, IPPROTO_IP,
+                       include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
+                       (const void *)&mreqs, sizeof(mreqs)) < 0) {
+            if (include)
+                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
+            else
+                log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
+            return ff_neterrno();
+        }
+    }
+#else
+    return AVERROR(ENOSYS);
+#endif
+    return 0;
+}
 static int udp_set_url(struct sockaddr_storage *addr,
                        const char *hostname, int port)
 {
@@ -319,6 +390,8 @@  static int udp_open(URLContext *h, const char *uri, int flags)
     struct sockaddr_storage my_addr;
     int len;
     int reuse_specified = 0;
+    int i, include = 0, num_sources = 0;
+    char *sources[32];
 
     h->is_streamed = 1;
     h->max_packet_size = 1472;
@@ -356,6 +429,25 @@  static int udp_open(URLContext *h, const char *uri, int flags)
         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
+        if (av_find_info_tag(buf, sizeof(buf), "sources", p))
+            include = 1;
+        if (include || av_find_info_tag(buf, sizeof(buf), "block", p)) {
+            char *source_start;
+
+            source_start = buf;
+            while (1) {
+                char *next = strchr(source_start, ',');
+                if (next)
+                    *next = '\0';
+                sources[num_sources] = av_strdup(source_start);
+                if (!sources[num_sources])
+                    goto fail;
+                source_start = next + 1;
+                num_sources++;
+                if (num_sources >= FF_ARRAY_ELEMS(sources) || !next)
+                    break;
+            }
+        }
     }
 
     /* fill the dest addr */
@@ -413,8 +505,21 @@  static int udp_open(URLContext *h, const char *uri, int flags)
         }
         if (h->flags & AVIO_FLAG_READ) {
             /* input */
-            if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
+            if (num_sources == 0 || !include) {
+                if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr) < 0)
+                    goto fail;
+
+                if (num_sources) {
+                    if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 0) < 0)
+                        goto fail;
+                }
+            } else if (include && num_sources) {
+                if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, sources, num_sources, 1) < 0)
+                    goto fail;
+            } else {
+                av_log(NULL, AV_LOG_ERROR, "invalid udp settings: inclusive multicast but no sources given\n");
                 goto fail;
+            }
         }
     }
 
@@ -442,11 +547,16 @@  static int udp_open(URLContext *h, const char *uri, int flags)
         }
     }
 
+    for (i = 0; i < num_sources; i++)
+        av_free(sources[i]);
+
     s->udp_fd = udp_fd;
     return 0;
  fail:
     if (udp_fd >= 0)
         closesocket(udp_fd);
+    for (i = 0; i < num_sources; i++)
+        av_free(sources[i]);
     return AVERROR(EIO);
 }