aboutsummaryrefslogtreecommitdiffstats
path: root/src/input/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'src/input/plugins')
-rw-r--r--src/input/plugins/NfsInputPlugin.cxx222
1 files changed, 138 insertions, 84 deletions
diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx
index c90f00257..8f63d80a1 100644
--- a/src/input/plugins/NfsInputPlugin.cxx
+++ b/src/input/plugins/NfsInputPlugin.cxx
@@ -19,9 +19,12 @@
#include "config.h"
#include "NfsInputPlugin.hxx"
-#include "../InputStream.hxx"
+#include "../AsyncInputStream.hxx"
#include "../InputPlugin.hxx"
#include "lib/nfs/Domain.hxx"
+#include "lib/nfs/Glue.hxx"
+#include "lib/nfs/FileReader.hxx"
+#include "util/HugeAllocator.hxx"
#include "util/StringUtil.hxx"
#include "util/Error.hxx"
@@ -33,69 +36,158 @@ extern "C" {
#include <sys/stat.h>
#include <fcntl.h>
-class NfsInputStream final : public InputStream {
- nfs_context *ctx;
- nfsfh *fh;
+/**
+ * Do not buffer more than this number of bytes. It should be a
+ * reasonable limit that doesn't make low-end machines suffer too
+ * much, but doesn't cause stuttering on high-latency lines.
+ */
+static const size_t NFS_MAX_BUFFERED = 512 * 1024;
+
+/**
+ * Resume the stream at this number of bytes after it has been paused.
+ */
+static const size_t NFS_RESUME_AT = 384 * 1024;
+
+class NfsInputStream final : public AsyncInputStream, NfsFileReader {
+ uint64_t next_offset;
public:
NfsInputStream(const char *_uri,
Mutex &_mutex, Cond &_cond,
- nfs_context *_ctx, nfsfh *_fh,
- InputStream::offset_type _size)
- :InputStream(_uri, _mutex, _cond),
- ctx(_ctx), fh(_fh) {
- seekable = true;
- size = _size;
- SetReady();
- }
+ void *_buffer)
+ :AsyncInputStream(_uri, _mutex, _cond,
+ _buffer, NFS_MAX_BUFFERED,
+ NFS_RESUME_AT) {}
- ~NfsInputStream() {
- nfs_close(ctx, fh);
- nfs_destroy_context(ctx);
+ virtual ~NfsInputStream() {
+ DeferClose();
}
- /* virtual methods from InputStream */
+ bool Open(Error &error) {
+ assert(!IsReady());
- bool IsEOF() override {
- return offset >= size;
+ return NfsFileReader::Open(GetURI(), error);
}
- size_t Read(void *ptr, size_t size, Error &error) override;
- bool Seek(offset_type offset, Error &error) override;
+private:
+ bool DoRead();
+
+protected:
+ /* virtual methods from AsyncInputStream */
+ virtual void DoResume() override;
+ virtual void DoSeek(offset_type new_offset) override;
+
+private:
+ /* virtual methods from NfsFileReader */
+ void OnNfsFileOpen(uint64_t size) override;
+ void OnNfsFileRead(const void *data, size_t size) override;
+ void OnNfsFileError(Error &&error) override;
};
-size_t
-NfsInputStream::Read(void *ptr, size_t read_size, Error &error)
+bool
+NfsInputStream::DoRead()
{
- int nbytes = nfs_read(ctx, fh, read_size, (char *)ptr);
- if (nbytes < 0) {
- error.SetErrno(-nbytes, "nfs_read() failed");
- nbytes = 0;
+ assert(NfsFileReader::IsIdle());
+
+ int64_t remaining = size - next_offset;
+ if (remaining <= 0)
+ return true;
+
+ if (IsBufferFull()) {
+ Pause();
+ return true;
}
- return nbytes;
-}
+ size_t nbytes = std::min<uint64_t>(remaining, 32768);
-bool
-NfsInputStream::Seek(offset_type new_offset, Error &error)
-{
- uint64_t current_offset;
- int result = nfs_lseek(ctx, fh, new_offset, SEEK_SET,
- &current_offset);
- if (result < 0) {
- error.SetErrno(-result, "smbc_lseek() failed");
+ mutex.unlock();
+ Error error;
+ bool success = NfsFileReader::Read(next_offset, nbytes, error);
+ mutex.lock();
+
+ if (!success) {
+ PostponeError(std::move(error));
return false;
}
- offset = current_offset;
return true;
}
+void
+NfsInputStream::DoResume()
+{
+ assert(NfsFileReader::IsIdle());
+
+ DoRead();
+}
+
+void
+NfsInputStream::DoSeek(offset_type new_offset)
+{
+ mutex.unlock();
+ NfsFileReader::CancelRead();
+ mutex.lock();
+
+ next_offset = offset = new_offset;
+ SeekDone();
+ DoRead();
+}
+
+void
+NfsInputStream::OnNfsFileOpen(uint64_t _size)
+{
+ const ScopeLock protect(mutex);
+
+ size = _size;
+ seekable = true;
+ next_offset = 0;
+ SetReady();
+ DoRead();
+}
+
+void
+NfsInputStream::OnNfsFileRead(const void *data, size_t data_size)
+{
+ const ScopeLock protect(mutex);
+ assert(!IsBufferFull());
+ assert(IsBufferFull() == (GetBufferSpace() == 0));
+ AppendToBuffer(data, data_size);
+
+ next_offset += data_size;
+
+ DoRead();
+}
+
+void
+NfsInputStream::OnNfsFileError(Error &&error)
+{
+ const ScopeLock protect(mutex);
+ postponed_error = std::move(error);
+
+ if (IsSeekPending())
+ SeekDone();
+ else if (!IsReady())
+ SetReady();
+}
+
/*
* InputPlugin methods
*
*/
+static InputPlugin::InitResult
+input_nfs_init(const config_param &, Error &)
+{
+ nfs_init();
+ return InputPlugin::InitResult::SUCCESS;
+}
+
+static void
+input_nfs_finish()
+{
+ nfs_finish();
+}
+
static InputStream *
input_nfs_open(const char *uri,
Mutex &mutex, Cond &cond,
@@ -104,62 +196,24 @@ input_nfs_open(const char *uri,
if (!StringStartsWith(uri, "nfs://"))
return nullptr;
- uri += 6;
-
- const char *slash = strchr(uri, '/');
- if (slash == nullptr) {
- error.Set(nfs_domain, "Malformed nfs:// URI");
- return nullptr;
- }
-
- const std::string server(uri, slash);
-
- uri = slash;
- slash = strrchr(uri + 1, '/');
- if (slash == nullptr || slash[1] == 0) {
- error.Set(nfs_domain, "Malformed nfs:// URI");
- return nullptr;
- }
-
- const std::string mount(uri, slash);
- uri = slash;
-
- nfs_context *ctx = nfs_init_context();
- if (ctx == nullptr) {
- error.Set(nfs_domain, "nfs_init_context() failed");
- return nullptr;
- }
-
- int result = nfs_mount(ctx, server.c_str(), mount.c_str());
- if (result < 0) {
- nfs_destroy_context(ctx);
- error.SetErrno(-result, "nfs_mount() failed");
- return nullptr;
- }
-
- nfsfh *fh;
- result = nfs_open(ctx, uri, O_RDONLY, &fh);
- if (result < 0) {
- nfs_destroy_context(ctx);
- error.SetErrno(-result, "nfs_open() failed");
+ void *buffer = HugeAllocate(NFS_MAX_BUFFERED);
+ if (buffer == nullptr) {
+ error.Set(nfs_domain, "Out of memory");
return nullptr;
}
- struct stat st;
- result = nfs_fstat(ctx, fh, &st);
- if (result < 0) {
- nfs_close(ctx, fh);
- nfs_destroy_context(ctx);
- error.SetErrno(-result, "nfs_fstat() failed");
+ NfsInputStream *is = new NfsInputStream(uri, mutex, cond, buffer);
+ if (!is->Open(error)) {
+ delete is;
return nullptr;
}
- return new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size);
+ return is;
}
const InputPlugin input_plugin_nfs = {
"nfs",
- nullptr,
- nullptr,
+ input_nfs_init,
+ input_nfs_finish,
input_nfs_open,
};