aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2014-05-02 22:31:02 +0200
committerMax Kellermann <max@duempel.org>2014-05-24 14:36:28 +0200
commitfbafb19657f3337a4d0d5e4783cd91ad0c2963aa (patch)
tree5f61ccd2f3646f282e468ce2f8d1873463913802
parent6c4438d8a9ff12db9bb6323e128a69de6cd4e2b7 (diff)
downloadmpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.gz
mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.xz
mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.zip
input/curl: move code to AsyncInputStream
New base class for other InputStream implementations that run in the I/O thread.
Diffstat (limited to '')
-rw-r--r--Makefile.am1
-rw-r--r--src/input/AsyncInputStream.cxx239
-rw-r--r--src/input/AsyncInputStream.hxx122
-rw-r--r--src/input/plugins/CurlInputPlugin.cxx259
4 files changed, 410 insertions, 211 deletions
diff --git a/Makefile.am b/Makefile.am
index 913d4bddd..2df5be262 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1032,6 +1032,7 @@ libinput_a_SOURCES = \
src/input/InputPlugin.hxx \
src/input/TextInputStream.cxx src/input/TextInputStream.hxx \
src/input/ThreadInputStream.cxx src/input/ThreadInputStream.hxx \
+ src/input/AsyncInputStream.cxx src/input/AsyncInputStream.hxx \
src/input/ProxyInputStream.cxx src/input/ProxyInputStream.hxx \
src/input/plugins/RewindInputPlugin.cxx src/input/plugins/RewindInputPlugin.hxx \
src/input/plugins/FileInputPlugin.cxx src/input/plugins/FileInputPlugin.hxx
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx
new file mode 100644
index 000000000..3f2d3b90f
--- /dev/null
+++ b/src/input/AsyncInputStream.cxx
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2003-2014 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "config.h"
+#include "AsyncInputStream.hxx"
+#include "tag/Tag.hxx"
+#include "event/Call.hxx"
+#include "thread/Cond.hxx"
+#include "IOThread.hxx"
+#include "util/HugeAllocator.hxx"
+
+#include <assert.h>
+#include <string.h>
+
+AsyncInputStream::AsyncInputStream(const char *_url,
+ Mutex &_mutex, Cond &_cond,
+ void *_buffer, size_t _buffer_size,
+ size_t _resume_at)
+ :InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()),
+ buffer((uint8_t *)_buffer, _buffer_size),
+ resume_at(_resume_at),
+ open(true),
+ paused(false),
+ seek_state(SeekState::NONE),
+ tag(nullptr) {}
+
+AsyncInputStream::~AsyncInputStream()
+{
+ delete tag;
+
+ buffer.Clear();
+ HugeFree(buffer.Write().data, buffer.GetCapacity());
+}
+
+void
+AsyncInputStream::SetTag(Tag *_tag)
+{
+ delete tag;
+ tag = _tag;
+}
+
+void
+AsyncInputStream::Pause()
+{
+ assert(io_thread_inside());
+
+ paused = true;
+}
+
+inline void
+AsyncInputStream::Resume()
+{
+ assert(io_thread_inside());
+
+ if (paused) {
+ paused = false;
+ DoResume();
+ }
+}
+
+bool
+AsyncInputStream::Check(Error &error)
+{
+ bool success = !postponed_error.IsDefined();
+ if (!success) {
+ error = std::move(postponed_error);
+ postponed_error.Clear();
+ }
+
+ return success;
+}
+
+bool
+AsyncInputStream::IsEOF()
+{
+ return !open && buffer.IsEmpty();
+}
+
+bool
+AsyncInputStream::Seek(offset_type new_offset, Error &error)
+{
+ assert(IsReady());
+ assert(seek_state == SeekState::NONE);
+
+ if (new_offset == offset)
+ /* no-op */
+ return true;
+
+ if (!IsSeekable())
+ return false;
+
+ if (new_offset < 0)
+ return false;
+
+ /* check if we can fast-forward the buffer */
+
+ while (new_offset > offset) {
+ auto r = buffer.Read();
+ if (r.IsEmpty())
+ break;
+
+ const size_t nbytes =
+ new_offset - offset < (offset_type)r.size
+ ? new_offset - offset
+ : r.size;
+
+ buffer.Consume(nbytes);
+ offset += nbytes;
+ }
+
+ if (new_offset == offset)
+ return true;
+
+ /* no: ask the implementation to seek */
+
+ seek_offset = new_offset;
+ seek_state = SeekState::SCHEDULED;
+
+ DeferredMonitor::Schedule();
+
+ while (seek_state != SeekState::NONE)
+ cond.wait(mutex);
+
+ if (!Check(error))
+ return false;
+
+ return true;
+}
+
+void
+AsyncInputStream::SeekDone()
+{
+ assert(io_thread_inside());
+ assert(IsSeekPending());
+
+ seek_state = SeekState::NONE;
+ cond.broadcast();
+}
+
+Tag *
+AsyncInputStream::ReadTag()
+{
+ Tag *result = tag;
+ tag = nullptr;
+ return result;
+}
+
+bool
+AsyncInputStream::IsAvailable()
+{
+ return postponed_error.IsDefined() || !open ||
+ !buffer.IsEmpty();
+}
+
+size_t
+AsyncInputStream::Read(void *ptr, size_t read_size, Error &error)
+{
+ assert(!io_thread_inside());
+
+ /* wait for data */
+ CircularBuffer<uint8_t>::Range r;
+ while (true) {
+ if (!Check(error))
+ return 0;
+
+ r = buffer.Read();
+ if (!r.IsEmpty() || !open)
+ break;
+
+ cond.wait(mutex);
+ }
+
+ const size_t nbytes = std::min(read_size, r.size);
+ memcpy(ptr, r.data, nbytes);
+ buffer.Consume(nbytes);
+
+ offset += (offset_type)nbytes;
+
+ if (paused && buffer.GetSize() < resume_at)
+ DeferredMonitor::Schedule();
+
+ return nbytes;
+}
+
+void
+AsyncInputStream::AppendToBuffer(const void *data, size_t append_size)
+{
+ auto w = buffer.Write();
+ assert(!w.IsEmpty());
+
+ size_t nbytes = std::min(w.size, append_size);
+ memcpy(w.data, data, nbytes);
+ buffer.Append(nbytes);
+
+ const size_t remaining = append_size - nbytes;
+ if (remaining > 0) {
+ w = buffer.Write();
+ assert(!w.IsEmpty());
+ assert(w.size >= remaining);
+
+ memcpy(w.data, (const uint8_t *)data + nbytes, remaining);
+ buffer.Append(remaining);
+ }
+
+ if (!IsReady())
+ SetReady();
+ else
+ cond.broadcast();
+}
+
+void
+AsyncInputStream::RunDeferred()
+{
+ const ScopeLock protect(mutex);
+
+ Resume();
+
+ if (seek_state == SeekState::SCHEDULED) {
+ seek_state = SeekState::PENDING;
+ buffer.Clear();
+ DoSeek(seek_offset);
+ }
+}
diff --git a/src/input/AsyncInputStream.hxx b/src/input/AsyncInputStream.hxx
new file mode 100644
index 000000000..f72e7465d
--- /dev/null
+++ b/src/input/AsyncInputStream.hxx
@@ -0,0 +1,122 @@
+/*
+ * Copyright (C) 2003-2014 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_ASYNC_INPUT_STREAM_HXX
+#define MPD_ASYNC_INPUT_STREAM_HXX
+
+#include "InputStream.hxx"
+#include "event/DeferredMonitor.hxx"
+#include "util/CircularBuffer.hxx"
+#include "util/Error.hxx"
+
+/**
+ * Helper class for moving asynchronous (non-blocking) InputStream
+ * implementations to the I/O thread. Data is being read into a ring
+ * buffer, and that buffer is then consumed by another thread using
+ * the regular #InputStream API.
+ */
+class AsyncInputStream : public InputStream, private DeferredMonitor {
+ enum class SeekState : uint8_t {
+ NONE, SCHEDULED, PENDING
+ };
+
+ CircularBuffer<uint8_t> buffer;
+ const size_t resume_at;
+
+ bool open;
+
+ /**
+ * Is the connection currently paused? That happens when the
+ * buffer was getting too large. It will be unpaused when the
+ * buffer is below the threshold again.
+ */
+ bool paused;
+
+ SeekState seek_state;
+
+ /**
+ * The #Tag object ready to be requested via
+ * InputStream::ReadTag().
+ */
+ Tag *tag;
+
+ offset_type seek_offset;
+
+protected:
+ Error postponed_error;
+
+public:
+ AsyncInputStream(const char *_url,
+ Mutex &_mutex, Cond &_cond,
+ void *_buffer, size_t _buffer_size,
+ size_t _resume_at);
+
+ virtual ~AsyncInputStream();
+
+ /* virtual methods from InputStream */
+ bool Check(Error &error) final;
+ bool IsEOF() final;
+ bool Seek(offset_type new_offset, Error &error) final;
+ Tag *ReadTag() final;
+ bool IsAvailable() final;
+ size_t Read(void *ptr, size_t read_size, Error &error) final;
+
+protected:
+ void SetTag(Tag *_tag);
+
+ void Pause();
+
+ void SetClosed() {
+ open = false;
+ }
+
+ bool IsBufferEmpty() const {
+ return buffer.IsEmpty();
+ }
+
+ gcc_pure
+ size_t GetBufferSpace() const {
+ return buffer.GetSpace();
+ }
+
+ void AppendToBuffer(const void *data, size_t append_size);
+
+ virtual void DoResume() = 0;
+
+ /**
+ * The actual Seek() implementation. This virtual method will
+ * be called from within the I/O thread. When the operation
+ * is finished, call SeekDone() to notify the caller.
+ */
+ virtual void DoSeek(offset_type new_offset) = 0;
+
+ bool IsSeekPending() const {
+ return seek_state == SeekState::PENDING;
+ }
+
+ void SeekDone();
+
+private:
+ void Resume();
+
+ /* virtual methods from DeferredMonitor */
+ void RunDeferred() final;
+};
+
+#endif
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx
index b48512ccb..2450c8754 100644
--- a/src/input/plugins/CurlInputPlugin.cxx
+++ b/src/input/plugins/CurlInputPlugin.cxx
@@ -19,8 +19,8 @@
#include "config.h"
#include "CurlInputPlugin.hxx"
+#include "../AsyncInputStream.hxx"
#include "../IcyInputStream.hxx"
-#include "../InputStream.hxx"
#include "../InputPlugin.hxx"
#include "config/ConfigGlobal.hxx"
#include "config/ConfigData.hxx"
@@ -60,7 +60,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/
static const size_t CURL_RESUME_AT = 384 * 1024;
-struct CurlInputStream final : public InputStream {
+struct CurlInputStream final : public AsyncInputStream {
/* some buffers which were passed to libcurl, which we have
too free */
char range[32];
@@ -69,39 +69,19 @@ struct CurlInputStream final : public InputStream {
/** the curl handles */
CURL *easy;
- /**
- * A buffer where input_curl_writefunction() appends
- * to, and input_curl_read() reads from.
- */
- CircularBuffer<uint8_t> buffer;
-
- /**
- * Is the connection currently paused? That happens when the
- * buffer was getting too large. It will be unpaused when the
- * buffer is below the threshold again.
- */
- bool paused;
-
/** error message provided by libcurl */
char error_buffer[CURL_ERROR_SIZE];
/** parser for icy-metadata */
IcyInputStream *icy;
- /** the tag object ready to be requested via
- InputStream::ReadTag() */
- Tag *tag;
-
- Error postponed_error;
-
CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond,
void *_buffer)
- :InputStream(_url, _mutex, _cond),
+ :AsyncInputStream(_url, _mutex, _cond,
+ _buffer, CURL_MAX_BUFFERED,
+ CURL_RESUME_AT),
request_headers(nullptr),
- buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
- paused(false),
- icy(new IcyInputStream(this)),
- tag(nullptr) {}
+ icy(new IcyInputStream(this)) {}
~CurlInputStream();
@@ -133,19 +113,6 @@ struct CurlInputStream final : public InputStream {
size_t DataReceived(const void *ptr, size_t size);
- void Resume();
- bool FillBuffer(Error &error);
-
- /**
- * Returns the number of bytes stored in the buffer.
- *
- * The caller must lock the mutex.
- */
- gcc_pure
- size_t GetTotalBufferSize() const {
- return buffer.GetSize();
- }
-
/**
* A HTTP request is finished.
*
@@ -153,22 +120,9 @@ struct CurlInputStream final : public InputStream {
*/
void RequestDone(CURLcode result, long status);
- /* virtual methods from InputStream */
- bool Check(Error &error) override;
-
- bool IsEOF() override {
- return easy == nullptr && buffer.IsEmpty();
- }
-
- Tag *ReadTag() override;
-
- bool IsAvailable() override {
- return postponed_error.IsDefined() || easy == nullptr ||
- !buffer.IsEmpty();
- }
-
- size_t Read(void *ptr, size_t size, Error &error) override;
- bool Seek(offset_type offset, Error &error) override;
+ /* virtual methods from AsyncInputStream */
+ virtual void DoResume() override;
+ virtual void DoSeek(offset_type new_offset) override;
};
class CurlMulti;
@@ -327,23 +281,24 @@ input_curl_find_request(CURL *easy)
return (CurlInputStream *)p;
}
-inline void
-CurlInputStream::Resume()
+void
+CurlInputStream::DoResume()
{
assert(io_thread_inside());
- if (paused) {
- paused = false;
- curl_easy_pause(easy, CURLPAUSE_CONT);
+ mutex.unlock();
- if (curl_version_num < 0x072000)
- /* libcurl older than 7.32.0 does not update
- its sockets after curl_easy_pause(); force
- libcurl to do it now */
- curl_multi->ResumeSockets();
+ curl_easy_pause(easy, CURLPAUSE_CONT);
- curl_multi->InvalidateSockets();
- }
+ if (curl_version_num < 0x072000)
+ /* libcurl older than 7.32.0 does not update
+ its sockets after curl_easy_pause(); force
+ libcurl to do it now */
+ curl_multi->ResumeSockets();
+
+ curl_multi->InvalidateSockets();
+
+ mutex.lock();
}
int
@@ -472,6 +427,7 @@ CurlInputStream::RequestDone(CURLcode result, long status)
assert(!postponed_error.IsDefined());
FreeEasy();
+ AsyncInputStream::SetClosed();
const ScopeLock protect(mutex);
@@ -484,7 +440,9 @@ CurlInputStream::RequestDone(CURLcode result, long status)
status);
}
- if (!IsReady())
+ if (IsSeekPending())
+ SeekDone();
+ else if (!IsReady())
SetReady();
}
@@ -630,81 +588,16 @@ input_curl_finish(void)
CurlInputStream::~CurlInputStream()
{
- delete tag;
-
FreeEasyIndirect();
-
- buffer.Clear();
- HugeFree(buffer.Write().data, CURL_MAX_BUFFERED);
-}
-
-inline bool
-CurlInputStream::Check(Error &error)
-{
- bool success = !postponed_error.IsDefined();
- if (!success) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- }
-
- return success;
-}
-
-Tag *
-CurlInputStream::ReadTag()
-{
- Tag *result = tag;
- tag = nullptr;
- return result;
-}
-
-inline bool
-CurlInputStream::FillBuffer(Error &error)
-{
- while (easy != nullptr && buffer.IsEmpty())
- cond.wait(mutex);
-
- if (postponed_error.IsDefined()) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- return false;
- }
-
- return !buffer.IsEmpty();
-}
-
-size_t
-CurlInputStream::Read(void *ptr, size_t read_size, Error &error)
-{
- if (!FillBuffer(error))
- return 0;
-
- auto r = buffer.Read();
- if (r.IsEmpty())
- return 0;
-
- const size_t nbytes = std::min(read_size, r.size);
- memcpy(ptr, r.data, nbytes);
- buffer.Consume(nbytes);
-
- offset += (InputPlugin::offset_type)nbytes;
-
- if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
- mutex.unlock();
-
- BlockingCall(io_thread_get(), [this](){
- Resume();
- });
-
- mutex.lock();
- }
-
- return nbytes;
}
inline void
CurlInputStream::HeaderReceived(const char *name, std::string &&value)
{
+ if (IsSeekPending())
+ /* don't update metadata while seeking */
+ return;
+
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!icy->IsEnabled())
@@ -716,12 +609,10 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value)
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
- delete tag;
-
TagBuilder tag_builder;
tag_builder.AddItem(TAG_NAME, value.c_str());
- tag = tag_builder.CommitNew();
+ SetTag(tag_builder.CommitNew());
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
if (icy->IsEnabled())
return;
@@ -782,30 +673,15 @@ CurlInputStream::DataReceived(const void *ptr, size_t received_size)
const ScopeLock protect(mutex);
- if (received_size > buffer.GetSpace()) {
- paused = true;
- return CURL_WRITEFUNC_PAUSE;
- }
-
- auto w = buffer.Write();
- assert(!w.IsEmpty());
-
- size_t nbytes = std::min(w.size, received_size);
- memcpy(w.data, ptr, nbytes);
- buffer.Append(nbytes);
-
- const size_t remaining = received_size - nbytes;
- if (remaining > 0) {
- w = buffer.Write();
- assert(!w.IsEmpty());
- assert(w.size >= remaining);
+ if (IsSeekPending())
+ SeekDone();
- memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
- buffer.Append(remaining);
+ if (received_size > GetBufferSpace()) {
+ AsyncInputStream::Pause();
+ return CURL_WRITEFUNC_PAUSE;
}
- ready = true;
- cond.broadcast();
+ AppendToBuffer(ptr, received_size);
return received_size;
}
@@ -880,48 +756,16 @@ CurlInputStream::InitEasy(Error &error)
return true;
}
-inline bool
-CurlInputStream::Seek(offset_type new_offset, Error &error)
+void
+CurlInputStream::DoSeek(offset_type new_offset)
{
assert(IsReady());
- if (new_offset == offset)
- /* no-op */
- return true;
-
- if (!IsSeekable())
- return false;
-
- /* calculate the absolute offset */
-
- if (new_offset < 0)
- return false;
-
- /* check if we can fast-forward the buffer */
-
- while (new_offset > offset) {
- auto r = buffer.Read();
- if (r.IsEmpty())
- break;
-
- const size_t nbytes =
- new_offset - offset < (InputPlugin::offset_type)r.size
- ? new_offset - offset
- : r.size;
-
- buffer.Consume(nbytes);
- offset += nbytes;
- }
-
- if (new_offset == offset)
- return true;
-
/* close the old connection and open a new one */
mutex.unlock();
FreeEasyIndirect();
- buffer.Clear();
offset = new_offset;
if (offset == size) {
@@ -929,12 +773,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
triggering a "416 Requested Range Not Satisfiable"
response */
mutex.lock();
- return true;
+ SeekDone();
+ return;
}
- if (!InitEasy(error)) {
+ if (!InitEasy(postponed_error)) {
mutex.lock();
- return false;
+ SeekDone();
+ return;
}
/* send the "Range" header */
@@ -944,23 +790,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
curl_easy_setopt(easy, CURLOPT_RANGE, range);
}
- ready = false;
-
- if (!input_curl_easy_add_indirect(this, error)) {
+ if (!input_curl_easy_add_indirect(this, postponed_error)) {
mutex.lock();
- return false;
+ SeekDone();
+ return;
}
mutex.lock();
- WaitReady();
-
- if (postponed_error.IsDefined()) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- return false;
- }
-
- return true;
+ offset = new_offset;
}
inline InputStream *