[2/3] applehttp: Restructure the demuxer to use a custom AVIOContext

Message ID 1301602776-7844-2-git-send-email-martin@martin.st
State Committed
Headers show

Commit Message

Martin Storsjö March 31, 2011, 8:19 p.m.
This avoids issues where EOF at the end of the segment is given
the variant demuxer. Now the demuxers only see one single data
stream (as when using the applehttp protocol handler).
---
 libavformat/applehttp.c |  335 ++++++++++++++++++++++++-----------------------
 1 files changed, 169 insertions(+), 166 deletions(-)

Comments

Diego Biurrun April 1, 2011, 3:26 p.m. | #1
On Thu, Mar 31, 2011 at 11:19:35PM +0300, Martin Storsjö wrote:
> This avoids issues where EOF at the end of the segment is given
> the variant demuxer. Now the demuxers only see one single data
> stream (as when using the applehttp protocol handler).

.. some nits ..

> --- a/libavformat/applehttp.c
> +++ b/libavformat/applehttp.c
> @@ -246,6 +256,74 @@ fail:
>  
> +static int read_data(void* opaque, uint8_t *buf, int buf_size)

*opaque

> +        if (!v->finished) {
> +            /* If this is a live stream and target_duration has elapsed since
> +             * the last playlist reload, reload the variant playlists now. */
> +            int64_t now = av_gettime();
> +            if (now - v->last_load_time >= v->target_duration*1000000) {
> +                if ((ret = parse_playlist(c, v->url, v, NULL)) < 0)
> +                    return ret;
> +            }
> +        }

I'd merge all of this into one if-statement and drop the variable, i.e.

   if (!v->finished &&
       /* If this is a live stream and target_duration has elapsed since
        * the last playlist reload, reload the variant playlists now. */
       av_gettime() - v->last_load_time >= v->target_duration * 1000000 &&
       (ret = parse_playlist(c, v->url, v, NULL)) < 0)
       return ret;

Diego
Martin Storsjö April 4, 2011, 10 a.m. | #2
On Fri, 1 Apr 2011, Diego Biurrun wrote:

> On Thu, Mar 31, 2011 at 11:19:35PM +0300, Martin Storsjö wrote:
> > This avoids issues where EOF at the end of the segment is given
> > the variant demuxer. Now the demuxers only see one single data
> > stream (as when using the applehttp protocol handler).
> 
> .. some nits ..
> 
> > --- a/libavformat/applehttp.c
> > +++ b/libavformat/applehttp.c
> > @@ -246,6 +256,74 @@ fail:
> >  
> > +static int read_data(void* opaque, uint8_t *buf, int buf_size)
> 
> *opaque
> 
> > +        if (!v->finished) {
> > +            /* If this is a live stream and target_duration has elapsed since
> > +             * the last playlist reload, reload the variant playlists now. */
> > +            int64_t now = av_gettime();
> > +            if (now - v->last_load_time >= v->target_duration*1000000) {
> > +                if ((ret = parse_playlist(c, v->url, v, NULL)) < 0)
> > +                    return ret;
> > +            }
> > +        }
> 
> I'd merge all of this into one if-statement and drop the variable, i.e.
> 
>    if (!v->finished &&
>        /* If this is a live stream and target_duration has elapsed since
>         * the last playlist reload, reload the variant playlists now. */
>        av_gettime() - v->last_load_time >= v->target_duration * 1000000 &&
>        (ret = parse_playlist(c, v->url, v, NULL)) < 0)
>        return ret;

Both fixed locally, thanks!

// Martin

Patch

diff --git a/libavformat/applehttp.c b/libavformat/applehttp.c
index 815013d..db1dc61 100644
--- a/libavformat/applehttp.c
+++ b/libavformat/applehttp.c
@@ -30,6 +30,9 @@ 
 #include "avformat.h"
 #include "internal.h"
 #include <unistd.h>
+#include "avio_internal.h"
+
+#define INITIAL_BUFFER_SIZE 32768
 
 /*
  * An apple http stream consists of a playlist with media segment files,
@@ -56,7 +59,11 @@  struct segment {
 struct variant {
     int bandwidth;
     char url[MAX_URL_SIZE];
-    AVIOContext *pb;
+    AVIOContext pb;
+    uint8_t* read_buffer;
+    URLContext *input;
+    AVFormatContext *parent;
+    int index;
     AVFormatContext *ctx;
     AVPacket pkt;
     int stream_offset;
@@ -66,16 +73,17 @@  struct variant {
     int start_seq_no;
     int n_segments;
     struct segment **segments;
-    int needed;
+    int needed, cur_needed;
+    int cur_seq_no;
+    int64_t last_load_time;
 };
 
 typedef struct AppleHTTPContext {
     int n_variants;
     struct variant **variants;
     int cur_seq_no;
-    int64_t last_load_time;
-    int64_t last_packet_dts;
-    int max_start_seq, min_end_seq;
+    int end_of_segment;
+    int first_packet;
 } AppleHTTPContext;
 
 static int read_chomp_line(AVIOContext *s, char *buf, int maxlen)
@@ -102,8 +110,9 @@  static void free_variant_list(AppleHTTPContext *c)
         struct variant *var = c->variants[i];
         free_segment_list(var);
         av_free_packet(&var->pkt);
-        if (var->pb)
-            avio_close(var->pb);
+        av_free(var->pb.buffer);
+        if (var->input)
+            url_close(var->input);
         if (var->ctx) {
             var->ctx->pb = NULL;
             av_close_input_file(var->ctx);
@@ -238,7 +247,8 @@  static int parse_playlist(AppleHTTPContext *c, const char *url,
             }
         }
     }
-    c->last_load_time = av_gettime();
+    if (var)
+        var->last_load_time = av_gettime();
 
 fail:
     if (close_in)
@@ -246,6 +256,74 @@  fail:
     return ret;
 }
 
+static int read_data(void* opaque, uint8_t *buf, int buf_size)
+{
+    struct variant *v = opaque;
+    AppleHTTPContext *c = v->parent->priv_data;
+    int ret, i;
+
+restart:
+    if (!v->input) {
+reload:
+        if (!v->finished) {
+            /* If this is a live stream and target_duration has elapsed since
+             * the last playlist reload, reload the variant playlists now. */
+            int64_t now = av_gettime();
+            if (now - v->last_load_time >= v->target_duration*1000000) {
+                if ((ret = parse_playlist(c, v->url, v, NULL)) < 0)
+                    return ret;
+            }
+        }
+        if (v->cur_seq_no < v->start_seq_no) {
+            av_log(NULL, AV_LOG_WARNING,
+                   "skipping %d segments ahead, expired from playlists\n",
+                   v->start_seq_no - v->cur_seq_no);
+            v->cur_seq_no = v->start_seq_no;
+        }
+        if (v->cur_seq_no >= v->start_seq_no + v->n_segments) {
+            if (v->finished)
+                return AVERROR_EOF;
+            while (av_gettime() - v->last_load_time <
+                   v->target_duration*1000000) {
+                if (url_interrupt_cb())
+                    return AVERROR_EXIT;
+                usleep(100*1000);
+            }
+            /* Enough time has elapsed since the last reload */
+            goto reload;
+        }
+
+        ret = url_open(&v->input,
+                       v->segments[v->cur_seq_no - v->start_seq_no]->url,
+                       URL_RDONLY);
+        if (ret < 0)
+            return ret;
+    }
+    ret = url_read(v->input, buf, buf_size);
+    if (ret > 0)
+        return ret;
+    if (ret < 0 && ret != AVERROR_EOF)
+        return ret;
+    url_close(v->input);
+    v->input = NULL;
+    v->cur_seq_no++;
+
+    c->end_of_segment = 1;
+    c->cur_seq_no = v->cur_seq_no;
+
+    v->needed = 0;
+    for (i = v->stream_offset; i < v->stream_offset + v->ctx->nb_streams; i++) {
+        if (v->parent->streams[i]->discard < AVDISCARD_ALL)
+            v->needed = 1;
+    }
+    if (!v->needed) {
+        av_log(v->parent, AV_LOG_INFO, "No longer receiving variant %d\n",
+               v->index);
+        return AVERROR_EOF;
+    }
+    goto restart;
+}
+
 static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
 {
     AppleHTTPContext *c = s->priv_data;
@@ -284,20 +362,35 @@  static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
         s->duration = duration * AV_TIME_BASE;
     }
 
-    c->min_end_seq = INT_MAX;
     /* Open the demuxer for each variant */
     for (i = 0; i < c->n_variants; i++) {
         struct variant *v = c->variants[i];
+        AVInputFormat *in_fmt = NULL;
         if (v->n_segments == 0)
             continue;
-        c->max_start_seq = FFMAX(c->max_start_seq, v->start_seq_no);
-        c->min_end_seq   = FFMIN(c->min_end_seq,   v->start_seq_no +
-                                                   v->n_segments);
-        ret = av_open_input_file(&v->ctx, v->segments[0]->url, NULL, 0, NULL);
+
+        v->index  = i;
+        v->needed = 1;
+        v->parent = s;
+
+        /* If this is a live stream with more than 3 segments, start at the
+         * third last segment. */
+        v->cur_seq_no = v->start_seq_no;
+        if (!v->finished && v->n_segments > 3)
+            v->cur_seq_no = v->start_seq_no + v->n_segments - 3;
+
+        v->read_buffer = av_malloc(INITIAL_BUFFER_SIZE);
+        ffio_init_context(&v->pb, v->read_buffer, INITIAL_BUFFER_SIZE, 0, v,
+                          read_data, NULL, NULL);
+        v->pb.is_streamed = 1;
+        ret = av_probe_input_buffer(&v->pb, &in_fmt, v->segments[0]->url,
+                                    NULL, 0, 0);
+        if (ret < 0)
+            goto fail;
+        ret = av_open_input_stream(&v->ctx, &v->pb, v->segments[0]->url,
+                                   in_fmt, NULL);
         if (ret < 0)
             goto fail;
-        avio_close(v->ctx->pb);
-        v->ctx->pb = NULL;
         v->stream_offset = stream_offset;
         /* Create new AVStreams for each stream in this variant */
         for (j = 0; j < v->ctx->nb_streams; j++) {
@@ -310,13 +403,8 @@  static int applehttp_read_header(AVFormatContext *s, AVFormatParameters *ap)
         }
         stream_offset += v->ctx->nb_streams;
     }
-    c->last_packet_dts = AV_NOPTS_VALUE;
 
-    c->cur_seq_no = c->max_start_seq;
-    /* If this is a live stream with more than 3 segments, start at the
-     * third last segment. */
-    if (!c->variants[0]->finished && c->min_end_seq - c->max_start_seq > 3)
-        c->cur_seq_no = c->min_end_seq - 2;
+    c->first_packet = 1;
 
     return 0;
 fail:
@@ -324,98 +412,61 @@  fail:
     return ret;
 }
 
-static int open_variant(AppleHTTPContext *c, struct variant *var, int skip)
+static int recheck_discard_flags(AVFormatContext *s, int first)
 {
-    int ret;
+    AppleHTTPContext *c = s->priv_data;
+    int i, changed = 0;
 
-    if (c->cur_seq_no < var->start_seq_no) {
-        av_log(NULL, AV_LOG_WARNING,
-               "seq %d not available in variant %s, skipping\n",
-               var->start_seq_no, var->url);
-        return 0;
+    /* Check if any new streams are needed */
+    for (i = 0; i < c->n_variants; i++)
+        c->variants[i]->cur_needed = 0;;
+
+    for (i = 0; i < s->nb_streams; i++) {
+        AVStream *st = s->streams[i];
+        struct variant *var = c->variants[s->streams[i]->id];
+        if (st->discard < AVDISCARD_ALL)
+            var->cur_needed = 1;
     }
-    if (c->cur_seq_no - var->start_seq_no >= var->n_segments)
-        return c->variants[0]->finished ? AVERROR_EOF : 0;
-    ret = avio_open(&var->pb,
-                    var->segments[c->cur_seq_no - var->start_seq_no]->url,
-                    URL_RDONLY);
-    if (ret < 0)
-        return ret;
-    var->ctx->pb = var->pb;
-    /* If this is a new segment in parallel with another one already opened,
-     * skip ahead so they're all at the same dts. */
-    if (skip && c->last_packet_dts != AV_NOPTS_VALUE) {
-        while (1) {
-            ret = av_read_frame(var->ctx, &var->pkt);
-            if (ret < 0) {
-                if (ret == AVERROR_EOF) {
-                    reset_packet(&var->pkt);
-                    return 0;
-                }
-                return ret;
-            }
-            if (var->pkt.dts >= c->last_packet_dts)
-                break;
-            av_free_packet(&var->pkt);
+    for (i = 0; i < c->n_variants; i++) {
+        struct variant *v = c->variants[i];
+        if (v->cur_needed && !v->needed) {
+            v->needed = 1;
+            changed = 1;
+            v->cur_seq_no = c->cur_seq_no;
+            v->pb.eof_reached = 0;
+            av_log(s, AV_LOG_INFO, "Now receiving variant %d\n", i);
+        } else if (first && !v->cur_needed && v->needed) {
+            if (v->input)
+                url_close(v->input);
+            v->input = NULL;
+            v->needed = 0;
+            changed = 1;
+            av_log(s, AV_LOG_INFO, "No longer receiving variant %d\n", i);
         }
     }
-    return 0;
+    return changed;
 }
 
 static int applehttp_read_packet(AVFormatContext *s, AVPacket *pkt)
 {
     AppleHTTPContext *c = s->priv_data;
-    int ret, i, minvariant = -1, first = 1, needed = 0, changed = 0,
-        variants = 0;
+    int ret, i, minvariant = -1;
 
-    /* Recheck the discard flags - which streams are desired at the moment */
-    for (i = 0; i < c->n_variants; i++)
-        c->variants[i]->needed = 0;
-    for (i = 0; i < s->nb_streams; i++) {
-        AVStream *st = s->streams[i];
-        struct variant *var = c->variants[s->streams[i]->id];
-        if (st->discard < AVDISCARD_ALL) {
-            var->needed = 1;
-            needed++;
-        }
-        /* Copy the discard flag to the chained demuxer, to indicate which
-         * streams are desired. */
-        var->ctx->streams[i - var->stream_offset]->discard = st->discard;
+    if (c->first_packet) {
+        recheck_discard_flags(s, 1);
+        c->first_packet = 0;
     }
-    if (!needed)
-        return AVERROR_EOF;
+
 start:
+    c->end_of_segment = 0;
     for (i = 0; i < c->n_variants; i++) {
         struct variant *var = c->variants[i];
-        /* Close unneeded streams, open newly requested streams */
-        if (var->pb && !var->needed) {
-            av_log(s, AV_LOG_DEBUG,
-                   "Closing variant stream %d, no longer needed\n", i);
-            av_free_packet(&var->pkt);
-            reset_packet(&var->pkt);
-            avio_close(var->pb);
-            var->pb = NULL;
-            changed = 1;
-        } else if (!var->pb && var->needed) {
-            if (first)
-                av_log(s, AV_LOG_DEBUG, "Opening variant stream %d\n", i);
-            if (first && !var->finished)
-                if ((ret = parse_playlist(c, var->url, var, NULL)) < 0)
-                    return ret;
-            ret = open_variant(c, var, first);
-            if (ret < 0)
-                return ret;
-            changed = 1;
-        }
-        /* Count the number of open variants */
-        if (var->pb)
-            variants++;
         /* Make sure we've got one buffered packet from each open variant
          * stream */
-        if (var->pb && !var->pkt.data) {
+        if (var->needed && !var->pkt.data) {
             ret = av_read_frame(var->ctx, &var->pkt);
             if (ret < 0) {
-                if (!var->pb->eof_reached)
+                if (!var->pb.eof_reached)
                     return ret;
                 reset_packet(&var->pkt);
             }
@@ -427,71 +478,18 @@  start:
                 minvariant = i;
         }
     }
-    if (first && changed)
-        av_log(s, AV_LOG_INFO, "Receiving %d variant streams\n", variants);
+    if (c->end_of_segment) {
+        if (recheck_discard_flags(s, 0))
+            goto start;
+    }
     /* If we got a packet, return it */
     if (minvariant >= 0) {
         *pkt = c->variants[minvariant]->pkt;
         pkt->stream_index += c->variants[minvariant]->stream_offset;
         reset_packet(&c->variants[minvariant]->pkt);
-        c->last_packet_dts = pkt->dts;
         return 0;
     }
-    /* No more packets - eof reached in all variant streams, close the
-     * current segments. */
-    for (i = 0; i < c->n_variants; i++) {
-        struct variant *var = c->variants[i];
-        if (var->pb) {
-            avio_close(var->pb);
-            var->pb = NULL;
-        }
-    }
-    /* Indicate that we're opening the next segment, not opening a new
-     * variant stream in parallel, so we shouldn't try to skip ahead. */
-    first = 0;
-    c->cur_seq_no++;
-reload:
-    if (!c->variants[0]->finished) {
-        /* If this is a live stream and target_duration has elapsed since
-         * the last playlist reload, reload the variant playlists now. */
-        int64_t now = av_gettime();
-        if (now - c->last_load_time >= c->variants[0]->target_duration*1000000) {
-            c->max_start_seq = 0;
-            c->min_end_seq   = INT_MAX;
-            for (i = 0; i < c->n_variants; i++) {
-                struct variant *var = c->variants[i];
-                if (var->needed) {
-                    if ((ret = parse_playlist(c, var->url, var, NULL)) < 0)
-                        return ret;
-                    c->max_start_seq = FFMAX(c->max_start_seq,
-                                             var->start_seq_no);
-                    c->min_end_seq   = FFMIN(c->min_end_seq,
-                                             var->start_seq_no + var->n_segments);
-                }
-            }
-        }
-    }
-    if (c->cur_seq_no < c->max_start_seq) {
-        av_log(NULL, AV_LOG_WARNING,
-               "skipping %d segments ahead, expired from playlists\n",
-               c->max_start_seq - c->cur_seq_no);
-        c->cur_seq_no = c->max_start_seq;
-    }
-    /* If more segments exist, open the next one */
-    if (c->cur_seq_no < c->min_end_seq)
-        goto start;
-    /* We've reached the end of the playlists - return eof if this is a
-     * non-live stream, wait until the next playlist reload if it is live. */
-    if (c->variants[0]->finished)
-        return AVERROR_EOF;
-    while (av_gettime() - c->last_load_time <
-           c->variants[0]->target_duration*1000000) {
-        if (url_interrupt_cb())
-            return AVERROR_EXIT;
-        usleep(100*1000);
-    }
-    /* Enough time has elapsed since the last reload */
-    goto reload;
+    return AVERROR_EOF;
 }
 
 static int applehttp_close(AVFormatContext *s)
@@ -506,38 +504,43 @@  static int applehttp_read_seek(AVFormatContext *s, int stream_index,
                                int64_t timestamp, int flags)
 {
     AppleHTTPContext *c = s->priv_data;
-    int64_t pos = 0;
-    int i;
-    struct variant *var = c->variants[0];
+    int i, j, ret;
 
     if ((flags & AVSEEK_FLAG_BYTE) || !c->variants[0]->finished)
         return AVERROR(ENOSYS);
 
     /* Reset the variants */
-    c->last_packet_dts = AV_NOPTS_VALUE;
     for (i = 0; i < c->n_variants; i++) {
         struct variant *var = c->variants[i];
-        if (var->pb) {
-            avio_close(var->pb);
-            var->pb = NULL;
+        if (var->input) {
+            url_close(var->input);
+            var->input = NULL;
         }
         av_free_packet(&var->pkt);
         reset_packet(&var->pkt);
+        var->pb.eof_reached = 0;
     }
 
     timestamp = av_rescale_rnd(timestamp, 1, stream_index >= 0 ?
                                s->streams[stream_index]->time_base.den :
                                AV_TIME_BASE, flags & AVSEEK_FLAG_BACKWARD ?
                                AV_ROUND_DOWN : AV_ROUND_UP);
-    /* Locate the segment that contains the target timestamp */
-    for (i = 0; i < var->n_segments; i++) {
-        if (timestamp >= pos && timestamp < pos + var->segments[i]->duration) {
-            c->cur_seq_no = var->start_seq_no + i;
-            return 0;
+    ret = AVERROR(EIO);
+    for (i = 0; i < c->n_variants; i++) {
+        struct variant *var = c->variants[i];
+        int64_t pos = 0;
+        /* Locate the segment that contains the target timestamp */
+        for (j = 0; j < var->n_segments; j++) {
+            if (timestamp >= pos &&
+                timestamp < pos + var->segments[j]->duration) {
+                var->cur_seq_no = var->start_seq_no + j;
+                ret = 0;
+                break;
+            }
+            pos += var->segments[j]->duration;
         }
-        pos += var->segments[i]->duration;
     }
-    return AVERROR(EIO);
+    return ret;
 }
 
 static int applehttp_probe(AVProbeData *p)