aboutsummaryrefslogtreecommitdiffstats
path: root/src/input/AsyncInputStream.cxx
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2014-05-02 22:31:02 +0200
committerMax Kellermann <max@duempel.org>2014-05-24 14:36:28 +0200
commitfbafb19657f3337a4d0d5e4783cd91ad0c2963aa (patch)
tree5f61ccd2f3646f282e468ce2f8d1873463913802 /src/input/AsyncInputStream.cxx
parent6c4438d8a9ff12db9bb6323e128a69de6cd4e2b7 (diff)
downloadmpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.gz
mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.tar.xz
mpd-fbafb19657f3337a4d0d5e4783cd91ad0c2963aa.zip
input/curl: move code to AsyncInputStream
New base class for other InputStream implementations that run in the I/O thread.
Diffstat (limited to 'src/input/AsyncInputStream.cxx')
-rw-r--r--src/input/AsyncInputStream.cxx239
1 files changed, 239 insertions, 0 deletions
diff --git a/src/input/AsyncInputStream.cxx b/src/input/AsyncInputStream.cxx
new file mode 100644
index 000000000..3f2d3b90f
--- /dev/null
+++ b/src/input/AsyncInputStream.cxx
@@ -0,0 +1,239 @@
+/*
+ * Copyright (C) 2003-2014 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "config.h"
+#include "AsyncInputStream.hxx"
+#include "tag/Tag.hxx"
+#include "event/Call.hxx"
+#include "thread/Cond.hxx"
+#include "IOThread.hxx"
+#include "util/HugeAllocator.hxx"
+
+#include <assert.h>
+#include <string.h>
+
+AsyncInputStream::AsyncInputStream(const char *_url,
+ Mutex &_mutex, Cond &_cond,
+ void *_buffer, size_t _buffer_size,
+ size_t _resume_at)
+ :InputStream(_url, _mutex, _cond), DeferredMonitor(io_thread_get()),
+ buffer((uint8_t *)_buffer, _buffer_size),
+ resume_at(_resume_at),
+ open(true),
+ paused(false),
+ seek_state(SeekState::NONE),
+ tag(nullptr) {}
+
+AsyncInputStream::~AsyncInputStream()
+{
+ delete tag;
+
+ buffer.Clear();
+ HugeFree(buffer.Write().data, buffer.GetCapacity());
+}
+
+void
+AsyncInputStream::SetTag(Tag *_tag)
+{
+ delete tag;
+ tag = _tag;
+}
+
+void
+AsyncInputStream::Pause()
+{
+ assert(io_thread_inside());
+
+ paused = true;
+}
+
+inline void
+AsyncInputStream::Resume()
+{
+ assert(io_thread_inside());
+
+ if (paused) {
+ paused = false;
+ DoResume();
+ }
+}
+
+bool
+AsyncInputStream::Check(Error &error)
+{
+ bool success = !postponed_error.IsDefined();
+ if (!success) {
+ error = std::move(postponed_error);
+ postponed_error.Clear();
+ }
+
+ return success;
+}
+
+bool
+AsyncInputStream::IsEOF()
+{
+ return !open && buffer.IsEmpty();
+}
+
+bool
+AsyncInputStream::Seek(offset_type new_offset, Error &error)
+{
+ assert(IsReady());
+ assert(seek_state == SeekState::NONE);
+
+ if (new_offset == offset)
+ /* no-op */
+ return true;
+
+ if (!IsSeekable())
+ return false;
+
+ if (new_offset < 0)
+ return false;
+
+ /* check if we can fast-forward the buffer */
+
+ while (new_offset > offset) {
+ auto r = buffer.Read();
+ if (r.IsEmpty())
+ break;
+
+ const size_t nbytes =
+ new_offset - offset < (offset_type)r.size
+ ? new_offset - offset
+ : r.size;
+
+ buffer.Consume(nbytes);
+ offset += nbytes;
+ }
+
+ if (new_offset == offset)
+ return true;
+
+ /* no: ask the implementation to seek */
+
+ seek_offset = new_offset;
+ seek_state = SeekState::SCHEDULED;
+
+ DeferredMonitor::Schedule();
+
+ while (seek_state != SeekState::NONE)
+ cond.wait(mutex);
+
+ if (!Check(error))
+ return false;
+
+ return true;
+}
+
+void
+AsyncInputStream::SeekDone()
+{
+ assert(io_thread_inside());
+ assert(IsSeekPending());
+
+ seek_state = SeekState::NONE;
+ cond.broadcast();
+}
+
+Tag *
+AsyncInputStream::ReadTag()
+{
+ Tag *result = tag;
+ tag = nullptr;
+ return result;
+}
+
+bool
+AsyncInputStream::IsAvailable()
+{
+ return postponed_error.IsDefined() || !open ||
+ !buffer.IsEmpty();
+}
+
+size_t
+AsyncInputStream::Read(void *ptr, size_t read_size, Error &error)
+{
+ assert(!io_thread_inside());
+
+ /* wait for data */
+ CircularBuffer<uint8_t>::Range r;
+ while (true) {
+ if (!Check(error))
+ return 0;
+
+ r = buffer.Read();
+ if (!r.IsEmpty() || !open)
+ break;
+
+ cond.wait(mutex);
+ }
+
+ const size_t nbytes = std::min(read_size, r.size);
+ memcpy(ptr, r.data, nbytes);
+ buffer.Consume(nbytes);
+
+ offset += (offset_type)nbytes;
+
+ if (paused && buffer.GetSize() < resume_at)
+ DeferredMonitor::Schedule();
+
+ return nbytes;
+}
+
+void
+AsyncInputStream::AppendToBuffer(const void *data, size_t append_size)
+{
+ auto w = buffer.Write();
+ assert(!w.IsEmpty());
+
+ size_t nbytes = std::min(w.size, append_size);
+ memcpy(w.data, data, nbytes);
+ buffer.Append(nbytes);
+
+ const size_t remaining = append_size - nbytes;
+ if (remaining > 0) {
+ w = buffer.Write();
+ assert(!w.IsEmpty());
+ assert(w.size >= remaining);
+
+ memcpy(w.data, (const uint8_t *)data + nbytes, remaining);
+ buffer.Append(remaining);
+ }
+
+ if (!IsReady())
+ SetReady();
+ else
+ cond.broadcast();
+}
+
+void
+AsyncInputStream::RunDeferred()
+{
+ const ScopeLock protect(mutex);
+
+ Resume();
+
+ if (seek_state == SeekState::SCHEDULED) {
+ seek_state = SeekState::PENDING;
+ buffer.Clear();
+ DoSeek(seek_offset);
+ }
+}