aboutsummaryrefslogtreecommitdiffstats
path: root/src/input/ThreadInputStream.cxx
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/input/ThreadInputStream.cxx35
1 files changed, 20 insertions, 15 deletions
diff --git a/src/input/ThreadInputStream.cxx b/src/input/ThreadInputStream.cxx
index fde9bd08e..0f503be3f 100644
--- a/src/input/ThreadInputStream.cxx
+++ b/src/input/ThreadInputStream.cxx
@@ -52,30 +52,30 @@ ThreadInputStream::Start(Error &error)
if (!thread.Start(ThreadFunc, this, error))
return nullptr;
- return &base;
+ return this;
}
inline void
ThreadInputStream::ThreadFunc()
{
- FormatThreadName("input:%s", base.GetPlugin().name);
+ FormatThreadName("input:%s", GetPlugin().name);
Lock();
if (!Open(postponed_error)) {
- base.cond.broadcast();
+ cond.broadcast();
Unlock();
return;
}
/* we're ready, tell it to our client */
- base.SetReady();
+ SetReady();
while (!close) {
assert(!postponed_error.IsDefined());
auto w = buffer->Write();
if (w.IsEmpty()) {
- wake_cond.wait(base.mutex);
+ wake_cond.wait(mutex);
} else {
Unlock();
@@ -83,7 +83,7 @@ ThreadInputStream::ThreadFunc()
size_t nbytes = Read(w.data, w.size, error);
Lock();
- base.cond.broadcast();
+ cond.broadcast();
if (nbytes == 0) {
eof = true;
@@ -121,7 +121,8 @@ ThreadInputStream::Check2(Error &error)
bool
ThreadInputStream::Check(InputStream *is, Error &error)
{
- return Cast(is)->Check2(error);
+ ThreadInputStream &tis = *(ThreadInputStream *)is;
+ return tis.Check2(error);
}
inline bool
@@ -133,11 +134,12 @@ ThreadInputStream::Available2()
bool
ThreadInputStream::Available(InputStream *is)
{
- return Cast(is)->Available2();
+ ThreadInputStream &tis = *(ThreadInputStream *)is;
+ return tis.Available2();
}
inline size_t
-ThreadInputStream::Read2(void *ptr, size_t size, Error &error)
+ThreadInputStream::Read2(void *ptr, size_t read_size, Error &error)
{
while (true) {
if (postponed_error.IsDefined()) {
@@ -147,18 +149,18 @@ ThreadInputStream::Read2(void *ptr, size_t size, Error &error)
auto r = buffer->Read();
if (!r.IsEmpty()) {
- size_t nbytes = std::min(size, r.size);
+ size_t nbytes = std::min(read_size, r.size);
memcpy(ptr, r.data, nbytes);
buffer->Consume(nbytes);
wake_cond.broadcast();
- base.offset += nbytes;
+ offset += nbytes;
return nbytes;
}
if (eof)
return 0;
- base.cond.wait(base.mutex);
+ cond.wait(mutex);
}
}
@@ -166,7 +168,8 @@ size_t
ThreadInputStream::Read(InputStream *is, void *ptr, size_t size,
Error &error)
{
- return Cast(is)->Read2(ptr, size, error);
+ ThreadInputStream &tis = *(ThreadInputStream *)is;
+ return tis.Read2(ptr, size, error);
}
inline void
@@ -187,7 +190,8 @@ ThreadInputStream::Close2()
void
ThreadInputStream::Close(InputStream *is)
{
- Cast(is)->Close2();
+ ThreadInputStream &tis = *(ThreadInputStream *)is;
+ tis.Close2();
}
inline bool
@@ -199,5 +203,6 @@ ThreadInputStream::IsEOF2()
bool
ThreadInputStream::IsEOF(InputStream *is)
{
- return Cast(is)->IsEOF2();
+ ThreadInputStream &tis = *(ThreadInputStream *)is;
+ return tis.IsEOF2();
}