aboutsummaryrefslogtreecommitdiffstats
path: root/src/input/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'src/input/plugins')
-rw-r--r--src/input/plugins/CurlInputPlugin.cxx259
1 files changed, 48 insertions, 211 deletions
diff --git a/src/input/plugins/CurlInputPlugin.cxx b/src/input/plugins/CurlInputPlugin.cxx
index b48512ccb..2450c8754 100644
--- a/src/input/plugins/CurlInputPlugin.cxx
+++ b/src/input/plugins/CurlInputPlugin.cxx
@@ -19,8 +19,8 @@
#include "config.h"
#include "CurlInputPlugin.hxx"
+#include "../AsyncInputStream.hxx"
#include "../IcyInputStream.hxx"
-#include "../InputStream.hxx"
#include "../InputPlugin.hxx"
#include "config/ConfigGlobal.hxx"
#include "config/ConfigData.hxx"
@@ -60,7 +60,7 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024;
*/
static const size_t CURL_RESUME_AT = 384 * 1024;
-struct CurlInputStream final : public InputStream {
+struct CurlInputStream final : public AsyncInputStream {
/* some buffers which were passed to libcurl, which we have
too free */
char range[32];
@@ -69,39 +69,19 @@ struct CurlInputStream final : public InputStream {
/** the curl handles */
CURL *easy;
- /**
- * A buffer where input_curl_writefunction() appends
- * to, and input_curl_read() reads from.
- */
- CircularBuffer<uint8_t> buffer;
-
- /**
- * Is the connection currently paused? That happens when the
- * buffer was getting too large. It will be unpaused when the
- * buffer is below the threshold again.
- */
- bool paused;
-
/** error message provided by libcurl */
char error_buffer[CURL_ERROR_SIZE];
/** parser for icy-metadata */
IcyInputStream *icy;
- /** the tag object ready to be requested via
- InputStream::ReadTag() */
- Tag *tag;
-
- Error postponed_error;
-
CurlInputStream(const char *_url, Mutex &_mutex, Cond &_cond,
void *_buffer)
- :InputStream(_url, _mutex, _cond),
+ :AsyncInputStream(_url, _mutex, _cond,
+ _buffer, CURL_MAX_BUFFERED,
+ CURL_RESUME_AT),
request_headers(nullptr),
- buffer((uint8_t *)_buffer, CURL_MAX_BUFFERED),
- paused(false),
- icy(new IcyInputStream(this)),
- tag(nullptr) {}
+ icy(new IcyInputStream(this)) {}
~CurlInputStream();
@@ -133,19 +113,6 @@ struct CurlInputStream final : public InputStream {
size_t DataReceived(const void *ptr, size_t size);
- void Resume();
- bool FillBuffer(Error &error);
-
- /**
- * Returns the number of bytes stored in the buffer.
- *
- * The caller must lock the mutex.
- */
- gcc_pure
- size_t GetTotalBufferSize() const {
- return buffer.GetSize();
- }
-
/**
* A HTTP request is finished.
*
@@ -153,22 +120,9 @@ struct CurlInputStream final : public InputStream {
*/
void RequestDone(CURLcode result, long status);
- /* virtual methods from InputStream */
- bool Check(Error &error) override;
-
- bool IsEOF() override {
- return easy == nullptr && buffer.IsEmpty();
- }
-
- Tag *ReadTag() override;
-
- bool IsAvailable() override {
- return postponed_error.IsDefined() || easy == nullptr ||
- !buffer.IsEmpty();
- }
-
- size_t Read(void *ptr, size_t size, Error &error) override;
- bool Seek(offset_type offset, Error &error) override;
+ /* virtual methods from AsyncInputStream */
+ virtual void DoResume() override;
+ virtual void DoSeek(offset_type new_offset) override;
};
class CurlMulti;
@@ -327,23 +281,24 @@ input_curl_find_request(CURL *easy)
return (CurlInputStream *)p;
}
-inline void
-CurlInputStream::Resume()
+void
+CurlInputStream::DoResume()
{
assert(io_thread_inside());
- if (paused) {
- paused = false;
- curl_easy_pause(easy, CURLPAUSE_CONT);
+ mutex.unlock();
- if (curl_version_num < 0x072000)
- /* libcurl older than 7.32.0 does not update
- its sockets after curl_easy_pause(); force
- libcurl to do it now */
- curl_multi->ResumeSockets();
+ curl_easy_pause(easy, CURLPAUSE_CONT);
- curl_multi->InvalidateSockets();
- }
+ if (curl_version_num < 0x072000)
+ /* libcurl older than 7.32.0 does not update
+ its sockets after curl_easy_pause(); force
+ libcurl to do it now */
+ curl_multi->ResumeSockets();
+
+ curl_multi->InvalidateSockets();
+
+ mutex.lock();
}
int
@@ -472,6 +427,7 @@ CurlInputStream::RequestDone(CURLcode result, long status)
assert(!postponed_error.IsDefined());
FreeEasy();
+ AsyncInputStream::SetClosed();
const ScopeLock protect(mutex);
@@ -484,7 +440,9 @@ CurlInputStream::RequestDone(CURLcode result, long status)
status);
}
- if (!IsReady())
+ if (IsSeekPending())
+ SeekDone();
+ else if (!IsReady())
SetReady();
}
@@ -630,81 +588,16 @@ input_curl_finish(void)
CurlInputStream::~CurlInputStream()
{
- delete tag;
-
FreeEasyIndirect();
-
- buffer.Clear();
- HugeFree(buffer.Write().data, CURL_MAX_BUFFERED);
-}
-
-inline bool
-CurlInputStream::Check(Error &error)
-{
- bool success = !postponed_error.IsDefined();
- if (!success) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- }
-
- return success;
-}
-
-Tag *
-CurlInputStream::ReadTag()
-{
- Tag *result = tag;
- tag = nullptr;
- return result;
-}
-
-inline bool
-CurlInputStream::FillBuffer(Error &error)
-{
- while (easy != nullptr && buffer.IsEmpty())
- cond.wait(mutex);
-
- if (postponed_error.IsDefined()) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- return false;
- }
-
- return !buffer.IsEmpty();
-}
-
-size_t
-CurlInputStream::Read(void *ptr, size_t read_size, Error &error)
-{
- if (!FillBuffer(error))
- return 0;
-
- auto r = buffer.Read();
- if (r.IsEmpty())
- return 0;
-
- const size_t nbytes = std::min(read_size, r.size);
- memcpy(ptr, r.data, nbytes);
- buffer.Consume(nbytes);
-
- offset += (InputPlugin::offset_type)nbytes;
-
- if (paused && GetTotalBufferSize() < CURL_RESUME_AT) {
- mutex.unlock();
-
- BlockingCall(io_thread_get(), [this](){
- Resume();
- });
-
- mutex.lock();
- }
-
- return nbytes;
}
inline void
CurlInputStream::HeaderReceived(const char *name, std::string &&value)
{
+ if (IsSeekPending())
+ /* don't update metadata while seeking */
+ return;
+
if (StringEqualsCaseASCII(name, "accept-ranges")) {
/* a stream with icy-metadata is not seekable */
if (!icy->IsEnabled())
@@ -716,12 +609,10 @@ CurlInputStream::HeaderReceived(const char *name, std::string &&value)
} else if (StringEqualsCaseASCII(name, "icy-name") ||
StringEqualsCaseASCII(name, "ice-name") ||
StringEqualsCaseASCII(name, "x-audiocast-name")) {
- delete tag;
-
TagBuilder tag_builder;
tag_builder.AddItem(TAG_NAME, value.c_str());
- tag = tag_builder.CommitNew();
+ SetTag(tag_builder.CommitNew());
} else if (StringEqualsCaseASCII(name, "icy-metaint")) {
if (icy->IsEnabled())
return;
@@ -782,30 +673,15 @@ CurlInputStream::DataReceived(const void *ptr, size_t received_size)
const ScopeLock protect(mutex);
- if (received_size > buffer.GetSpace()) {
- paused = true;
- return CURL_WRITEFUNC_PAUSE;
- }
-
- auto w = buffer.Write();
- assert(!w.IsEmpty());
-
- size_t nbytes = std::min(w.size, received_size);
- memcpy(w.data, ptr, nbytes);
- buffer.Append(nbytes);
-
- const size_t remaining = received_size - nbytes;
- if (remaining > 0) {
- w = buffer.Write();
- assert(!w.IsEmpty());
- assert(w.size >= remaining);
+ if (IsSeekPending())
+ SeekDone();
- memcpy(w.data, (const uint8_t *)ptr + nbytes, remaining);
- buffer.Append(remaining);
+ if (received_size > GetBufferSpace()) {
+ AsyncInputStream::Pause();
+ return CURL_WRITEFUNC_PAUSE;
}
- ready = true;
- cond.broadcast();
+ AppendToBuffer(ptr, received_size);
return received_size;
}
@@ -880,48 +756,16 @@ CurlInputStream::InitEasy(Error &error)
return true;
}
-inline bool
-CurlInputStream::Seek(offset_type new_offset, Error &error)
+void
+CurlInputStream::DoSeek(offset_type new_offset)
{
assert(IsReady());
- if (new_offset == offset)
- /* no-op */
- return true;
-
- if (!IsSeekable())
- return false;
-
- /* calculate the absolute offset */
-
- if (new_offset < 0)
- return false;
-
- /* check if we can fast-forward the buffer */
-
- while (new_offset > offset) {
- auto r = buffer.Read();
- if (r.IsEmpty())
- break;
-
- const size_t nbytes =
- new_offset - offset < (InputPlugin::offset_type)r.size
- ? new_offset - offset
- : r.size;
-
- buffer.Consume(nbytes);
- offset += nbytes;
- }
-
- if (new_offset == offset)
- return true;
-
/* close the old connection and open a new one */
mutex.unlock();
FreeEasyIndirect();
- buffer.Clear();
offset = new_offset;
if (offset == size) {
@@ -929,12 +773,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
triggering a "416 Requested Range Not Satisfiable"
response */
mutex.lock();
- return true;
+ SeekDone();
+ return;
}
- if (!InitEasy(error)) {
+ if (!InitEasy(postponed_error)) {
mutex.lock();
- return false;
+ SeekDone();
+ return;
}
/* send the "Range" header */
@@ -944,23 +790,14 @@ CurlInputStream::Seek(offset_type new_offset, Error &error)
curl_easy_setopt(easy, CURLOPT_RANGE, range);
}
- ready = false;
-
- if (!input_curl_easy_add_indirect(this, error)) {
+ if (!input_curl_easy_add_indirect(this, postponed_error)) {
mutex.lock();
- return false;
+ SeekDone();
+ return;
}
mutex.lock();
- WaitReady();
-
- if (postponed_error.IsDefined()) {
- error = std::move(postponed_error);
- postponed_error.Clear();
- return false;
- }
-
- return true;
+ offset = new_offset;
}
inline InputStream *