diff options
author | Max Kellermann <max@duempel.org> | 2014-03-15 15:29:10 +0100 |
---|---|---|
committer | Max Kellermann <max@duempel.org> | 2014-06-17 03:21:25 +0200 |
commit | c99559dbe937eba73376137ceb5551f1c55764d5 (patch) | |
tree | 0887f02b73b695adc35a30b468b1078d1ac7359f | |
parent | 966c4244cbe0de174df1e72e917078269ec9dbb9 (diff) | |
download | mpd-c99559dbe937eba73376137ceb5551f1c55764d5.tar.gz mpd-c99559dbe937eba73376137ceb5551f1c55764d5.tar.xz mpd-c99559dbe937eba73376137ceb5551f1c55764d5.zip |
input/nfs: use the asynchronous libnfs API
More robust and cancellable.
Diffstat (limited to '')
-rw-r--r-- | Makefile.am | 6 | ||||
-rw-r--r-- | src/input/plugins/NfsInputPlugin.cxx | 222 | ||||
-rw-r--r-- | src/lib/nfs/Callback.hxx | 33 | ||||
-rw-r--r-- | src/lib/nfs/Cancellable.hxx | 177 | ||||
-rw-r--r-- | src/lib/nfs/Connection.cxx | 415 | ||||
-rw-r--r-- | src/lib/nfs/Connection.hxx | 160 | ||||
-rw-r--r-- | src/lib/nfs/FileReader.cxx | 244 | ||||
-rw-r--r-- | src/lib/nfs/FileReader.hxx | 94 | ||||
-rw-r--r-- | src/lib/nfs/Glue.cxx | 69 | ||||
-rw-r--r-- | src/lib/nfs/Glue.hxx | 38 | ||||
-rw-r--r-- | src/lib/nfs/Lease.hxx | 48 | ||||
-rw-r--r-- | src/lib/nfs/Manager.cxx | 48 | ||||
-rw-r--r-- | src/lib/nfs/Manager.hxx | 72 |
13 files changed, 1542 insertions, 84 deletions
diff --git a/Makefile.am b/Makefile.am index 15b392230..fc54f4006 100644 --- a/Makefile.am +++ b/Makefile.am @@ -510,6 +510,12 @@ SMBCLIENT_SOURCES = \ src/lib/smbclient/Init.cxx src/lib/smbclient/Init.hxx NFS_SOURCES = \ + src/lib/nfs/Callback.hxx \ + src/lib/nfs/Cancellable.hxx \ + src/lib/nfs/Connection.cxx src/lib/nfs/Connection.hxx \ + src/lib/nfs/Manager.cxx src/lib/nfs/Manager.hxx \ + src/lib/nfs/Glue.cxx src/lib/nfs/Glue.hxx \ + src/lib/nfs/FileReader.cxx src/lib/nfs/FileReader.hxx \ src/lib/nfs/Domain.cxx src/lib/nfs/Domain.hxx if ENABLE_DATABASE diff --git a/src/input/plugins/NfsInputPlugin.cxx b/src/input/plugins/NfsInputPlugin.cxx index c90f00257..8f63d80a1 100644 --- a/src/input/plugins/NfsInputPlugin.cxx +++ b/src/input/plugins/NfsInputPlugin.cxx @@ -19,9 +19,12 @@ #include "config.h" #include "NfsInputPlugin.hxx" -#include "../InputStream.hxx" +#include "../AsyncInputStream.hxx" #include "../InputPlugin.hxx" #include "lib/nfs/Domain.hxx" +#include "lib/nfs/Glue.hxx" +#include "lib/nfs/FileReader.hxx" +#include "util/HugeAllocator.hxx" #include "util/StringUtil.hxx" #include "util/Error.hxx" @@ -33,69 +36,158 @@ extern "C" { #include <sys/stat.h> #include <fcntl.h> -class NfsInputStream final : public InputStream { - nfs_context *ctx; - nfsfh *fh; +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t NFS_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t NFS_RESUME_AT = 384 * 1024; + +class NfsInputStream final : public AsyncInputStream, NfsFileReader { + uint64_t next_offset; public: NfsInputStream(const char *_uri, Mutex &_mutex, Cond &_cond, - nfs_context *_ctx, nfsfh *_fh, - InputStream::offset_type _size) - :InputStream(_uri, _mutex, _cond), - ctx(_ctx), fh(_fh) { - seekable = true; - size = _size; - SetReady(); - } + void *_buffer) + :AsyncInputStream(_uri, _mutex, _cond, + _buffer, NFS_MAX_BUFFERED, + NFS_RESUME_AT) {} - ~NfsInputStream() { - nfs_close(ctx, fh); - nfs_destroy_context(ctx); + virtual ~NfsInputStream() { + DeferClose(); } - /* virtual methods from InputStream */ + bool Open(Error &error) { + assert(!IsReady()); - bool IsEOF() override { - return offset >= size; + return NfsFileReader::Open(GetURI(), error); } - size_t Read(void *ptr, size_t size, Error &error) override; - bool Seek(offset_type offset, Error &error) override; +private: + bool DoRead(); + +protected: + /* virtual methods from AsyncInputStream */ + virtual void DoResume() override; + virtual void DoSeek(offset_type new_offset) override; + +private: + /* virtual methods from NfsFileReader */ + void OnNfsFileOpen(uint64_t size) override; + void OnNfsFileRead(const void *data, size_t size) override; + void OnNfsFileError(Error &&error) override; }; -size_t -NfsInputStream::Read(void *ptr, size_t read_size, Error &error) +bool +NfsInputStream::DoRead() { - int nbytes = nfs_read(ctx, fh, read_size, (char *)ptr); - if (nbytes < 0) { - error.SetErrno(-nbytes, "nfs_read() failed"); - nbytes = 0; + assert(NfsFileReader::IsIdle()); + + int64_t remaining = size - next_offset; + if (remaining <= 0) + return true; + + if (IsBufferFull()) { + Pause(); + return true; } - return nbytes; -} + size_t nbytes = std::min<uint64_t>(remaining, 32768); -bool -NfsInputStream::Seek(offset_type new_offset, Error &error) -{ - uint64_t current_offset; - int result = nfs_lseek(ctx, fh, new_offset, SEEK_SET, - ¤t_offset); - if (result < 0) { - error.SetErrno(-result, "smbc_lseek() failed"); + mutex.unlock(); + Error error; + bool success = NfsFileReader::Read(next_offset, nbytes, error); + mutex.lock(); + + if (!success) { + PostponeError(std::move(error)); return false; } - offset = current_offset; return true; } +void +NfsInputStream::DoResume() +{ + assert(NfsFileReader::IsIdle()); + + DoRead(); +} + +void +NfsInputStream::DoSeek(offset_type new_offset) +{ + mutex.unlock(); + NfsFileReader::CancelRead(); + mutex.lock(); + + next_offset = offset = new_offset; + SeekDone(); + DoRead(); +} + +void +NfsInputStream::OnNfsFileOpen(uint64_t _size) +{ + const ScopeLock protect(mutex); + + size = _size; + seekable = true; + next_offset = 0; + SetReady(); + DoRead(); +} + +void +NfsInputStream::OnNfsFileRead(const void *data, size_t data_size) +{ + const ScopeLock protect(mutex); + assert(!IsBufferFull()); + assert(IsBufferFull() == (GetBufferSpace() == 0)); + AppendToBuffer(data, data_size); + + next_offset += data_size; + + DoRead(); +} + +void +NfsInputStream::OnNfsFileError(Error &&error) +{ + const ScopeLock protect(mutex); + postponed_error = std::move(error); + + if (IsSeekPending()) + SeekDone(); + else if (!IsReady()) + SetReady(); +} + /* * InputPlugin methods * */ +static InputPlugin::InitResult +input_nfs_init(const config_param &, Error &) +{ + nfs_init(); + return InputPlugin::InitResult::SUCCESS; +} + +static void +input_nfs_finish() +{ + nfs_finish(); +} + static InputStream * input_nfs_open(const char *uri, Mutex &mutex, Cond &cond, @@ -104,62 +196,24 @@ input_nfs_open(const char *uri, if (!StringStartsWith(uri, "nfs://")) return nullptr; - uri += 6; - - const char *slash = strchr(uri, '/'); - if (slash == nullptr) { - error.Set(nfs_domain, "Malformed nfs:// URI"); - return nullptr; - } - - const std::string server(uri, slash); - - uri = slash; - slash = strrchr(uri + 1, '/'); - if (slash == nullptr || slash[1] == 0) { - error.Set(nfs_domain, "Malformed nfs:// URI"); - return nullptr; - } - - const std::string mount(uri, slash); - uri = slash; - - nfs_context *ctx = nfs_init_context(); - if (ctx == nullptr) { - error.Set(nfs_domain, "nfs_init_context() failed"); - return nullptr; - } - - int result = nfs_mount(ctx, server.c_str(), mount.c_str()); - if (result < 0) { - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_mount() failed"); - return nullptr; - } - - nfsfh *fh; - result = nfs_open(ctx, uri, O_RDONLY, &fh); - if (result < 0) { - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_open() failed"); + void *buffer = HugeAllocate(NFS_MAX_BUFFERED); + if (buffer == nullptr) { + error.Set(nfs_domain, "Out of memory"); return nullptr; } - struct stat st; - result = nfs_fstat(ctx, fh, &st); - if (result < 0) { - nfs_close(ctx, fh); - nfs_destroy_context(ctx); - error.SetErrno(-result, "nfs_fstat() failed"); + NfsInputStream *is = new NfsInputStream(uri, mutex, cond, buffer); + if (!is->Open(error)) { + delete is; return nullptr; } - return new NfsInputStream(uri, mutex, cond, ctx, fh, st.st_size); + return is; } const InputPlugin input_plugin_nfs = { "nfs", - nullptr, - nullptr, + input_nfs_init, + input_nfs_finish, input_nfs_open, }; diff --git a/src/lib/nfs/Callback.hxx b/src/lib/nfs/Callback.hxx new file mode 100644 index 000000000..ae82ecc3c --- /dev/null +++ b/src/lib/nfs/Callback.hxx @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_CALLBACK_HXX +#define MPD_NFS_CALLBACK_HXX + +#include "check.h" + +class Error; + +class NfsCallback { +public: + virtual void OnNfsCallback(unsigned status, void *data) = 0; + virtual void OnNfsError(Error &&error) = 0; +}; + +#endif diff --git a/src/lib/nfs/Cancellable.hxx b/src/lib/nfs/Cancellable.hxx new file mode 100644 index 000000000..50762b582 --- /dev/null +++ b/src/lib/nfs/Cancellable.hxx @@ -0,0 +1,177 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_CANCELLABLE_HXX +#define MPD_NFS_CANCELLABLE_HXX + +#include "Compiler.h" + +#include <list> +#include <algorithm> + +#include <assert.h> + +template<typename T> +class CancellablePointer { +public: + typedef T *pointer_type; + typedef T &reference_type; + typedef const T &const_reference_type; + +private: + pointer_type p; + +public: + explicit constexpr CancellablePointer(reference_type _p):p(&_p) {} + + CancellablePointer(const CancellablePointer &) = delete; + + constexpr bool IsCancelled() const { + return p == nullptr; + } + + void Cancel() { + assert(!IsCancelled()); + + p = nullptr; + } + + reference_type Get() { + assert(p != nullptr); + + return *p; + } + + constexpr bool Is(const_reference_type other) const { + return p == &other; + } +}; + +template<typename T, typename CT=CancellablePointer<T>> +class CancellableList { +public: + typedef typename CT::reference_type reference_type; + typedef typename CT::const_reference_type const_reference_type; + +private: + typedef std::list<CT> List; + typedef typename List::iterator iterator; + typedef typename List::const_iterator const_iterator; + List list; + + class MatchPointer { + const_reference_type p; + + public: + explicit constexpr MatchPointer(const_reference_type _p) + :p(_p) {} + + constexpr bool operator()(const CT &a) const { + return a.Is(p); + } + }; + + gcc_pure + iterator Find(reference_type p) { + return std::find_if(list.begin(), list.end(), MatchPointer(p)); + } + + gcc_pure + const_iterator Find(const_reference_type p) const { + return std::find_if(list.begin(), list.end(), MatchPointer(p)); + } + + class MatchReference { + const CT &c; + + public: + constexpr explicit MatchReference(const CT &_c):c(_c) {} + + gcc_pure + bool operator()(const CT &a) const { + return &a == &c; + } + }; + + gcc_pure + iterator Find(CT &c) { + return std::find_if(list.begin(), list.end(), + MatchReference(c)); + } + + gcc_pure + const_iterator Find(const CT &c) const { + return std::find_if(list.begin(), list.end(), + MatchReference(c)); + } + +public: +#ifndef NDEBUG + gcc_pure + bool IsEmpty() const { + for (const auto &c : list) + if (!c.IsCancelled()) + return false; + + return true; + } +#endif + + gcc_pure + bool Contains(const_reference_type p) const { + return Find(p) != list.end(); + } + + template<typename... Args> + CT &Add(reference_type p, Args&&... args) { + assert(Find(p) == list.end()); + + list.emplace_back(p, std::forward<Args>(args)...); + return list.back(); + } + + void RemoveLast() { + list.pop_back(); + } + + bool RemoveOptional(CT &ct) { + auto i = Find(ct); + if (i == list.end()) + return false; + + list.erase(i); + return true; + } + + void Remove(CT &ct) { + auto i = Find(ct); + assert(i != list.end()); + + list.erase(i); + } + + void Cancel(reference_type p) { + auto i = Find(p); + assert(i != list.end()); + + i->Cancel(); + } +}; + +#endif diff --git a/src/lib/nfs/Connection.cxx b/src/lib/nfs/Connection.cxx new file mode 100644 index 000000000..4c2f523f6 --- /dev/null +++ b/src/lib/nfs/Connection.cxx @@ -0,0 +1,415 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "Connection.hxx" +#include "Lease.hxx" +#include "Domain.hxx" +#include "Callback.hxx" +#include "system/fd_util.h" +#include "util/Error.hxx" +#include "event/Call.hxx" + +extern "C" { +#include <nfsc/libnfs.h> +} + +#include <utility> + +inline bool +NfsConnection::CancellableCallback::Open(nfs_context *ctx, + const char *path, int flags, + Error &error) +{ + int result = nfs_open_async(ctx, path, flags, + Callback, this); + if (result < 0) { + error.Format(nfs_domain, "nfs_open_async() failed: %s", + nfs_get_error(ctx)); + return false; + } + + return true; +} + +inline bool +NfsConnection::CancellableCallback::Stat(nfs_context *ctx, + struct nfsfh *fh, + Error &error) +{ + int result = nfs_fstat_async(ctx, fh, Callback, this); + if (result < 0) { + error.Format(nfs_domain, "nfs_fstat_async() failed: %s", + nfs_get_error(ctx)); + return false; + } + + return true; +} + +inline bool +NfsConnection::CancellableCallback::Read(nfs_context *ctx, struct nfsfh *fh, + uint64_t offset, size_t size, + Error &error) +{ + int result = nfs_pread_async(ctx, fh, offset, size, Callback, this); + if (result < 0) { + error.Format(nfs_domain, "nfs_pread_async() failed: %s", + nfs_get_error(ctx)); + return false; + } + + return true; +} + +inline void +NfsConnection::CancellableCallback::Callback(int err, void *data) +{ + if (!IsCancelled()) { + NfsCallback &cb = Get(); + + connection.callbacks.Remove(*this); + + if (err >= 0) + cb.OnNfsCallback((unsigned)err, data); + else + cb.OnNfsError(Error(nfs_domain, err, + (const char *)data)); + } else { + connection.callbacks.Remove(*this); + } +} + +void +NfsConnection::CancellableCallback::Callback(int err, + gcc_unused struct nfs_context *nfs, + void *data, void *private_data) +{ + CancellableCallback &c = *(CancellableCallback *)private_data; + c.Callback(err, data); +} + +static constexpr unsigned +libnfs_to_events(int i) +{ + return ((i & POLLIN) ? SocketMonitor::READ : 0) | + ((i & POLLOUT) ? SocketMonitor::WRITE : 0); +} + +static constexpr int +events_to_libnfs(unsigned i) +{ + return ((i & SocketMonitor::READ) ? POLLIN : 0) | + ((i & SocketMonitor::WRITE) ? POLLOUT : 0); +} + +NfsConnection::~NfsConnection() +{ + assert(new_leases.empty()); + assert(active_leases.empty()); + assert(callbacks.IsEmpty()); + + if (context != nullptr) + BlockingCall(SocketMonitor::GetEventLoop(), [this](){ + DestroyContext(); + }); +} + +void +NfsConnection::AddLease(NfsLease &lease) +{ + { + const ScopeLock protect(mutex); + new_leases.push_back(&lease); + } + + DeferredMonitor::Schedule(); +} + +void +NfsConnection::RemoveLease(NfsLease &lease) +{ + const ScopeLock protect(mutex); + + new_leases.remove(&lease); + active_leases.remove(&lease); +} + +bool +NfsConnection::Open(const char *path, int flags, NfsCallback &callback, + Error &error) +{ + assert(!callbacks.Contains(callback)); + + auto &c = callbacks.Add(callback, *this); + if (!c.Open(context, path, flags, error)) { + callbacks.RemoveLast(); + return false; + } + + ScheduleSocket(); + return true; +} + +bool +NfsConnection::Stat(struct nfsfh *fh, NfsCallback &callback, Error &error) +{ + assert(!callbacks.Contains(callback)); + + auto &c = callbacks.Add(callback, *this); + if (!c.Stat(context, fh, error)) { + callbacks.RemoveLast(); + return false; + } + + ScheduleSocket(); + return true; +} + +bool +NfsConnection::Read(struct nfsfh *fh, uint64_t offset, size_t size, + NfsCallback &callback, Error &error) +{ + assert(!callbacks.Contains(callback)); + + auto &c = callbacks.Add(callback, *this); + if (!c.Read(context, fh, offset, size, error)) { + callbacks.RemoveLast(); + return false; + } + + ScheduleSocket(); + return true; +} + +void +NfsConnection::Cancel(NfsCallback &callback) +{ + callbacks.Cancel(callback); +} + +static void +DummyCallback(int, struct nfs_context *, void *, void *) +{ +} + +void +NfsConnection::Close(struct nfsfh *fh) +{ + nfs_close_async(context, fh, DummyCallback, nullptr); + ScheduleSocket(); +} + +void +NfsConnection::DestroyContext() +{ + assert(context != nullptr); + + SocketMonitor::Cancel(); + nfs_destroy_context(context); + context = nullptr; +} + +void +NfsConnection::ScheduleSocket() +{ + assert(context != nullptr); + + if (!SocketMonitor::IsDefined()) { + int _fd = nfs_get_fd(context); + fd_set_cloexec(_fd, true); + SocketMonitor::Open(_fd); + } + + SocketMonitor::Schedule(libnfs_to_events(nfs_which_events(context))); +} + +bool +NfsConnection::OnSocketReady(unsigned flags) +{ + bool closed = false; + + const bool was_mounted = mount_finished; + if (!mount_finished) + /* until the mount is finished, the NFS client may use + various sockets, therefore we unregister and + re-register it each time */ + SocketMonitor::Steal(); + + assert(!in_event); + in_event = true; + + assert(!in_service); + in_service = true; + postponed_destroy = false; + + int result = nfs_service(context, events_to_libnfs(flags)); + + assert(context != nullptr); + assert(in_service); + in_service = false; + + if (postponed_destroy) { + /* somebody has called nfs_client_free() while we were inside + nfs_service() */ + const ScopeLock protect(mutex); + DestroyContext(); + closed = true; + // TODO? nfs_client_cleanup_files(client); + } else if (!was_mounted && mount_finished) { + const ScopeLock protect(mutex); + + if (postponed_mount_error.IsDefined()) { + DestroyContext(); + closed = true; + BroadcastMountError(std::move(postponed_mount_error)); + } else if (result == 0) + BroadcastMountSuccess(); + } else if (result < 0) { + /* the connection has failed */ + Error error; + error.Format(nfs_domain, "NFS connection has failed: %s", + nfs_get_error(context)); + + const ScopeLock protect(mutex); + + DestroyContext(); + closed = true; + + if (!mount_finished) + BroadcastMountError(std::move(error)); + else + BroadcastError(std::move(error)); + } + + assert(in_event); + in_event = false; + + if (context != nullptr) + ScheduleSocket(); + + return !closed; +} + +inline void +NfsConnection::MountCallback(int status, gcc_unused nfs_context *nfs, + gcc_unused void *data) +{ + assert(context == nfs); + + mount_finished = true; + + if (status < 0) { + postponed_mount_error.Set(nfs_domain, status, + "nfs_mount_async() failed"); + return; + } +} + +void +NfsConnection::MountCallback(int status, nfs_context *nfs, void *data, + void *private_data) +{ + NfsConnection *c = (NfsConnection *)private_data; + + c->MountCallback(status, nfs, data); +} + +inline bool +NfsConnection::MountInternal(Error &error) +{ + if (context != nullptr) + return true; + + context = nfs_init_context(); + if (context == nullptr) { + error.Set(nfs_domain, "nfs_init_context() failed"); + return false; + } + + postponed_mount_error.Clear(); + mount_finished = false; + in_service = false; + in_event = false; + + if (nfs_mount_async(context, server.c_str(), export_name.c_str(), + MountCallback, this) != 0) { + error.Format(nfs_domain, + "nfs_mount_async() failed: %s", + nfs_get_error(context)); + nfs_destroy_context(context); + context = nullptr; + return false; + } + + ScheduleSocket(); + return true; +} + +void +NfsConnection::BroadcastMountSuccess() +{ + while (!new_leases.empty()) { + auto i = new_leases.begin(); + active_leases.splice(active_leases.end(), new_leases, i); + (*i)->OnNfsConnectionReady(); + } +} + +void +NfsConnection::BroadcastMountError(Error &&error) +{ + while (!new_leases.empty()) { + auto l = new_leases.front(); + new_leases.pop_front(); + l->OnNfsConnectionFailed(error); + } + + OnNfsConnectionError(std::move(error)); +} + +void +NfsConnection::BroadcastError(Error &&error) +{ + while (!active_leases.empty()) { + auto l = active_leases.front(); + active_leases.pop_front(); + l->OnNfsConnectionDisconnected(error); + } + + BroadcastMountError(std::move(error)); +} + +void +NfsConnection::RunDeferred() +{ + { + Error error; + if (!MountInternal(error)) { + const ScopeLock protect(mutex); + BroadcastMountError(std::move(error)); + return; + } + } + + if (mount_finished) { + const ScopeLock protect(mutex); + BroadcastMountSuccess(); + } +} diff --git a/src/lib/nfs/Connection.hxx b/src/lib/nfs/Connection.hxx new file mode 100644 index 000000000..8850ff6f3 --- /dev/null +++ b/src/lib/nfs/Connection.hxx @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_CONNECTION_HXX +#define MPD_NFS_CONNECTION_HXX + +#include "Lease.hxx" +#include "Cancellable.hxx" +#include "thread/Mutex.hxx" +#include "event/SocketMonitor.hxx" +#include "event/DeferredMonitor.hxx" +#include "util/Error.hxx" + +#include <string> +#include <list> + +struct nfs_context; +class NfsCallback; + +/** + * An asynchronous connection to a NFS server. + */ +class NfsConnection : SocketMonitor, DeferredMonitor { + class CancellableCallback : public CancellablePointer<NfsCallback> { + NfsConnection &connection; + + public: + explicit constexpr CancellableCallback(NfsCallback &_callback, + NfsConnection &_connection) + :CancellablePointer<NfsCallback>(_callback), + connection(_connection) {} + + bool Open(nfs_context *context, const char *path, int flags, + Error &error); + bool Stat(nfs_context *context, struct nfsfh *fh, + Error &error); + bool Read(nfs_context *context, struct nfsfh *fh, + uint64_t offset, size_t size, + Error &error); + + private: + static void Callback(int err, struct nfs_context *nfs, + void *data, void *private_data); + void Callback(int err, void *data); + }; + + std::string server, export_name; + + nfs_context *context; + + Mutex mutex; + + typedef std::list<NfsLease *> LeaseList; + LeaseList new_leases, active_leases; + + typedef CancellableList<NfsCallback, CancellableCallback> CallbackList; + CallbackList callbacks; + + Error postponed_mount_error; + + /** + * True when nfs_service() is being called. During that, + * nfs_client_free() is postponed, or libnfs will crash. See + * #postponed_destroy. + */ + bool in_service; + + /** + * True when OnSocketReady() is being called. During that, + * event updates are omitted. + */ + bool in_event; + + /** + * True when nfs_client_free() has been called while #in_service + * was true. + */ + bool postponed_destroy; + + bool mount_finished; + +public: + gcc_nonnull_all + NfsConnection(EventLoop &_loop, + const char *_server, const char *_export_name) + :SocketMonitor(_loop), DeferredMonitor(_loop), + server(_server), export_name(_export_name), + context(nullptr) {} + + ~NfsConnection(); + + gcc_pure + const char *GetServer() const { + return server.c_str(); + } + + gcc_pure + const char *GetExportName() const { + return export_name.c_str(); + } + + /** + * Ensure that the connection is established. The connection + * is kept up while at least one #NfsLease is registered. + * + * This method is thread-safe. However, #NfsLease's methods + * will be invoked from within the #EventLoop's thread. + */ + void AddLease(NfsLease &lease); + void RemoveLease(NfsLease &lease); + + bool Open(const char *path, int flags, NfsCallback &callback, + Error &error); + bool Stat(struct nfsfh *fh, NfsCallback &callback, Error &error); + bool Read(struct nfsfh *fh, uint64_t offset, size_t size, + NfsCallback &callback, Error &error); + void Cancel(NfsCallback &callback); + + void Close(struct nfsfh *fh); + +protected: + virtual void OnNfsConnectionError(Error &&error) = 0; + +private: + void DestroyContext(); + bool MountInternal(Error &error); + void BroadcastMountSuccess(); + void BroadcastMountError(Error &&error); + void BroadcastError(Error &&error); + + static void MountCallback(int status, nfs_context *nfs, void *data, + void *private_data); + void MountCallback(int status, nfs_context *nfs, void *data); + + void ScheduleSocket(); + + /* virtual methods from SocketMonitor */ + virtual bool OnSocketReady(unsigned flags) override; + + /* virtual methods from DeferredMonitor */ + virtual void RunDeferred() override; +}; + +#endif diff --git a/src/lib/nfs/FileReader.cxx b/src/lib/nfs/FileReader.cxx new file mode 100644 index 000000000..52d951fa6 --- /dev/null +++ b/src/lib/nfs/FileReader.cxx @@ -0,0 +1,244 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "FileReader.hxx" +#include "Glue.hxx" +#include "Connection.hxx" +#include "Domain.hxx" +#include "event/Call.hxx" +#include "IOThread.hxx" +#include "util/StringUtil.hxx" +#include "util/Error.hxx" + +#include <utility> + +#include <assert.h> +#include <string.h> +#include <fcntl.h> + +NfsFileReader::NfsFileReader() + :DeferredMonitor(io_thread_get()), state(State::INITIAL) +{ +} + +NfsFileReader::~NfsFileReader() +{ + assert(state == State::INITIAL); +} + +void +NfsFileReader::Close() +{ + if (state == State::INITIAL) + return; + + if (state == State::DEFER) { + state = State::INITIAL; + DeferredMonitor::Cancel(); + return; + } + + connection->RemoveLease(*this); + + if (state > State::MOUNT && state != State::IDLE) + connection->Cancel(*this); + + if (state > State::OPEN) + connection->Close(fh); + + state = State::INITIAL; +} + +void +NfsFileReader::DeferClose() +{ + BlockingCall(io_thread_get(), [this](){ Close(); }); +} + +bool +NfsFileReader::Open(const char *uri, Error &error) +{ + assert(state == State::INITIAL); + + if (!StringStartsWith(uri, "nfs://")) { + error.Set(nfs_domain, "Malformed nfs:// URI"); + return false; + } + + uri += 6; + + const char *slash = strchr(uri, '/'); + if (slash == nullptr) { + error.Set(nfs_domain, "Malformed nfs:// URI"); + return false; + } + + server = std::string(uri, slash); + + uri = slash; + slash = strrchr(uri + 1, '/'); + if (slash == nullptr || slash[1] == 0) { + error.Set(nfs_domain, "Malformed nfs:// URI"); + return false; + } + + export_name = std::string(uri, slash); + path = slash; + + state = State::DEFER; + DeferredMonitor::Schedule(); + return true; +} + +bool +NfsFileReader::Read(uint64_t offset, size_t size, Error &error) +{ + assert(state == State::IDLE); + + if (!connection->Read(fh, offset, size, *this, error)) + return false; + + state = State::READ; + return true; +} + +void +NfsFileReader::CancelRead() +{ + if (state == State::READ) { + connection->Cancel(*this); + state = State::IDLE; + } +} + +void +NfsFileReader::OnNfsConnectionReady() +{ + assert(state == State::MOUNT); + + Error error; + if (!connection->Open(path, O_RDONLY, *this, error)) { + OnNfsFileError(std::move(error)); + return; + } + + state = State::OPEN; +} + +void +NfsFileReader::OnNfsConnectionFailed(const Error &error) +{ + assert(state == State::MOUNT); + + Error copy; + copy.Set(error); + OnNfsFileError(std::move(copy)); +} + +void +NfsFileReader::OnNfsConnectionDisconnected(const Error &error) +{ + assert(state > State::MOUNT); + + state = State::INITIAL; + + Error copy; + copy.Set(error); + OnNfsFileError(std::move(copy)); +} + +inline void +NfsFileReader::OpenCallback(nfsfh *_fh) +{ + assert(state == State::OPEN); + assert(connection != nullptr); + assert(_fh != nullptr); + + fh = _fh; + + Error error; + if (!connection->Stat(fh, *this, error)) { + OnNfsFileError(std::move(error)); + return; + } + + state = State::STAT; +} + +inline void +NfsFileReader::StatCallback(const struct stat *st) +{ + assert(state == State::STAT); + assert(connection != nullptr); + assert(fh != nullptr); + assert(st != nullptr); + + if (!S_ISREG(st->st_mode)) { + OnNfsFileError(Error(nfs_domain, "Not a regular file")); + return; + } + + state = State::IDLE; + + OnNfsFileOpen(st->st_size); +} + +void +NfsFileReader::OnNfsCallback(unsigned status, void *data) +{ + switch (state) { + case State::INITIAL: + case State::DEFER: + case State::MOUNT: + case State::IDLE: + assert(false); + gcc_unreachable(); + + case State::OPEN: + OpenCallback((struct nfsfh *)data); + break; + + case State::STAT: + StatCallback((const struct stat *)data); + break; + + case State::READ: + state = State::IDLE; + OnNfsFileRead(data, status); + break; + } +} + +void +NfsFileReader::OnNfsError(Error &&error) +{ + OnNfsFileError(std::move(error)); +} + +void +NfsFileReader::RunDeferred() +{ + assert(state == State::DEFER); + + state = State::MOUNT; + + connection = &nfs_get_connection(server.c_str(), export_name.c_str()); + connection->AddLease(*this); +} diff --git a/src/lib/nfs/FileReader.hxx b/src/lib/nfs/FileReader.hxx new file mode 100644 index 000000000..7f43e0ecf --- /dev/null +++ b/src/lib/nfs/FileReader.hxx @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_FILE_READER_HXX +#define MPD_NFS_FILE_READER_HXX + +#include "check.h" +#include "Lease.hxx" +#include "Callback.hxx" +#include "event/DeferredMonitor.hxx" + +#include <string> + +#include <stdint.h> +#include <stddef.h> +#include <sys/stat.h> + +struct nfsfh; +class NfsConnection; + +class NfsFileReader : NfsLease, NfsCallback, DeferredMonitor { + enum class State { + INITIAL, + DEFER, + MOUNT, + OPEN, + STAT, + READ, + IDLE, + }; + + State state; + + std::string server, export_name; + const char *path; + + NfsConnection *connection; + + nfsfh *fh; + +public: + NfsFileReader(); + ~NfsFileReader(); + + void Close(); + void DeferClose(); + + bool Open(const char *uri, Error &error); + bool Read(uint64_t offset, size_t size, Error &error); + void CancelRead(); + + bool IsIdle() const { + return state == State::IDLE; + } + +protected: + virtual void OnNfsFileOpen(uint64_t size) = 0; + virtual void OnNfsFileRead(const void *data, size_t size) = 0; + virtual void OnNfsFileError(Error &&error) = 0; + +private: + void OpenCallback(nfsfh *_fh); + void StatCallback(const struct stat *st); + + /* virtual methods from NfsLease */ + void OnNfsConnectionReady() final; + void OnNfsConnectionFailed(const Error &error) final; + void OnNfsConnectionDisconnected(const Error &error) final; + + /* virtual methods from NfsCallback */ + void OnNfsCallback(unsigned status, void *data) final; + void OnNfsError(Error &&error) final; + + /* virtual methods from DeferredMonitor */ + void RunDeferred() final; +}; + +#endif diff --git a/src/lib/nfs/Glue.cxx b/src/lib/nfs/Glue.cxx new file mode 100644 index 000000000..c043f06c6 --- /dev/null +++ b/src/lib/nfs/Glue.cxx @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "Glue.hxx" +#include "Manager.hxx" +#include "IOThread.hxx" +#include "util/Manual.hxx" + +class NfsGlue { + NfsManager manager; + +public: + NfsGlue(EventLoop &_loop) + :manager(_loop) {} + + ~NfsGlue() { + //assert(open_uri.empty()); + } + + NfsConnection &GetConnection(const char *server, const char *export_name) { + return manager.GetConnection(server, export_name); + } +}; + +static Manual<NfsGlue> nfs_glue; +static unsigned in_use; + +void +nfs_init() +{ + if (in_use++ > 0) + return; + + nfs_glue.Construct(io_thread_get()); +} + +void +nfs_finish() +{ + assert(in_use > 0); + + if (--in_use > 0) + return; + + nfs_glue.Destruct(); +} + +NfsConnection & +nfs_get_connection(const char *server, const char *export_name) +{ + return nfs_glue->GetConnection(server, export_name); +} diff --git a/src/lib/nfs/Glue.hxx b/src/lib/nfs/Glue.hxx new file mode 100644 index 000000000..6da8957cb --- /dev/null +++ b/src/lib/nfs/Glue.hxx @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_GLUE_HXX +#define MPD_NFS_GLUE_HXX + +#include "check.h" +#include "Compiler.h" + +class NfsConnection; + +void +nfs_init(); + +void +nfs_finish(); + +gcc_pure +NfsConnection & +nfs_get_connection(const char *server, const char *export_name); + +#endif diff --git a/src/lib/nfs/Lease.hxx b/src/lib/nfs/Lease.hxx new file mode 100644 index 000000000..6f88acf53 --- /dev/null +++ b/src/lib/nfs/Lease.hxx @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_LEASE_HXX +#define MPD_NFS_LEASE_HXX + +#include "check.h" + +class Error; + +class NfsLease { +public: + /** + * The #NfsConnection has successfully mounted the server's + * export and is ready for regular operation. + */ + virtual void OnNfsConnectionReady() = 0; + + /** + * The #NfsConnection has failed to mount the server's export. + * This is being called instead of OnNfsConnectionReady(). + */ + virtual void OnNfsConnectionFailed(const Error &error) = 0; + + /** + * The #NfsConnection has failed after OnNfsConnectionReady() + * had been called already. + */ + virtual void OnNfsConnectionDisconnected(const Error &error) = 0; +}; + +#endif diff --git a/src/lib/nfs/Manager.cxx b/src/lib/nfs/Manager.cxx new file mode 100644 index 000000000..56a3fb79a --- /dev/null +++ b/src/lib/nfs/Manager.cxx @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "Manager.hxx" +#include "event/Loop.hxx" +#include "Log.hxx" + +void +NfsManager::ManagedConnection::OnNfsConnectionError(Error &&error) +{ + FormatError(error, "NFS error on %s:%s", GetServer(), GetExportName()); + + manager.connections.erase(Key(GetServer(), GetExportName())); +} + +NfsConnection & +NfsManager::GetConnection(const char *server, const char *export_name) +{ + assert(server != nullptr); + assert(export_name != nullptr); + assert(loop.IsInside()); + + const std::string key = Key(server, export_name); + + auto e = connections.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(*this, loop, + server, + export_name)); + return e.first->second; +} diff --git a/src/lib/nfs/Manager.hxx b/src/lib/nfs/Manager.hxx new file mode 100644 index 000000000..4a380bd51 --- /dev/null +++ b/src/lib/nfs/Manager.hxx @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2003-2014 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_NFS_MANAGER_HXX +#define MPD_NFS_MANAGER_HXX + +#include "check.h" +#include "Connection.hxx" + +#include <string> +#include <map> + +/** + * A manager for NFS connections. Handles multiple connections to + * multiple NFS servers. + */ +class NfsManager { + class ManagedConnection final : public NfsConnection { + NfsManager &manager; + + public: + ManagedConnection(NfsManager &_manager, EventLoop &_loop, + const char *_server, + const char *_export_name) + :NfsConnection(_loop, _server, _export_name), + manager(_manager) {} + + protected: + /* virtual methods from NfsConnection */ + void OnNfsConnectionError(Error &&error) override; + }; + + EventLoop &loop; + + /** + * Maps server+":"+export_name (see method Key()) to + * #ManagedConnection. + */ + std::map<std::string, ManagedConnection> connections; + +public: + NfsManager(EventLoop &_loop) + :loop(_loop) {} + + gcc_pure + NfsConnection &GetConnection(const char *server, + const char *export_name); + +private: + gcc_pure + static std::string Key(const char *server, const char *export_name) { + return std::string(server) + ':' + export_name; + } +}; + +#endif |