aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/input/plugins/CurlInputPlugin.cxx208
1 files changed, 75 insertions, 133 deletions
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx
index 60facb26b..37509e998 100644
--- a/src/input/plugins/CurlInputPlugin.cxx
+++ b/src/input/plugins/CurlInputPlugin.cxx
@@ -33,6 +33,8 @@
#include "util/ASCII.hxx"
#include "util/CharUtil.hxx"
#include "util/NumberParser.hxx"
+#include "util/CircularBuffer.hxx"
+#include "util/HugeAllocator.hxx"
#include "util/Error.hxx"
#include "util/Domain.hxx"
#include "Log.hxx"
@@ -46,9 +48,6 @@
#endif
#include <string.h>
-#include <errno.h>
-
-#include <list>
#include <curl/curl.h>
@@ -68,68 +67,6 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/
static const size_t CURL_RESUME_AT = 384 * 1024;
-/**
- * Buffers created by input_curl_writefunction().
- */
-class CurlInputBuffer {
- /** size of the payload */
- size_t size;
-
- /** how much has been consumed yet? */
- size_t consumed;
-
- /** the payload */
- uint8_t *data;
-
-public:
- CurlInputBuffer(const void *_data, size_t _size)
- :size(_size), consumed(0), data(new uint8_t[size]) {
- memcpy(data, _data, size);
- }
-
- ~CurlInputBuffer() {
- delete[] data;
- }
-
- CurlInputBuffer(const CurlInputBuffer &) = delete;
- CurlInputBuffer &operator=(const CurlInputBuffer &) = delete;
-
- const void *Begin() const {
- return data + consumed;
- }
-
- size_t TotalSize() const {
- return size;
- }
-
- size_t Available() const {
- return size - consumed;
- }
-
- /**
- * Mark a part of the buffer as consumed.
- *
- * @return false if the buffer is now empty
- */
- bool Consume(size_t length) {
- assert(consumed < size);
-
- consumed += length;
- if (consumed < size)
- return true;
-
- assert(consumed == size);
- return false;
- }
-
- bool Read(void *dest, size_t length) {
- assert(consumed + length <= size);
-
- memcpy(dest, data + consumed, length);
- return Consume(length);
- }
-};
-
struct CurlInputStream {
InputStream base;
@@ -141,9 +78,11 @@ struct CurlInputStream {
/** the curl handles */
CURL *easy;
- /** list of buffers, where input_curl_writefunction() appends
- to, and input_curl_read() reads from them */
- std::list<CurlInputBuffer> buffers;
+ /**
+ * A buffer where input_curl_writefunction() appends
+ * to, and input_curl_read() reads from.
+ */
+ CircularBuffer<uint8_t> buffer;
/**
* Is the connection currently paused? That happens when the
@@ -167,9 +106,11 @@ struct CurlInputStream {
Error postponed_error;
- CurlInputStream(const char *url, Mutex &mutex, Cond &cond)
+ CurlInputStream(const char *url, Mutex &mutex, Cond &cond,
+ void *_buffer)
:base(input_plugin_curl, url, mutex, cond),
request_headers(nullptr),
+ buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
paused(false),
tag(nullptr) {}
@@ -184,7 +125,7 @@ struct CurlInputStream {
bool Check(Error &error);
bool IsEOF() const {
- return easy == nullptr && buffers.empty();
+ return easy == nullptr && buffer.IsEmpty();
}
bool Seek(InputPlugin::offset_type offset, int whence, Error &error);
@@ -193,7 +134,7 @@ struct CurlInputStream {
bool IsAvailable() const {
return postponed_error.IsDefined() || easy == nullptr ||
- !buffers.empty();
+ !buffer.IsEmpty();
}
size_t Read(void *ptr, size_t size, Error &error);
@@ -224,13 +165,14 @@ struct CurlInputStream {
bool FillBuffer(Error &error);
/**
- * Determine the total sizes of all buffers, including
- * portions that have already been consumed.
+ * Returns the number of bytes stored in the buffer.
*
* The caller must lock the mutex.
*/
gcc_pure
- size_t GetTotalBufferSize() const;
+ size_t GetTotalBufferSize() const {
+ return buffer.GetSize();
+ }
void CopyIcyTag();
@@ -699,21 +641,14 @@ input_curl_finish(void)
curl_global_cleanup();
}
-size_t
-CurlInputStream::GetTotalBufferSize() const
-{
- size_t total = 0;
- for (const auto &i : buffers)
- total += i.TotalSize();
-
- return total;
-}
-
CurlInputStream::~CurlInputStream()
{
delete tag;
FreeEasyIndirect();
+
+ buffer.Clear();
+ HugeFree(buffer.Write().data, CURL_MAX_BUFFERED);
}
inline bool
@@ -753,7 +688,7 @@ input_curl_tag(InputStream *is)
inline bool
CurlInputStream::FillBuffer(Error &error)
{
- while (easy != nullptr && buffers.empty())
+ while (easy != nullptr && buffer.IsEmpty())
base.cond.wait(base.mutex);
if (postponed_error.IsDefined()) {
@@ -762,51 +697,44 @@ CurlInputStream::FillBuffer(Error &error)
return false;
}
- return !buffers.empty();
+ return !buffer.IsEmpty();
}
static size_t
-read_from_buffer(IcyMetaDataParser &icy, std::list<CurlInputBuffer> &buffers,
+read_from_buffer(IcyMetaDataParser &icy, CircularBuffer<uint8_t> &buffer,
void *dest0, size_t length)
{
- auto &buffer = buffers.front();
uint8_t *dest = (uint8_t *)dest0;
size_t nbytes = 0;
- if (length > buffer.Available())
- length = buffer.Available();
-
while (true) {
- size_t chunk;
+ auto r = buffer.Read();
+ if (r.IsEmpty())
+ break;
+
+ if (r.size > length)
+ r.size = length;
- chunk = icy.Data(length);
+ size_t chunk = icy.Data(r.size);
if (chunk > 0) {
- const bool empty = !buffer.Read(dest, chunk);
+ memcpy(dest, r.data, chunk);
+ buffer.Consume(chunk);
nbytes += chunk;
dest += chunk;
length -= chunk;
- if (empty) {
- buffers.pop_front();
- break;
- }
-
if (length == 0)
break;
}
- chunk = icy.Meta(buffer.Begin(), length);
- if (chunk > 0) {
- const bool empty = !buffer.Consume(chunk);
-
- length -= chunk;
-
- if (empty) {
- buffers.pop_front();
- break;
- }
+ r = buffer.Read();
+ if (r.IsEmpty())
+ break;
+ chunk = icy.Meta(r.data, r.size);
+ if (chunk > 0) {
+ buffer.Consume(chunk);
if (length == 0)
break;
}
@@ -843,8 +771,7 @@ input_curl_available(InputStream *is)
inline size_t
CurlInputStream::Read(void *ptr, size_t size, Error &error)
{
- size_t nbytes = 0;
- char *dest = (char *)ptr;
+ size_t nbytes;
do {
/* fill the buffer */
@@ -852,15 +779,7 @@ CurlInputStream::Read(void *ptr, size_t size, Error &error)
if (!FillBuffer(error))
return 0;
- /* send buffer contents */
-
- while (size > 0 && !buffers.empty()) {
- size_t copy = read_from_buffer(icy, buffers,
- dest + nbytes, size);
-
- nbytes += copy;
- size -= copy;
- }
+ nbytes = read_from_buffer(icy, buffer, ptr, size);
} while (nbytes == 0);
if (icy.IsDefined())
@@ -986,12 +905,28 @@ CurlInputStream::DataReceived(const void *ptr, size_t size)
const ScopeLock protect(base.mutex);
- if (GetTotalBufferSize() + size >= CURL_MAX_BUFFERED) {
+ if (size > buffer.GetSpace()) {
paused = true;
return CURL_WRITEFUNC_PAUSE;
}
- buffers.emplace_back(ptr, size);
+ auto w = buffer.Write();
+ assert(!w.IsEmpty());
+
+ size_t nbytes = std::min(w.size, size);
+ memcpy(w.data, ptr, nbytes);
+ buffer.Append(nbytes);
+
+ const size_t remaining = size - nbytes;
+ if (remaining > 0) {
+ w = buffer.Write();
+ assert(!w.IsEmpty());
+ assert(w.size >= remaining);
+
+ memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
+ buffer.Append(size);
+ }
+
base.ready = true;
base.cond.broadcast();
return size;
@@ -1108,17 +1043,18 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
/* check if we can fast-forward the buffer */
- while (offset > base.offset && !buffers.empty()) {
- auto &buffer = buffers.front();
- size_t length = buffer.Available();
- if (offset - base.offset < (InputPlugin::offset_type)length)
- length = offset - base.offset;
+ while (offset > base.offset) {
+ auto r = buffer.Read();
+ if (r.IsEmpty())
+ break;
- const bool empty = !buffer.Consume(length);
- if (empty)
- buffers.pop_front();
+ const size_t nbytes =
+ offset - base.offset < (InputPlugin::offset_type)r.size
+ ? offset - base.offset
+ : r.size;
- base.offset += length;
+ buffer.Consume(nbytes);
+ base.offset += nbytes;
}
if (offset == base.offset)
@@ -1129,7 +1065,7 @@ CurlInputStream::Seek(InputPlugin::offset_type offset, int whence,
base.mutex.unlock();
FreeEasyIndirect();
- buffers.clear();
+ buffer.Clear();
base.offset = offset;
if (base.offset == base.size) {
@@ -1181,7 +1117,13 @@ inline InputStream *
CurlInputStream::Open(const char *url, Mutex &mutex, Cond &cond,
Error &error)
{
- CurlInputStream *c = new CurlInputStream(url, mutex, cond);
+ void *buffer = HugeAllocate(CURL_MAX_BUFFERED);
+ if (buffer == nullptr) {
+ error.Set(curl_domain, "Out of memory");
+ return nullptr;
+ }
+
+ CurlInputStream *c = new CurlInputStream(url, mutex, cond, buffer);
if (!c->InitEasy(error) || !input_curl_easy_add_indirect(c, error)) {
delete c;