diff options
author | Max Kellermann <max@duempel.org> | 2014-05-02 22:31:02 +0200 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2014-05-24 14:36:28 +0200 |
commit | fbafb19657f3337a4d0d5e4783cd91ad0c2963aa (patch) | |
tree | 5f61ccd2f3646f282e468ce2f8d1873463913802 /src/input/plugins | |
parent | 6c4438d8a9ff12db9bb6323e128a69de6cd4e2b7 (diff) | |
download | mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.gz mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.xz mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.zip |
input/curl: move code to AsyncInputStream
New base class for other InputStream implementations that run in the
I/O thread.
Diffstat (limited to 'src/input/plugins')
-rw-r--r-- | src/input/plugins/CurlInputPlugin.cxx | 259 |
1 files changed, 48 insertions, 211 deletions
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx index b48512ccb..2450c8754 100644 --- a/src/input/plugins/CurlInputPlugin.cxx +++ b/src/input/plugins/CurlInputPlugin.cxx @@ -19,8 +19,8 @@ #include "config.h" #include "CurlInputPlugin.hxx" +#include "../AsyncInputStream.hxx" #include "../IcyInputStream.hxx" -#include "../InputStream.hxx" #include "../InputPlugin.hxx" #include "config/ConfigGlobal.hxx" #include "config/ConfigData.hxx" @@ -60,7 +60,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024; */ static const size_t CURL_RESUME_AT = 384 * 1024; -struct CurlInputStream final : public InputStream { +struct CurlInputStream final : public AsyncInputStream { /* some buffers which were passed to libcurl, which we have too free */ char range[32]; @@ -69,39 +69,19 @@ struct CurlInputStream final : public InputStream { /** the curl handles */ CURL *easy; - /** - * A buffer where input_curl_writefunction() appends - * to, and input_curl_read() reads from. - */ - CircularBuffer<uint8_t> buffer; - - /** - * Is the connection currently paused? That happens when the - * buffer was getting too large. It will be unpaused when the - * buffer is below the threshold again. - */ - bool paused; - /** error message provided by libcurl */ char error_buffer[CURL_ERROR_SIZE]; /** parser for icy-metadata */ IcyInputStream *icy; - /** the tag object ready to be requested via - InputStream::ReadTag() */ - Tag *tag; - - Error postponed_error; - CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond, void *_buffer) - :InputStream(_url, _mutex, _cond), + :AsyncInputStream(_url, _mutex, _cond, + _buffer, CURL_MAX_BUFFERED, + CURL_RESUME_AT), request_headers(nullptr), - buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED), - paused(false), - icy(new IcyInputStream(this)), - tag(nullptr) {} + icy(new IcyInputStream(this)) {} ~CurlInputStream(); @@ -133,19 +113,6 @@ struct CurlInputStream final : public InputStream { size_t DataReceived(const void *ptr, size_t size); - void Resume(); - bool FillBuffer(Error &error); - - /** - * Returns the number of bytes stored in the buffer. - * - * The caller must lock the mutex. - */ - gcc_pure - size_t GetTotalBufferSize() const { - return buffer.GetSize(); - } - /** * A HTTP request is finished. * @@ -153,22 +120,9 @@ struct CurlInputStream final : public InputStream { */ void RequestDone(CURLcode result, long status); - /* virtual methods from InputStream */ - bool Check(Error &error) override; - - bool IsEOF() override { - return easy == nullptr && buffer.IsEmpty(); - } - - Tag *ReadTag() override; - - bool IsAvailable() override { - return postponed_error.IsDefined() || easy == nullptr || - !buffer.IsEmpty(); - } - - size_t Read(void *ptr, size_t size, Error &error) override; - bool Seek(offset_type offset, Error &error) override; + /* virtual methods from AsyncInputStream */ + virtual void DoResume() override; + virtual void DoSeek(offset_type new_offset) override; }; class CurlMulti; @@ -327,23 +281,24 @@ input_curl_find_request(CURL *easy) return (CurlInputStream *)p; } -inline void -CurlInputStream::Resume() +void +CurlInputStream::DoResume() { assert(io_thread_inside()); - if (paused) { - paused = false; - curl_easy_pause(easy, CURLPAUSE_CONT); + mutex.unlock(); - if (curl_version_num < 0x072000) - /* libcurl older than 7.32.0 does not update - its sockets after curl_easy_pause(); force - libcurl to do it now */ - curl_multi->ResumeSockets(); + curl_easy_pause(easy, CURLPAUSE_CONT); - curl_multi->InvalidateSockets(); - } + if (curl_version_num < 0x072000) + /* libcurl older than 7.32.0 does not update + its sockets after curl_easy_pause(); force + libcurl to do it now */ + curl_multi->ResumeSockets(); + + curl_multi->InvalidateSockets(); + + mutex.lock(); } int @@ -472,6 +427,7 @@ CurlInputStream::RequestDone(CURLcode result, long status) assert(!postponed_error.IsDefined()); FreeEasy(); + AsyncInputStream::SetClosed(); const ScopeLock protect(mutex); @@ -484,7 +440,9 @@ CurlInputStream::RequestDone(CURLcode result, long status) status); } - if (!IsReady()) + if (IsSeekPending()) + SeekDone(); + else if (!IsReady()) SetReady(); } @@ -630,81 +588,16 @@ input_curl_finish(void) CurlInputStream::~CurlInputStream() { - delete tag; - FreeEasyIndirect(); - - buffer.Clear(); - HugeFree(buffer.Write().data, CURL_MAX_BUFFERED); -} - -inline bool -CurlInputStream::Check(Error &error) -{ - bool success = !postponed_error.IsDefined(); - if (!success) { - error = std::move(postponed_error); - postponed_error.Clear(); - } - - return success; -} - -Tag * -CurlInputStream::ReadTag() -{ - Tag *result = tag; - tag = nullptr; - return result; -} - -inline bool -CurlInputStream::FillBuffer(Error &error) -{ - while (easy != nullptr && buffer.IsEmpty()) - cond.wait(mutex); - - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - postponed_error.Clear(); - return false; - } - - return !buffer.IsEmpty(); -} - -size_t -CurlInputStream::Read(void *ptr, size_t read_size, Error &error) -{ - if (!FillBuffer(error)) - return 0; - - auto r = buffer.Read(); - if (r.IsEmpty()) - return 0; - - const size_t nbytes = std::min(read_size, r.size); - memcpy(ptr, r.data, nbytes); - buffer.Consume(nbytes); - - offset += (InputPlugin::offset_type)nbytes; - - if (paused && GetTotalBufferSize() < CURL_RESUME_AT) { - mutex.unlock(); - - BlockingCall(io_thread_get(), [this](){ - Resume(); - }); - - mutex.lock(); - } - - return nbytes; } inline void CurlInputStream::HeaderReceived(const char *name, std::string &&value) { + if (IsSeekPending()) + /* don't update metadata while seeking */ + return; + if (StringEqualsCaseASCII(name, "accept-ranges")) { /* a stream with icy-metadata is not seekable */ if (!icy->IsEnabled()) @@ -716,12 +609,10 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value) } else if (StringEqualsCaseASCII(name, "icy-name") || StringEqualsCaseASCII(name, "ice-name") || StringEqualsCaseASCII(name, "x-audiocast-name")) { - delete tag; - TagBuilder tag_builder; tag_builder.AddItem(TAG_NAME, value.c_str()); - tag = tag_builder.CommitNew(); + SetTag(tag_builder.CommitNew()); } else if (StringEqualsCaseASCII(name, "icy-metaint")) { if (icy->IsEnabled()) return; @@ -782,30 +673,15 @@ CurlInputStream::DataReceived(const void *ptr, size_t received_size) const ScopeLock protect(mutex); - if (received_size > buffer.GetSpace()) { - paused = true; - return CURL_WRITEFUNC_PAUSE; - } - - auto w = buffer.Write(); - assert(!w.IsEmpty()); - - size_t nbytes = std::min(w.size, received_size); - memcpy(w.data, ptr, nbytes); - buffer.Append(nbytes); - - const size_t remaining = received_size - nbytes; - if (remaining > 0) { - w = buffer.Write(); - assert(!w.IsEmpty()); - assert(w.size >= remaining); + if (IsSeekPending()) + SeekDone(); - memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining); - buffer.Append(remaining); + if (received_size > GetBufferSpace()) { + AsyncInputStream::Pause(); + return CURL_WRITEFUNC_PAUSE; } - ready = true; - cond.broadcast(); + AppendToBuffer(ptr, received_size); return received_size; } @@ -880,48 +756,16 @@ CurlInputStream::InitEasy(Error &error) return true; } -inline bool -CurlInputStream::Seek(offset_type new_offset, Error &error) +void +CurlInputStream::DoSeek(offset_type new_offset) { assert(IsReady()); - if (new_offset == offset) - /* no-op */ - return true; - - if (!IsSeekable()) - return false; - - /* calculate the absolute offset */ - - if (new_offset < 0) - return false; - - /* check if we can fast-forward the buffer */ - - while (new_offset > offset) { - auto r = buffer.Read(); - if (r.IsEmpty()) - break; - - const size_t nbytes = - new_offset - offset < (InputPlugin::offset_type)r.size - ? new_offset - offset - : r.size; - - buffer.Consume(nbytes); - offset += nbytes; - } - - if (new_offset == offset) - return true; - /* close the old connection and open a new one */ mutex.unlock(); FreeEasyIndirect(); - buffer.Clear(); offset = new_offset; if (offset == size) { @@ -929,12 +773,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error) triggering a "416 Requested Range Not Satisfiable" response */ mutex.lock(); - return true; + SeekDone(); + return; } - if (!InitEasy(error)) { + if (!InitEasy(postponed_error)) { mutex.lock(); - return false; + SeekDone(); + return; } /* send the "Range" header */ @@ -944,23 +790,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error) curl_easy_setopt(easy, CURLOPT_RANGE, range); } - ready = false; - - if (!input_curl_easy_add_indirect(this, error)) { + if (!input_curl_easy_add_indirect(this, postponed_error)) { mutex.lock(); - return false; + SeekDone(); + return; } mutex.lock(); - WaitReady(); - - if (postponed_error.IsDefined()) { - error = std::move(postponed_error); - postponed_error.Clear(); - return false; - } - - return true; + offset = new_offset; } inline InputStream * |