From c99559dbe937eba73376137ceb5551f1c55764d5 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Sat, 15 Mar 2014 15:29:10 +0100 Subject: input/nfs: use the asynchronous libnfs API More robust and cancellable. --- src/input/plugins/NfsInputPlugin.cxx | 222 ++++++++++++++++++++++------------- 1 file changed, 138 insertions(+), 84 deletions(-) (limited to 'src/input') diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx index c90f00257..8f63d80a1 100644 --- a/src/input/plugins/NfsInputPlugin.cxx +++ b/src/input/plugins/NfsInputPlugin.cxx @@ -19,9 +19,12 @@ #include "config.h" #include "NfsInputPlugin.hxx" -#include "../InputStream.hxx" +#include "../AsyncInputStream.hxx" #include "../InputPlugin.hxx" #include "lib/nfs/Domain.hxx" +#include "lib/nfs/Glue.hxx" +#include "lib/nfs/FileReader.hxx" +#include "util/HugeAllocator.hxx" #include "util/StringUtil.hxx" #include "util/Error.hxx" @@ -33,69 +36,158 @@ extern "C" { #include #include -class NfsInputStream final : public InputStream { - nfs_context *ctx; - nfsfh *fh; +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t NFS_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t NFS_RESUME_AT = 384 * 1024; + +class NfsInputStream final : public AsyncInputStream, NfsFileReader { + uint64_t next_offset; public: NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond, - nfs_context *_ctx, nfsfh *_fh, - InputStream::offset_type _size) - :InputStream(_uri, _mutex, _cond), - ctx(_ctx), fh(_fh) { - seekable = true; - size = _size; - SetReady(); - } + void *_buffer) + :AsyncInputStream(_uri, _mutex, _cond, + _buffer, NFS_MAX_BUFFERED, + NFS_RESUME_AT) {} - ~NfsInputStream() { - nfs_close(ctx, fh); - nfs_destroy_context(ctx); + virtual ~NfsInputStream() { + DeferClose(); } - /* virtual methods from InputStream */ + bool Open(Error &error) { + assert(!IsReady()); - bool IsEOF() override { - return offset >= size; + return NfsFileReader::Open(GetURI(), error); } - size_t Read(void *ptr, size_t size, Error &error) override; - bool Seek(offset_type offset, Error &error) override; +private: + bool DoRead(); + +protected: + /* virtual methods from AsyncInputStream */ + virtual void DoResume() override; + virtual void DoSeek(offset_type new_offset) override; + +private: + /* virtual methods from NfsFileReader */ + void OnNfsFileOpen(uint64_t size) override; + void OnNfsFileRead(const void *data, size_t size) override; + void OnNfsFileError(Error &&error) override; }; -size_t -NfsInputStream::Read(void *ptr, size_t read_size, Error &error) +bool +NfsInputStream::DoRead() { - int nbytes = nfs_read(ctx, fh, read_size, (char *)ptr); - if (nbytes < 0) { - error.SetErrno(-nbytes, "nfs_read() failed"); - nbytes = 0; + assert(NfsFileReader::IsIdle()); + + int64_t remaining = size - next_offset; + if (remaining <= 0) + return true; + + if (IsBufferFull()) { + Pause(); + return true; } - return nbytes; -} + size_t nbytes = std::min(remaining, 32768); -bool -NfsInputStream::Seek(offset_type new_offset, Error &error) -{ - uint64_t current_offset; - int result = nfs_lseek(ctx, fh, new_offset, SEEK_SET, - ¤t_offset); - if (result < 0) { - error.SetErrno(-result, "smbc_lseek() failed"); + mutex.unlock(); + Error error; + bool success = NfsFileReader::Read(next_offset, nbytes, error); + mutex.lock(); + + if (!success) { + PostponeError(std::move(error)); return false; } - offset = current_offset; return true; } +void +NfsInputStream::DoResume() +{ + assert(NfsFileReader::IsIdle()); + + DoRead(); +} + +void +NfsInputStream::DoSeek(offset_type new_offset) +{ + mutex.unlock(); + NfsFileReader::CancelRead(); + mutex.lock(); + + next_offset = offset = new_offset; + SeekDone(); + DoRead(); +} + +void +NfsInputStream::OnNfsFileOpen(uint64_t _size) +{ + const ScopeLock protect(mutex); + + size = _size; + seekable = true; + next_offset = 0; + SetReady(); + DoRead(); +} + +void +NfsInputStream::OnNfsFileRead(const void *data, size_t data_size) +{ + const ScopeLock protect(mutex); + assert(!IsBufferFull()); + assert(IsBufferFull() == (GetBufferSpace() == 0)); + AppendToBuffer(data, data_size); + + next_offset += data_size; + + DoRead(); +} + +void +NfsInputStream::OnNfsFileError(Error &&error) +{ + const ScopeLock protect(mutex); + postponed_error = std::move(error); + + if (IsSeekPending()) + SeekDone(); + else if (!IsReady()) + SetReady(); +} + /* * InputPlugin methods * */ +static InputPlugin::InitResult +input_nfs_init(const config_param &, Error &) +{ + nfs_init(); + return InputPlugin::InitResult::SUCCESS; +} + +static void +input_nfs_finish() +{ + nfs_finish(); +} + static InputStream * input_nfs_open(const char *uri, Mutex &mutex, Cond &cond, @@ -104,62 +196,24 @@ input_nfs_open(const char *uri, if (!StringStartsWith(uri, "nfs://")) return nullptr; - uri += 6; - - const char *slash = strchr(uri, '/'); - if (slash == nullptr) { - error.Set(nfs_domain, "Malformed nfs:// URI"); - return nullptr; - } - - const std::string server(uri, slash); - - uri = slash; - slash = strrchr(uri + 1, '/'); - if (slash == nullptr || slash[1] == 0) { - error.Set(nfs_domain, "Malformed nfs:// URI"); - return nullptr; - } - - const std::string mount(uri, slash); - uri = slash; - - nfs_context *ctx = nfs_init_context(); - if (ctx == nullptr) { - error.Set(nfs_domain, "nfs_init_context() failed"); - return nullptr; - } - - int result = nfs_mount(ctx, server.c_str(), mount.c_str()); - if (result < 0) { - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_mount() failed"); - return nullptr; - } - - nfsfh *fh; - result = nfs_open(ctx, uri, O_RDONLY, &fh); - if (result < 0) { - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_open() failed"); + void *buffer = HugeAllocate(NFS_MAX_BUFFERED); + if (buffer == nullptr) { + error.Set(nfs_domain, "Out of memory"); return nullptr; } - struct stat st; - result = nfs_fstat(ctx, fh, &st); - if (result < 0) { - nfs_close(ctx, fh); - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_fstat() failed"); + NfsInputStream *is = new NfsInputStream(uri, mutex, cond, buffer); + if (!is->Open(error)) { + delete is; return nullptr; } - return new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size); + return is; } const InputPlugin input_plugin_nfs = { "nfs", - nullptr, - nullptr, + input_nfs_init, + input_nfs_finish, input_nfs_open, }; -- cgit v1.2.3