[RFC,3/3] rtsp: Use buffered IO for reading from the control channel

Message ID 1421847687-2895-3-git-send-email-martin@martin.st
State New
Headers show

Commit Message

Martin Storsjö Jan. 21, 2015, 1:41 p.m.
Since R/W AVIOContexts aren't fully supported (it only
supports one single buffer, either for reading or writing),
we open this with the read flag only, and keep doing all the writes
directly to the underlying URLContext, bypassing the read buffer.
---
This may potentially help utilize the underlying network better,
by reading larger buffers from the socket at a time into the
buffer, instead of consuming one single byte at a time (when
reading rtsp reply headers).

I'm not sure if this actually does help significantly or not
- testing of that is appreciated.
---
 libavformat/avio_internal.h |  2 ++
 libavformat/aviobuf.c       |  5 +++++
 libavformat/rtsp.c          | 45 ++++++++++++++++++++++++++++++---------------
 libavformat/rtsp.h          |  2 +-
 libavformat/rtspdec.c       | 15 ++++++++-------
 libavformat/rtspenc.c       |  4 ++--
 6 files changed, 48 insertions(+), 25 deletions(-)

Comments

Luca Barbato Jan. 21, 2015, 2:08 p.m. | #1
On 21/01/15 14:41, Martin Storsjö wrote:
> Since R/W AVIOContexts aren't fully supported (it only
> supports one single buffer, either for reading or writing),
> we open this with the read flag only, and keep doing all the writes
> directly to the underlying URLContext, bypassing the read buffer.
> ---
> This may potentially help utilize the underlying network better,
> by reading larger buffers from the socket at a time into the
> buffer, instead of consuming one single byte at a time (when
> reading rtsp reply headers).
>
> I'm not sure if this actually does help significantly or not
> - testing of that is appreciated.

The idea looks nice, some notes below.

> ---
>   libavformat/avio_internal.h |  2 ++
>   libavformat/aviobuf.c       |  5 +++++
>   libavformat/rtsp.c          | 45 ++++++++++++++++++++++++++++++---------------
>   libavformat/rtsp.h          |  2 +-
>   libavformat/rtspdec.c       | 15 ++++++++-------
>   libavformat/rtspenc.c       |  4 ++--
>   6 files changed, 48 insertions(+), 25 deletions(-)
>
> diff --git a/libavformat/avio_internal.h b/libavformat/avio_internal.h
> index a8bcadd..2a6dac3 100644
> --- a/libavformat/avio_internal.h
> +++ b/libavformat/avio_internal.h
> @@ -85,6 +85,8 @@ int ffio_rewind_with_probe_data(AVIOContext *s, unsigned char *buf, int buf_size
>
>   uint64_t ffio_read_varlen(AVIOContext *bc);
>
> +int ffio_has_buffered(AVIOContext *s);
> +
>   /** @warning must be called before any I/O */
>   int ffio_set_buf_size(AVIOContext *s, int buf_size);
>
> diff --git a/libavformat/aviobuf.c b/libavformat/aviobuf.c
> index 6923b78..9015648 100644
> --- a/libavformat/aviobuf.c
> +++ b/libavformat/aviobuf.c
> @@ -496,6 +496,11 @@ int avio_read(AVIOContext *s, unsigned char *buf, int size)
>       return size1 - size;
>   }
>
> +int ffio_has_buffered(AVIOContext *s)
> +{
> +    return s->buf_end > s->buf_ptr;
> +}

if buf_ptr is equal to buf_start the condition would be true as well.


> -        ffurl_read_complete(rt->rtsp_hd, content, content_length);
> +        avio_read(rt->rtsp_hd, content, content_length);

We should check the return value I guess.

>           content[content_length] = '\0';


The rest seems workable, might be interesting wrapping the check in 
something like url_poll()/avio_poll() later.

I'll try to think something about it.

Julius, may I rely on you regarding additional testing?

lu
Anton Khirnov July 31, 2015, 5:31 a.m. | #2
Quoting Martin Storsjö (2015-01-21 14:41:27)
> Since R/W AVIOContexts aren't fully supported (it only
> supports one single buffer, either for reading or writing),
> we open this with the read flag only, and keep doing all the writes
> directly to the underlying URLContext, bypassing the read buffer.
> ---
> This may potentially help utilize the underlying network better,
> by reading larger buffers from the socket at a time into the
> buffer, instead of consuming one single byte at a time (when
> reading rtsp reply headers).
> 
> I'm not sure if this actually does help significantly or not
> - testing of that is appreciated.
> ---

Hmm, I don't know about this. Using both layers simultaneously, when
they are not designed for it, sounds like it could be more fragile than
the current approach. So unless there are significant gains from it, I'd
probably say it's better not to do it.

Patch

diff --git a/libavformat/avio_internal.h b/libavformat/avio_internal.h
index a8bcadd..2a6dac3 100644
--- a/libavformat/avio_internal.h
+++ b/libavformat/avio_internal.h
@@ -85,6 +85,8 @@  int ffio_rewind_with_probe_data(AVIOContext *s, unsigned char *buf, int buf_size
 
 uint64_t ffio_read_varlen(AVIOContext *bc);
 
+int ffio_has_buffered(AVIOContext *s);
+
 /** @warning must be called before any I/O */
 int ffio_set_buf_size(AVIOContext *s, int buf_size);
 
diff --git a/libavformat/aviobuf.c b/libavformat/aviobuf.c
index 6923b78..9015648 100644
--- a/libavformat/aviobuf.c
+++ b/libavformat/aviobuf.c
@@ -496,6 +496,11 @@  int avio_read(AVIOContext *s, unsigned char *buf, int size)
     return size1 - size;
 }
 
+int ffio_has_buffered(AVIOContext *s)
+{
+    return s->buf_end > s->buf_ptr;
+}
+
 int ffio_read_indirect(AVIOContext *s, unsigned char *buf, int size, const unsigned char **data)
 {
     if (s->buf_end - s->buf_ptr >= size && !s->write_flag) {
diff --git a/libavformat/rtsp.c b/libavformat/rtsp.c
index 947e88f..73d8778 100644
--- a/libavformat/rtsp.c
+++ b/libavformat/rtsp.c
@@ -1053,7 +1053,7 @@  void ff_rtsp_skip_packet(AVFormatContext *s)
     int ret, len, len1;
     uint8_t buf[1024];
 
-    ret = ffurl_read_complete(rt->rtsp_hd, buf, 3);
+    ret = avio_read(rt->rtsp_hd, buf, 3);
     if (ret != 3)
         return;
     len = AV_RB16(buf + 1);
@@ -1065,7 +1065,7 @@  void ff_rtsp_skip_packet(AVFormatContext *s)
         len1 = len;
         if (len1 > sizeof(buf))
             len1 = sizeof(buf);
-        ret = ffurl_read_complete(rt->rtsp_hd, buf, len1);
+        ret = avio_read(rt->rtsp_hd, buf, len1);
         if (ret != len1)
             return;
         len -= len1;
@@ -1095,7 +1095,7 @@  start:
     for (;;) {
         q = buf;
         for (;;) {
-            ret = ffurl_read_complete(rt->rtsp_hd, &ch, 1);
+            ret = avio_read(rt->rtsp_hd, &ch, 1);
             av_dlog(s, "ret=%d c=%02x [%c]\n", ret, ch, ch);
             if (ret != 1)
                 return AVERROR_EOF;
@@ -1149,7 +1149,7 @@  start:
         content = av_malloc(content_length + 1);
         if (!content)
             return AVERROR(ENOMEM);
-        ffurl_read_complete(rt->rtsp_hd, content, content_length);
+        avio_read(rt->rtsp_hd, content, content_length);
         content[content_length] = '\0';
     }
     if (content_ptr)
@@ -1584,9 +1584,10 @@  fail:
 void ff_rtsp_close_connections(AVFormatContext *s)
 {
     RTSPState *rt = s->priv_data;
-    if (rt->rtsp_hd_out != rt->rtsp_hd) ffurl_close(rt->rtsp_hd_out);
-    ffurl_close(rt->rtsp_hd);
-    rt->rtsp_hd = rt->rtsp_hd_out = NULL;
+    if (rt->rtsp_hd_out != rt->rtsp_hd->opaque) ffurl_close(rt->rtsp_hd_out);
+    avio_close(rt->rtsp_hd);
+    rt->rtsp_hd     = NULL;
+    rt->rtsp_hd_out = NULL;
 }
 
 int ff_rtsp_connect(AVFormatContext *s)
@@ -1669,13 +1670,14 @@  redirect:
         char httpname[1024];
         char sessioncookie[17];
         char headers[1024];
+        URLContext *rtsp_hd;
 
         ff_url_join(httpname, sizeof(httpname), "http", auth, host, port, "%s", path);
         snprintf(sessioncookie, sizeof(sessioncookie), "%08x%08x",
                  av_get_random_seed(), av_get_random_seed());
 
         /* GET requests */
-        if (ffurl_alloc(&rt->rtsp_hd, httpname, AVIO_FLAG_READ,
+        if (ffurl_alloc(&rtsp_hd, httpname, AVIO_FLAG_READ,
                         &s->interrupt_callback) < 0) {
             err = AVERROR(EIO);
             goto fail;
@@ -1688,14 +1690,19 @@  redirect:
                  "Pragma: no-cache\r\n"
                  "Cache-Control: no-cache\r\n",
                  sessioncookie);
-        av_opt_set(rt->rtsp_hd->priv_data, "headers", headers, 0);
+        av_opt_set(rtsp_hd->priv_data, "headers", headers, 0);
 
         /* complete the connection */
-        if (ffurl_connect(rt->rtsp_hd, NULL)) {
+        if (ffurl_connect(rtsp_hd, NULL)) {
             err = AVERROR(EIO);
             goto fail;
         }
 
+        if ((err = ffio_fdopen(&rt->rtsp_hd, rtsp_hd)) < 0) {
+            ffurl_close(rtsp_hd);
+            goto fail;
+        }
+
         /* POST requests */
         if (ffurl_alloc(&rt->rtsp_hd_out, httpname, AVIO_FLAG_WRITE,
                         &s->interrupt_callback) < 0 ) {
@@ -1731,7 +1738,7 @@  redirect:
          * count variable between the two sessions, if we'd do more requests
          * with the original session, though.)
          */
-        ff_http_init_auth_state(rt->rtsp_hd_out, rt->rtsp_hd);
+        ff_http_init_auth_state(rt->rtsp_hd_out, rtsp_hd);
 
         /* complete the connection */
         if (ffurl_connect(rt->rtsp_hd_out, NULL)) {
@@ -1742,16 +1749,17 @@  redirect:
         /* open the tcp connection */
         ff_url_join(tcpname, sizeof(tcpname), lower_rtsp_proto, NULL,
                     host, port, NULL);
-        if (ffurl_open(&rt->rtsp_hd, tcpname, AVIO_FLAG_READ_WRITE,
+        if (avio_open2(&rt->rtsp_hd, tcpname, AVIO_FLAG_READ,
                        &s->interrupt_callback, NULL) < 0) {
             err = AVERROR(EIO);
             goto fail;
         }
-        rt->rtsp_hd_out = rt->rtsp_hd;
+        rt->rtsp_hd_out = rt->rtsp_hd->opaque;
+        rt->rtsp_hd_out->flags = AVIO_FLAG_READ_WRITE;
     }
     rt->seq = 0;
 
-    tcp_fd = ffurl_get_file_handle(rt->rtsp_hd);
+    tcp_fd = ffurl_get_file_handle(rt->rtsp_hd->opaque);
     if (tcp_fd < 0) {
         err = tcp_fd;
         goto fail;
@@ -1860,7 +1868,7 @@  static int udp_read_packet(AVFormatContext *s, RTSPStream **prtsp_st,
             return AVERROR(EAGAIN);
         max_p = 0;
         if (rt->rtsp_hd) {
-            tcp_fd = ffurl_get_file_handle(rt->rtsp_hd);
+            tcp_fd = ffurl_get_file_handle(rt->rtsp_hd->opaque);
             p[max_p].fd = tcp_fd;
             p[max_p++].events = POLLIN;
         } else {
@@ -1887,6 +1895,13 @@  static int udp_read_packet(AVFormatContext *s, RTSPStream **prtsp_st,
             }
         }
         n = poll(p, max_p, POLL_TIMEOUT_MS);
+        if (rt->rtsp_hd && ffio_has_buffered(rt->rtsp_hd)) {
+            // If there is buffered data available in the in channel,
+            // treat that as if POLLIN was signaled, indicating that it's
+            // possible to read.
+            p[0].revents |= POLLIN;
+            n++;
+        }
         if (n > 0) {
             int j = 1 - (tcp_fd == -1);
             timeout_cnt = 0;
diff --git a/libavformat/rtsp.h b/libavformat/rtsp.h
index e83ad54..a8dbb14 100644
--- a/libavformat/rtsp.h
+++ b/libavformat/rtsp.h
@@ -217,7 +217,7 @@  enum RTSPServerType {
  */
 typedef struct RTSPState {
     const AVClass *class;             /**< Class for private options. */
-    URLContext *rtsp_hd; /* RTSP TCP connection handle */
+    AVIOContext *rtsp_hd; /* RTSP TCP connection handle */
 
     /** number of items in the 'rtsp_streams' variable */
     int nb_rtsp_streams;
diff --git a/libavformat/rtspdec.c b/libavformat/rtspdec.c
index 418f383..0676bae 100644
--- a/libavformat/rtspdec.c
+++ b/libavformat/rtspdec.c
@@ -76,7 +76,7 @@  static inline int read_line(AVFormatContext *s, char *rbuf, const int rbufsize,
     *rbuflen      = 0;
 
     do {
-        ret = ffurl_read_complete(rt->rtsp_hd, rbuf + idx, 1);
+        ret = avio_read(rt->rtsp_hd, rbuf + idx, 1);
         if (ret <= 0)
             return ret ? ret : AVERROR_EOF;
         if (rbuf[idx] == '\r') {
@@ -186,7 +186,7 @@  static int rtsp_read_announce(AVFormatContext *s)
     }
     if (request.content_length && request.content_length < sizeof(sdp) - 1) {
         /* Read SDP */
-        if (ffurl_read_complete(rt->rtsp_hd, sdp, request.content_length)
+        if (avio_read(rt->rtsp_hd, sdp, request.content_length)
             < request.content_length) {
             av_log(s, AV_LOG_ERROR,
                    "Unable to get complete SDP Description in ANNOUNCE\n");
@@ -641,13 +641,14 @@  static int rtsp_listen(AVFormatContext *s)
     ff_url_join(tcpname, sizeof(tcpname), lower_proto, NULL, host, port,
                 "?listen&listen_timeout=%d", rt->initial_timeout * 1000);
 
-    if (ret = ffurl_open(&rt->rtsp_hd, tcpname, AVIO_FLAG_READ_WRITE,
+    if (ret = avio_open2(&rt->rtsp_hd, tcpname, AVIO_FLAG_READ,
                          &s->interrupt_callback, NULL)) {
         av_log(s, AV_LOG_ERROR, "Unable to open RTSP for listening\n");
         return ret;
     }
     rt->state       = RTSP_STATE_IDLE;
-    rt->rtsp_hd_out = rt->rtsp_hd;
+    rt->rtsp_hd_out = rt->rtsp_hd->opaque;
+    rt->rtsp_hd_out->flags = AVIO_FLAG_READ_WRITE;
     for (;;) { /* Wait for incoming RTSP messages */
         ret = read_line(s, rbuf, sizeof(rbuf), &rbuflen);
         if (ret < 0)
@@ -671,7 +672,7 @@  static int rtsp_listen(AVFormatContext *s)
         } else if (methodcode == SETUP)
             ret = rtsp_read_setup(s, host, uri);
         if (ret) {
-            ffurl_close(rt->rtsp_hd);
+            avio_close(rt->rtsp_hd);
             return AVERROR_INVALIDDATA;
         }
     }
@@ -747,7 +748,7 @@  redo:
         if (rt->state != RTSP_STATE_STREAMING)
             return 0;
     }
-    ret = ffurl_read_complete(rt->rtsp_hd, buf, 3);
+    ret = avio_read(rt->rtsp_hd, buf, 3);
     if (ret != 3)
         return -1;
     id  = buf[0];
@@ -756,7 +757,7 @@  redo:
     if (len > buf_size || len < 12)
         goto redo;
     /* get the data */
-    ret = ffurl_read_complete(rt->rtsp_hd, buf, len);
+    ret = avio_read(rt->rtsp_hd, buf, len);
     if (ret != len)
         return -1;
     if (rt->transport == RTSP_TRANSPORT_RDT &&
diff --git a/libavformat/rtspenc.c b/libavformat/rtspenc.c
index 3db53ac..583c0da 100644
--- a/libavformat/rtspenc.c
+++ b/libavformat/rtspenc.c
@@ -180,7 +180,7 @@  static int rtsp_write_packet(AVFormatContext *s, AVPacket *pkt)
     RTSPState *rt = s->priv_data;
     RTSPStream *rtsp_st;
     int n;
-    struct pollfd p = {ffurl_get_file_handle(rt->rtsp_hd), POLLIN, 0};
+    struct pollfd p = {ffurl_get_file_handle(rt->rtsp_hd->opaque), POLLIN, 0};
     AVFormatContext *rtpctx;
     int ret;
 
@@ -188,7 +188,7 @@  static int rtsp_write_packet(AVFormatContext *s, AVPacket *pkt)
         n = poll(&p, 1, 0);
         if (n <= 0)
             break;
-        if (p.revents & POLLIN) {
+        if (p.revents & POLLIN || ffio_has_buffered(rt->rtsp_hd)) {
             RTSPMessageHeader reply;
 
             /* Don't let ff_rtsp_read_reply handle interleaved packets,