From bb922d577dd8dc484d25c29c7c85bf04ecd62256 Mon Sep 17 00:00:00 2001
From: Max Kellermann <max@duempel.org>
Date: Wed, 1 Oct 2014 23:57:28 +0200
Subject: storage/nfs: use the libnfs async API

Share the NFS connection with the NFS input plugin.
---
 src/storage/plugins/NfsStorage.cxx | 277 ++++++++++++++++++++++++++++++-------
 1 file changed, 228 insertions(+), 49 deletions(-)

(limited to 'src/storage')

diff --git a/src/storage/plugins/NfsStorage.cxx b/src/storage/plugins/NfsStorage.cxx
index e28e41a67..8ddb14250 100644
--- a/src/storage/plugins/NfsStorage.cxx
+++ b/src/storage/plugins/NfsStorage.cxx
@@ -23,31 +23,64 @@
 #include "storage/StorageInterface.hxx"
 #include "storage/FileInfo.hxx"
 #include "storage/MemoryDirectoryReader.hxx"
+#include "lib/nfs/Blocking.hxx"
 #include "lib/nfs/Domain.hxx"
 #include "lib/nfs/Base.hxx"
+#include "lib/nfs/Lease.hxx"
+#include "lib/nfs/Connection.hxx"
+#include "lib/nfs/Glue.hxx"
 #include "fs/AllocatedPath.hxx"
 #include "util/Error.hxx"
 #include "thread/Mutex.hxx"
+#include "thread/Cond.hxx"
+#include "event/Loop.hxx"
+#include "event/Call.hxx"
+#include "event/DeferredMonitor.hxx"
+#include "event/TimeoutMonitor.hxx"
 
 extern "C" {
 #include <nfsc/libnfs.h>
 #include <nfsc/libnfs-raw-nfs.h>
 }
 
+#include <string>
+
+#include <assert.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 
-class NfsStorage final : public Storage {
+class NfsStorage final
+	: public Storage, NfsLease, DeferredMonitor, TimeoutMonitor {
+
+	enum class State {
+		INITIAL, CONNECTING, READY, DELAY,
+	};
+
 	const std::string base;
 
-	nfs_context *const ctx;
+	const std::string server, export_name;
+
+	NfsConnection *connection;
+
+	Mutex mutex;
+	Cond cond;
+	State state;
+	Error last_error;
 
 public:
-	NfsStorage(const char *_base, nfs_context *_ctx)
-		:base(_base), ctx(_ctx) {}
+	NfsStorage(EventLoop &_loop, const char *_base,
+		   std::string &&_server, std::string &&_export_name)
+		:DeferredMonitor(_loop), TimeoutMonitor(_loop),
+		 base(_base),
+		 server(std::move(_server)),
+		 export_name(std::move(_export_name)),
+		 state(State::INITIAL) {
+		nfs_init();
+	}
 
-	virtual ~NfsStorage() {
-		nfs_destroy_context(ctx);
+	~NfsStorage() {
+		BlockingCall(GetEventLoop(), [this](){ Disconnect(); });
+		nfs_finish();
 	}
 
 	/* virtual methods from class Storage */
@@ -60,6 +93,125 @@ public:
 	std::string MapUTF8(const char *uri_utf8) const override;
 
 	const char *MapToRelativeUTF8(const char *uri_utf8) const override;
+
+	/* virtual methods from NfsLease */
+	void OnNfsConnectionReady() final {
+		assert(state == State::CONNECTING);
+
+		SetState(State::READY);
+	}
+
+	void OnNfsConnectionFailed(gcc_unused const Error &error) final {
+		assert(state == State::CONNECTING);
+
+		SetState(State::DELAY, error);
+		TimeoutMonitor::ScheduleSeconds(60);
+	}
+
+	void OnNfsConnectionDisconnected(gcc_unused const Error &error) final {
+		assert(state == State::READY);
+
+		SetState(State::DELAY, error);
+		TimeoutMonitor::ScheduleSeconds(5);
+	}
+
+	/* virtual methods from DeferredMonitor */
+	void RunDeferred() final {
+		if (state == State::INITIAL)
+			Connect();
+	}
+
+	/* virtual methods from TimeoutMonitor */
+	void OnTimeout() final {
+		assert(state == State::DELAY);
+
+		Connect();
+	}
+
+private:
+	EventLoop &GetEventLoop() {
+		return DeferredMonitor::GetEventLoop();
+	}
+
+	void SetState(State _state) {
+		assert(GetEventLoop().IsInside());
+
+		const ScopeLock protect(mutex);
+		state = _state;
+		cond.broadcast();
+	}
+
+	void SetState(State _state, const Error &error) {
+		assert(GetEventLoop().IsInside());
+
+		const ScopeLock protect(mutex);
+		state = _state;
+		last_error.Set(error);
+		cond.broadcast();
+	}
+
+	void Connect() {
+		assert(state != State::READY);
+		assert(GetEventLoop().IsInside());
+
+		connection = &nfs_get_connection(server.c_str(),
+						 export_name.c_str());
+		connection->AddLease(*this);
+
+		SetState(State::CONNECTING);
+	}
+
+	void EnsureConnected() {
+		if (state != State::READY)
+			Connect();
+	}
+
+	bool WaitConnected(Error &error) {
+		const ScopeLock protect(mutex);
+
+		while (true) {
+			switch (state) {
+			case State::INITIAL:
+				/* schedule connect */
+				mutex.unlock();
+				DeferredMonitor::Schedule();
+				mutex.lock();
+				break;
+
+			case State::CONNECTING:
+			case State::READY:
+				return true;
+
+			case State::DELAY:
+				assert(last_error.IsDefined());
+				error.Set(last_error);
+				return false;
+			}
+
+			cond.wait(mutex);
+		}
+	}
+
+	void Disconnect() {
+		assert(GetEventLoop().IsInside());
+
+		switch (state) {
+		case State::INITIAL:
+			DeferredMonitor::Cancel();
+			break;
+
+		case State::CONNECTING:
+		case State::READY:
+			connection->RemoveLease(*this);
+			SetState(State::INITIAL);
+			break;
+
+		case State::DELAY:
+			TimeoutMonitor::Cancel();
+			SetState(State::INITIAL);
+			break;
+		}
+	}
 };
 
 static std::string
@@ -107,19 +259,24 @@ Copy(FileInfo &info, const struct stat &st)
 	info.inode = st.st_ino;
 }
 
-static bool
-GetInfo(nfs_context *ctx, const char *path, FileInfo &info, Error &error)
-{
-	struct stat st;
-	int result = nfs_stat(ctx, path, &st);
-	if (result < 0) {
-		error.SetErrno(-result, "nfs_stat() failed");
-		return false;
+class NfsGetInfoOperation final : public BlockingNfsOperation {
+	const char *const path;
+	FileInfo &info;
+
+public:
+	NfsGetInfoOperation(NfsConnection &_connection, const char *_path,
+			    FileInfo &_info)
+		:BlockingNfsOperation(_connection), path(_path), info(_info) {}
+
+protected:
+	bool Start(Error &_error) override {
+		return connection.Stat(path, *this, _error);
 	}
 
-	Copy(info, st);
-	return true;
-}
+	void HandleResult(gcc_unused unsigned status, void *data) override {
+		Copy(info, *(const struct stat *)data);
+	}
+};
 
 bool
 NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow,
@@ -129,7 +286,11 @@ NfsStorage::GetInfo(const char *uri_utf8, gcc_unused bool follow,
 	if (path.empty())
 		return false;
 
-	return ::GetInfo(ctx, path.c_str(), info, error);
+	if (!WaitConnected(error))
+		return nullptr;
+
+	NfsGetInfoOperation operation(*connection, path.c_str(), info);
+	return operation.Run(error);
 }
 
 gcc_pure
@@ -164,24 +325,43 @@ Copy(FileInfo &info, const struct nfsdirent &ent)
 	info.inode = ent.inode;
 }
 
-StorageDirectoryReader *
-NfsStorage::OpenDirectory(const char *uri_utf8, Error &error)
-{
-	const std::string path = UriToNfsPath(uri_utf8, error);
-	if (path.empty())
-		return nullptr;
+class NfsListDirectoryOperation final : public BlockingNfsOperation {
+	const char *const path;
 
-	nfsdir *dir;
-	int result = nfs_opendir(ctx, path.c_str(), &dir);
-	if (result < 0) {
-		error.SetErrno(-result, "nfs_opendir() failed");
-		return nullptr;
+	MemoryStorageDirectoryReader::List entries;
+
+public:
+	NfsListDirectoryOperation(NfsConnection &_connection,
+				  const char *_path)
+		:BlockingNfsOperation(_connection), path(_path) {}
+
+	StorageDirectoryReader *ToReader() {
+		return new MemoryStorageDirectoryReader(std::move(entries));
 	}
 
-	MemoryStorageDirectoryReader::List entries;
+protected:
+	bool Start(Error &_error) override {
+		return connection.OpenDirectory(path, *this, _error);
+	}
+
+	void HandleResult(gcc_unused unsigned status, void *data) override {
+		struct nfsdir *const dir = (struct nfsdir *)data;
+
+		CollectEntries(dir);
+		connection.CloseDirectory(dir);
+	}
+
+private:
+	void CollectEntries(struct nfsdir *dir);
+};
+
+inline void
+NfsListDirectoryOperation::CollectEntries(struct nfsdir *dir)
+{
+	assert(entries.empty());
 
 	const struct nfsdirent *ent;
-	while ((ent = nfs_readdir(ctx, dir)) != nullptr) {
+	while ((ent = connection.ReadDirectory(dir)) != nullptr) {
 		const Path name_fs = Path::FromFS(ent->name);
 		if (SkipNameFS(name_fs.c_str()))
 			continue;
@@ -195,15 +375,27 @@ NfsStorage::OpenDirectory(const char *uri_utf8, Error &error)
 		entries.emplace_front(std::move(name_utf8));
 		Copy(entries.front().info, *ent);
 	}
+}
 
-	nfs_closedir(ctx, dir);
+StorageDirectoryReader *
+NfsStorage::OpenDirectory(const char *uri_utf8, Error &error)
+{
+	const std::string path = UriToNfsPath(uri_utf8, error);
+	if (path.empty())
+		return nullptr;
 
-	/* don't reverse the list - order does not matter */
-	return new MemoryStorageDirectoryReader(std::move(entries));
+	if (!WaitConnected(error))
+		return nullptr;
+
+	NfsListDirectoryOperation operation(*connection, path.c_str());
+	if (!operation.Run(error))
+		return nullptr;
+
+	return operation.ToReader();
 }
 
 static Storage *
-CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base,
+CreateNfsStorageURI(EventLoop &event_loop, const char *base,
 		    Error &error)
 {
 	if (memcmp(base, "nfs://", 6) != 0)
@@ -219,22 +411,9 @@ CreateNfsStorageURI(gcc_unused EventLoop &event_loop, const char *base,
 
 	const std::string server(p, mount);
 
-	nfs_context *ctx = nfs_init_context();
-	if (ctx == nullptr) {
-		error.Set(nfs_domain, "nfs_init_context() failed");
-		return nullptr;
-	}
-
-	int result = nfs_mount(ctx, server.c_str(), mount);
-	if (result < 0) {
-		nfs_destroy_context(ctx);
-		error.SetErrno(-result, "nfs_mount() failed");
-		return nullptr;
-	}
-
 	nfs_set_base(server.c_str(), mount);
 
-	return new NfsStorage(base, ctx);
+	return new NfsStorage(event_loop, base, server.c_str(), mount);
 }
 
 const StoragePlugin nfs_storage_plugin = {
-- 
cgit v1.2.3