aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2014-03-15 20:40:43 +0100
committerMax Kellermann <max@duempel.org>2014-03-15 20:43:37 +0100
commit23eacbd1326268777e080baa4fcbfbe54652156d (patch)
tree87a1aaad1d8c46cbd7ba3b0a72690cdd1681b0dd
parente9f16fca969bc34068f9ec11299a7302dcb90580 (diff)
downloadmpd-23eacbd1326268777e080baa4fcbfbe54652156d.tar.gz
mpd-23eacbd1326268777e080baa4fcbfbe54652156d.tar.xz
mpd-23eacbd1326268777e080baa4fcbfbe54652156d.zip
input/curl: move code to CurlInputStream methods
-rw-r--r--src/input/plugins/CurlInputPlugin.cxx409
1 files changed, 231 insertions, 178 deletions
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx
index bc4b0e5b6..a339353cd 100644
--- a/src/input/plugins/CurlInputPlugin.cxx
+++ b/src/input/plugins/CurlInputPlugin.cxx
@@ -177,6 +177,63 @@ struct CurlInputStream {
CurlInputStream(const CurlInputStream &) = delete;
CurlInputStream &operator=(const CurlInputStream &) = delete;
+
+ bool Check(Error &error);
+
+ bool IsEOF() const {
+ return easy == nullptr && buffers.empty();
+ }
+
+ Tag *ReadTag();
+
+ bool IsAvailable() const {
+ return postponed_error.IsDefined() || easy == nullptr ||
+ !buffers.empty();
+ }
+
+ size_t Read(void *ptr, size_t size, Error &error);
+
+ /**
+ * Frees the current "libcurl easy" handle, and everything
+ * associated with it.
+ *
+ * Runs in the I/O thread.
+ */
+ void FreeEasy();
+
+ /**
+ * Frees the current "libcurl easy" handle, and everything associated
+ * with it.
+ *
+ * The mutex must not be locked.
+ */
+ void FreeEasyIndirect();
+
+ void HeaderReceived(const char *name,
+ const char *value, const char *end);
+
+ size_t DataReceived(const void *ptr, size_t size);
+
+ void Resume();
+ bool FillBuffer(Error &error);
+
+ /**
+ * Determine the total sizes of all buffers, including
+ * portions that have already been consumed.
+ *
+ * The caller must lock the mutex.
+ */
+ gcc_pure
+ size_t GetTotalBufferSize() const;
+
+ void CopyIcyTag();
+
+ /**
+ * A HTTP request is finished.
+ *
+ * Runs in the I/O thread. The caller must not hold locks.
+ */
+ void RequestDone(CURLcode result, long status);
};
class CurlMulti;
@@ -335,14 +392,14 @@ input_curl_find_request(CURL *easy)
return (CurlInputStream *)p;
}
-static void
-input_curl_resume(CurlInputStream *c)
+inline void
+CurlInputStream::Resume()
{
assert(io_thread_inside());
- if (c->paused) {
- c->paused = false;
- curl_easy_pause(c->easy, CURLPAUSE_CONT);
+ if (paused) {
+ paused = false;
+ curl_easy_pause(easy, CURLPAUSE_CONT);
if (curl_version_num < 0x072000)
/* libcurl older than 7.32.0 does not update
@@ -445,74 +502,55 @@ CurlMulti::Remove(CurlInputStream *c)
curl_multi_remove_handle(multi, c->easy);
}
-/**
- * Frees the current "libcurl easy" handle, and everything associated
- * with it.
- *
- * Runs in the I/O thread.
- */
-static void
-input_curl_easy_free(CurlInputStream *c)
+void
+CurlInputStream::FreeEasy()
{
assert(io_thread_inside());
- assert(c != nullptr);
- if (c->easy == nullptr)
+ if (easy == nullptr)
return;
- curl_multi->Remove(c);
+ curl_multi->Remove(this);
- curl_easy_cleanup(c->easy);
- c->easy = nullptr;
+ curl_easy_cleanup(easy);
+ easy = nullptr;
- curl_slist_free_all(c->request_headers);
- c->request_headers = nullptr;
+ curl_slist_free_all(request_headers);
+ request_headers = nullptr;
}
-/**
- * Frees the current "libcurl easy" handle, and everything associated
- * with it.
- *
- * The mutex must not be locked.
- */
-static void
-input_curl_easy_free_indirect(CurlInputStream *c)
+void
+CurlInputStream::FreeEasyIndirect()
{
- BlockingCall(io_thread_get(), [c](){
- input_curl_easy_free(c);
+ BlockingCall(io_thread_get(), [this](){
+ FreeEasy();
curl_multi->InvalidateSockets();
});
- assert(c->easy == nullptr);
+ assert(easy == nullptr);
}
-/**
- * A HTTP request is finished.
- *
- * Runs in the I/O thread. The caller must not hold locks.
- */
-static void
-input_curl_request_done(CurlInputStream *c, CURLcode result, long status)
+inline void
+CurlInputStream::RequestDone(CURLcode result, long status)
{
assert(io_thread_inside());
- assert(c != nullptr);
- assert(c->easy == nullptr);
- assert(!c->postponed_error.IsDefined());
+ assert(!postponed_error.IsDefined());
+
+ FreeEasy();
- const ScopeLock protect(c->base.mutex);
+ const ScopeLock protect(base.mutex);
if (result != CURLE_OK) {
- c->postponed_error.Format(curl_domain, result,
- "curl failed: %s", c->error_buffer);
+ postponed_error.Format(curl_domain, result,
+ "curl failed: %s", error_buffer);
} else if (status < 200 || status >= 300) {
- c->postponed_error.Format(http_domain, status,
- "got HTTP status %ld",
- status);
+ postponed_error.Format(http_domain, status,
+ "got HTTP status %ld",
+ status);
}
- c->base.ready = true;
-
- c->base.cond.broadcast();
+ base.ready = true;
+ base.cond.broadcast();
}
static void
@@ -524,8 +562,7 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result)
long status = 0;
curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status);
- input_curl_easy_free(c);
- input_curl_request_done(c, result, status);
+ c->RequestDone(result, status);
}
void
@@ -656,19 +693,11 @@ input_curl_finish(void)
curl_global_cleanup();
}
-/**
- * Determine the total sizes of all buffers, including portions that
- * have already been consumed.
- *
- * The caller must lock the mutex.
- */
-gcc_pure
-static size_t
-curl_total_buffer_size(const CurlInputStream *c)
+size_t
+CurlInputStream::GetTotalBufferSize() const
{
size_t total = 0;
-
- for (const auto &i : c->buffers)
+ for (const auto &i : buffers)
total += i.TotalSize();
return total;
@@ -678,46 +707,56 @@ CurlInputStream::~CurlInputStream()
{
delete tag;
- input_curl_easy_free_indirect(this);
+ FreeEasyIndirect();
}
-static bool
-input_curl_check(InputStream *is, Error &error)
+inline bool
+CurlInputStream::Check(Error &error)
{
- CurlInputStream *c = (CurlInputStream *)is;
-
- bool success = !c->postponed_error.IsDefined();
+ bool success = !postponed_error.IsDefined();
if (!success) {
- error = std::move(c->postponed_error);
- c->postponed_error.Clear();
+ error = std::move(postponed_error);
+ postponed_error.Clear();
}
return success;
}
+static bool
+input_curl_check(InputStream *is, Error &error)
+{
+ CurlInputStream &c = *(CurlInputStream *)is;
+ return c.Check(error);
+}
+
+inline Tag *
+CurlInputStream::ReadTag()
+{
+ Tag *result = tag;
+ tag = nullptr;
+ return result;
+}
+
static Tag *
input_curl_tag(InputStream *is)
{
- CurlInputStream *c = (CurlInputStream *)is;
- Tag *tag = c->tag;
-
- c->tag = nullptr;
- return tag;
+ CurlInputStream &c = *(CurlInputStream *)is;
+ return c.ReadTag();
}
-static bool
-fill_buffer(CurlInputStream *c, Error &error)
+inline bool
+CurlInputStream::FillBuffer(Error &error)
{
- while (c->easy != nullptr && c->buffers.empty())
- c->base.cond.wait(c->base.mutex);
+ while (easy != nullptr && buffers.empty())
+ base.cond.wait(base.mutex);
- if (c->postponed_error.IsDefined()) {
- error = std::move(c->postponed_error);
- c->postponed_error.Clear();
+ if (postponed_error.IsDefined()) {
+ error = std::move(postponed_error);
+ postponed_error.Clear();
return false;
}
- return !c->buffers.empty();
+ return !buffers.empty();
}
static size_t
@@ -770,54 +809,47 @@ read_from_buffer(IcyMetaDataParser &icy, std::list<CurlInputBuffer> &buffers,
return nbytes;
}
-static void
-copy_icy_tag(CurlInputStream *c)
+inline void
+CurlInputStream::CopyIcyTag()
{
- Tag *tag = c->icy.ReadTag();
-
- if (tag == nullptr)
+ Tag *new_tag = icy.ReadTag();
+ if (new_tag == nullptr)
return;
- delete c->tag;
+ delete tag;
- if (!c->meta_name.empty() && !tag->HasType(TAG_NAME)) {
- TagBuilder tag_builder(std::move(*tag));
- tag_builder.AddItem(TAG_NAME, c->meta_name.c_str());
- *tag = tag_builder.Commit();
+ if (!meta_name.empty() && !new_tag->HasType(TAG_NAME)) {
+ TagBuilder tag_builder(std::move(*new_tag));
+ tag_builder.AddItem(TAG_NAME, meta_name.c_str());
+ *new_tag = tag_builder.Commit();
}
- c->tag = tag;
+ tag = new_tag;
}
static bool
input_curl_available(InputStream *is)
{
- CurlInputStream *c = (CurlInputStream *)is;
-
- return c->postponed_error.IsDefined() || c->easy == nullptr ||
- !c->buffers.empty();
+ const CurlInputStream &c = *(const CurlInputStream *)is;
+ return c.IsAvailable();
}
-static size_t
-input_curl_read(InputStream *is, void *ptr, size_t size,
- Error &error)
+inline size_t
+CurlInputStream::Read(void *ptr, size_t size, Error &error)
{
- CurlInputStream *c = (CurlInputStream *)is;
- bool success;
size_t nbytes = 0;
char *dest = (char *)ptr;
do {
/* fill the buffer */
- success = fill_buffer(c, error);
- if (!success)
+ if (!FillBuffer(error))
return 0;
/* send buffer contents */
- while (size > 0 && !c->buffers.empty()) {
- size_t copy = read_from_buffer(c->icy, c->buffers,
+ while (size > 0 && !buffers.empty()) {
+ size_t copy = read_from_buffer(icy, buffers,
dest + nbytes, size);
nbytes += copy;
@@ -825,24 +857,32 @@ input_curl_read(InputStream *is, void *ptr, size_t size,
}
} while (nbytes == 0);
- if (c->icy.IsDefined())
- copy_icy_tag(c);
+ if (icy.IsDefined())
+ CopyIcyTag();
- is->offset += (InputPlugin::offset_type)nbytes;
+ base.offset += (InputPlugin::offset_type)nbytes;
- if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) {
- c->base.mutex.unlock();
+ if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
+ base.mutex.unlock();
- BlockingCall(io_thread_get(), [c](){
- input_curl_resume(c);
+ BlockingCall(io_thread_get(), [this](){
+ Resume();
});
- c->base.mutex.lock();
+ base.mutex.lock();
}
return nbytes;
}
+static size_t
+input_curl_read(InputStream *is, void *ptr, size_t size,
+ Error &error)
+{
+ CurlInputStream &c = *(CurlInputStream *)is;
+ return c.Read(ptr, size, error);
+}
+
static void
input_curl_close(InputStream *is)
{
@@ -854,76 +894,48 @@ input_curl_close(InputStream *is)
static bool
input_curl_eof(gcc_unused InputStream *is)
{
- CurlInputStream *c = (CurlInputStream *)is;
-
- return c->easy == nullptr && c->buffers.empty();
+ const CurlInputStream &c = *(const CurlInputStream *)is;
+ return c.IsEOF();
}
-/** called by curl when new data is available */
-static size_t
-input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
+inline void
+CurlInputStream::HeaderReceived(const char *name,
+ const char *value, const char *end)
{
- CurlInputStream *c = (CurlInputStream *)stream;
- char name[64];
-
- size *= nmemb;
-
- const char *header = (const char *)ptr;
- const char *end = header + size;
-
- const char *value = (const char *)memchr(header, ':', size);
- if (value == nullptr || (size_t)(value - header) >= sizeof(name))
- return size;
-
- memcpy(name, header, value - header);
- name[value - header] = 0;
-
- /* skip the colon */
-
- ++value;
-
- /* strip the value */
-
- while (value < end && IsWhitespaceOrNull(*value))
- ++value;
-
- while (end > value && IsWhitespaceOrNull(end[-1]))
- --end;
-
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
- if (!c->icy.IsDefined())
- c->base.seekable = true;
+ if (!icy.IsDefined())
+ base.seekable = true;
} else if (StringEqualsCaseASCII(name, "content-length")) {
char buffer[64];
- if ((size_t)(end - header) >= sizeof(buffer))
- return size;
+ if ((size_t)(end - value) >= sizeof(buffer))
+ return;
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
- c->base.size = c->base.offset + ParseUint64(buffer);
+ base.size = base.offset + ParseUint64(buffer);
} else if (StringEqualsCaseASCII(name, "content-type")) {
- c->base.mime.assign(value, end);
+ base.mime.assign(value, end);
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
- c->meta_name.assign(value, end);
+ meta_name.assign(value, end);
- delete c->tag;
+ delete tag;
TagBuilder tag_builder;
- tag_builder.AddItem(TAG_NAME, c->meta_name.c_str());
+ tag_builder.AddItem(TAG_NAME, meta_name.c_str());
- c->tag = tag_builder.CommitNew();
+ tag = tag_builder.CommitNew();
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
char buffer[64];
size_t icy_metaint;
- if ((size_t)(end - header) >= sizeof(buffer) ||
- c->icy.IsDefined())
- return size;
+ if ((size_t)(end - value) >= sizeof(buffer) ||
+ icy.IsDefined())
+ return;
memcpy(buffer, value, end - value);
buffer[end - value] = 0;
@@ -932,41 +944,82 @@ input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
FormatDebug(curl_domain, "icy-metaint=%zu", icy_metaint);
if (icy_metaint > 0) {
- c->icy.Start(icy_metaint);
+ icy.Start(icy_metaint);
/* a stream with icy-metadata is not
seekable */
- c->base.seekable = false;
+ base.seekable = false;
}
}
-
- return size;
}
/** called by curl when new data is available */
static size_t
-input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
+input_curl_headerfunction(void *ptr, size_t size, size_t nmemb, void *stream)
{
- CurlInputStream *c = (CurlInputStream *)stream;
+ CurlInputStream &c = *(CurlInputStream *)stream;
size *= nmemb;
- if (size == 0)
- return 0;
- const ScopeLock protect(c->base.mutex);
+ const char *header = (const char *)ptr;
+ const char *end = header + size;
+
+ char name[64];
+
+ const char *value = (const char *)memchr(header, ':', size);
+ if (value == nullptr || (size_t)(value - header) >= sizeof(name))
+ return size;
+
+ memcpy(name, header, value - header);
+ name[value - header] = 0;
+
+ /* skip the colon */
+
+ ++value;
+
+ /* strip the value */
+
+ while (value < end && IsWhitespaceOrNull(*value))
+ ++value;
+
+ while (end > value && IsWhitespaceOrNull(end[-1]))
+ --end;
- if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) {
- c->paused = true;
+ c.HeaderReceived(name, value, end);
+ return size;
+}
+
+inline size_t
+CurlInputStream::DataReceived(const void *ptr, size_t size)
+{
+ assert(size > 0);
+
+ const ScopeLock protect(base.mutex);
+
+ if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) {
+ paused = true;
return CURL_WRITEFUNC_PAUSE;
}
- c->buffers.emplace_back(ptr, size);
- c->base.ready = true;
-
- c->base.cond.broadcast();
+ buffers.emplace_back(ptr, size);
+ base.ready = true;
+ base.cond.broadcast();
return size;
}
+/** called by curl when new data is available */
+static size_t
+input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream)
+{
+ CurlInputStream &c = *(CurlInputStream *)stream;
+
+ size *= nmemb;
+ if (size == 0)
+ return 0;
+
+ return c.DataReceived(ptr, size);
+}
+
static bool
input_curl_easy_init(CurlInputStream *c, Error &error)
{
@@ -1091,7 +1144,7 @@ input_curl_seek(InputStream *is, InputPlugin::offset_type offset,
c->base.mutex.unlock();
- input_curl_easy_free_indirect(c);
+ c->FreeEasyIndirect();
c->buffers.clear();
is->offset = offset;