diff options
Diffstat (limited to 'src/input/plugins/CurlInputPlugin.cxx')
-rw-r--r-- | src/input/plugins/CurlInputPlugin.cxx | 100 |
1 files changed, 49 insertions, 51 deletions
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index 5f8b9150a..63fb84190 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -67,9 +67,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024; */ static const size_t CURL_RESUME_AT = 384 * 1024; -struct CurlInputStream { - InputStream base; - +struct CurlInputStream final : public InputStream { /* some buffers which were passed to libcurl, which we have too free */ char range[32]; @@ -106,9 +104,9 @@ struct CurlInputStream { Error postponed_error; - CurlInputStream(const char *url, Mutex &mutex, Cond &cond, + CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond, void *_buffer) - :base(input_plugin_curl, url, mutex, cond), + :InputStream(input_plugin_curl, _url, _mutex, _cond), request_headers(nullptr), buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED), paused(false), @@ -486,7 +484,7 @@ CurlInputStream::RequestDone(CURLcode result, long status) FreeEasy(); - const ScopeLock protect(base.mutex); + const ScopeLock protect(mutex); if (result != CURLE_OK) { postponed_error.Format(curl_domain, result, @@ -497,7 +495,7 @@ CurlInputStream::RequestDone(CURLcode result, long status) status); } - base.SetReady(); + SetReady(); } static void @@ -688,7 +686,7 @@ inline bool CurlInputStream::FillBuffer(Error &error) { while (easy != nullptr && buffer.IsEmpty()) - base.cond.wait(base.mutex); + cond.wait(mutex); if (postponed_error.IsDefined()) { error = std::move(postponed_error); @@ -768,7 +766,7 @@ input_curl_available(InputStream *is) } inline size_t -CurlInputStream::Read(void *ptr, size_t size, Error &error) +CurlInputStream::Read(void *ptr, size_t read_size, Error &error) { size_t nbytes; @@ -778,22 +776,22 @@ CurlInputStream::Read(void *ptr, size_t size, Error &error) if (!FillBuffer(error)) return 0; - nbytes = read_from_buffer(icy, buffer, ptr, size); + nbytes = read_from_buffer(icy, buffer, ptr, read_size); } while (nbytes == 0); if (icy.IsDefined()) CopyIcyTag(); - base.offset += (InputPlugin::offset_type)nbytes; + offset += (InputPlugin::offset_type)nbytes; if (paused && GetTotalBufferSize() < CURL_RESUME_AT) { - base.mutex.unlock(); + mutex.unlock(); BlockingCall(io_thread_get(), [this](){ Resume(); }); - base.mutex.lock(); + mutex.lock(); } return nbytes; @@ -828,11 +826,11 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value) if (StringEqualsCaseASCII(name, "accept-ranges")) { /* a stream with icy-metadata is not seekable */ if (!icy.IsDefined()) - base.seekable = true; + seekable = true; } else if (StringEqualsCaseASCII(name, "content-length")) { - base.size = base.offset + ParseUint64(value.c_str()); + size = offset + ParseUint64(value.c_str()); } else if (StringEqualsCaseASCII(name, "content-type")) { - base.SetMimeType(std::move(value)); + SetMimeType(std::move(value)); } else if (StringEqualsCaseASCII(name, "icy-name") || StringEqualsCaseASCII(name, "ice-name") || StringEqualsCaseASCII(name, "x-audiocast-name")) { @@ -856,7 +854,7 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value) /* a stream with icy-metadata is not seekable */ - base.seekable = false; + seekable = false; } } } @@ -898,13 +896,13 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream) } inline size_t -CurlInputStream::DataReceived(const void *ptr, size_t size) +CurlInputStream::DataReceived(const void *ptr, size_t received_size) { - assert(size > 0); + assert(received_size > 0); - const ScopeLock protect(base.mutex); + const ScopeLock protect(mutex); - if (size > buffer.GetSpace()) { + if (received_size > buffer.GetSpace()) { paused = true; return CURL_WRITEFUNC_PAUSE; } @@ -912,23 +910,23 @@ CurlInputStream::DataReceived(const void *ptr, size_t size) auto w = buffer.Write(); assert(!w.IsEmpty()); - size_t nbytes = std::min(w.size, size); + size_t nbytes = std::min(w.size, received_size); memcpy(w.data, ptr, nbytes); buffer.Append(nbytes); - const size_t remaining = size - nbytes; + const size_t remaining = received_size - nbytes; if (remaining > 0) { w = buffer.Write(); assert(!w.IsEmpty()); assert(w.size >= remaining); memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining); - buffer.Append(size); + buffer.Append(received_size); } - base.ready = true; - base.cond.broadcast(); - return size; + ready = true; + cond.broadcast(); + return received_size; } /** called by curl when new data is available */ @@ -986,7 +984,7 @@ CurlInputStream::InitEasy(Error &error) curl_easy_setopt(easy, CURLOPT_PROXYUSERPWD, proxy_auth_str); } - CURLcode code = curl_easy_setopt(easy, CURLOPT_URL, base.GetURI()); + CURLcode code = curl_easy_setopt(easy, CURLOPT_URL, GetURI()); if (code != CURLE_OK) { error.Format(curl_domain, code, "curl_easy_setopt() failed: %s", @@ -1003,16 +1001,16 @@ CurlInputStream::InitEasy(Error &error) } inline bool -CurlInputStream::Seek(InputPlugin::offset_type offset, int whence, +CurlInputStream::Seek(InputPlugin::offset_type new_offset, int whence, Error &error) { - assert(base.ready); + assert(IsReady()); - if (whence == SEEK_SET && offset == base.offset) + if (whence == SEEK_SET && new_offset == offset) /* no-op */ return true; - if (!base.seekable) + if (!IsSeekable()) return false; /* calculate the absolute offset */ @@ -1022,52 +1020,52 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence, break; case SEEK_CUR: - offset += base.offset; + new_offset += offset; break; case SEEK_END: - if (base.size < 0) + if (size < 0) /* stream size is not known */ return false; - offset += base.size; + new_offset += size; break; default: return false; } - if (offset < 0) + if (new_offset < 0) return false; /* check if we can fast-forward the buffer */ - while (offset > base.offset) { + while (new_offset > offset) { auto r = buffer.Read(); if (r.IsEmpty()) break; const size_t nbytes = - offset - base.offset < (InputPlugin::offset_type)r.size - ? offset - base.offset - : r.size; + new_offset - offset < (InputPlugin::offset_type)r.size + ? new_offset - offset + : r.size; buffer.Consume(nbytes); - base.offset += nbytes; + offset += nbytes; } - if (offset == base.offset) + if (new_offset == offset) return true; /* close the old connection and open a new one */ - base.mutex.unlock(); + mutex.unlock(); FreeEasyIndirect(); buffer.Clear(); - base.offset = offset; - if (base.offset == base.size) { + offset = new_offset; + if (offset == size) { /* seek to EOF: simulate empty result; avoid triggering a "416 Requested Range Not Satisfiable" response */ @@ -1079,18 +1077,18 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence, /* send the "Range" header */ - if (base.offset > 0) { - sprintf(range, "%lld-", (long long)base.offset); + if (offset > 0) { + sprintf(range, "%lld-", (long long)offset); curl_easy_setopt(easy, CURLOPT_RANGE, range); } - base.ready = false; + ready = false; if (!input_curl_easy_add_indirect(this, error)) return false; - base.mutex.lock(); - base.WaitReady(); + mutex.lock(); + WaitReady(); if (postponed_error.IsDefined()) { error = std::move(postponed_error); @@ -1127,7 +1125,7 @@ CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond, return nullptr; } - return &c->base; + return c; } static InputStream * |