From 39439b80f5d1574e67f337a047869cf067c396b7 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 14 Jan 2013 23:42:06 +0100 Subject: Client: rebase on the new BufferedSocket class --- src/Client.hxx | 4 +-- src/ClientEvent.cxx | 90 +++++------------------------------------------ src/ClientExpire.cxx | 15 +++----- src/ClientIdle.cxx | 4 +-- src/ClientInternal.hxx | 34 +++++++++--------- src/ClientNew.cxx | 31 ++++------------ src/ClientProcess.cxx | 4 --- src/ClientRead.cxx | 95 ++++++++++++++------------------------------------ src/ClientWrite.cxx | 91 +---------------------------------------------- src/Listen.cxx | 2 +- 10 files changed, 66 insertions(+), 304 deletions(-) (limited to 'src') diff --git a/src/Client.hxx b/src/Client.hxx index 3ce323d80..bf2a2521f 100644 --- a/src/Client.hxx +++ b/src/Client.hxx @@ -22,11 +22,11 @@ #include "gcc.h" -#include #include #include struct sockaddr; +class EventLoop; struct Partition; class Client; @@ -34,7 +34,7 @@ void client_manager_init(void); void client_manager_deinit(void); void -client_new(Partition &partition, +client_new(EventLoop &loop, Partition &partition, int fd, const struct sockaddr *sa, size_t sa_length, int uid); /** diff --git a/src/ClientEvent.cxx b/src/ClientEvent.cxx index 201709c76..905cf0c0a 100644 --- a/src/ClientEvent.cxx +++ b/src/ClientEvent.cxx @@ -19,92 +19,18 @@ #include "config.h" #include "ClientInternal.hxx" -#include "Main.hxx" -#include "event/Loop.hxx" -#include - -static gboolean -client_out_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, - gpointer data) +void +Client::OnSocketError(GError *error) { - Client *client = (Client *)data; - - assert(!client->IsExpired()); - - if (condition != G_IO_OUT) { - client->SetExpired(); - return false; - } - - client_write_deferred(client); + g_warning("error on client %d: %s", num, error->message); + g_error_free(error); - if (client->IsExpired()) { - client->Close(); - return false; - } - - g_timer_start(client->last_activity); - - if (client->output_buffer.IsEmpty()) { - /* done sending deferred buffers exist: schedule - read */ - client->source_id = g_io_add_watch(client->channel, - GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP), - client_in_event, client); - return false; - } - - /* write more */ - return true; + SetExpired(); } -gboolean -client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, - gpointer data) +void +Client::OnSocketClosed() { - Client *client = (Client *)data; - enum command_return ret; - - assert(!client->IsExpired()); - - if (condition != G_IO_IN) { - client->SetExpired(); - return false; - } - - g_timer_start(client->last_activity); - - ret = client_read(client); - switch (ret) { - case COMMAND_RETURN_OK: - case COMMAND_RETURN_IDLE: - case COMMAND_RETURN_ERROR: - break; - - case COMMAND_RETURN_KILL: - client->Close(); - main_loop->Break(); - return false; - - case COMMAND_RETURN_CLOSE: - client->Close(); - return false; - } - - if (client->IsExpired()) { - client->Close(); - return false; - } - - if (!client->output_buffer.IsEmpty()) { - /* deferred buffers exist: schedule write */ - client->source_id = g_io_add_watch(client->channel, - GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP), - client_out_event, client); - return false; - } - - /* read more */ - return true; + SetExpired(); } diff --git a/src/ClientExpire.cxx b/src/ClientExpire.cxx index f599e472d..56b003df8 100644 --- a/src/ClientExpire.cxx +++ b/src/ClientExpire.cxx @@ -26,18 +26,11 @@ static guint expire_source_id; void Client::SetExpired() { - if (!IsExpired()) - client_schedule_expire(); + if (IsExpired()) + return; - if (source_id != 0) { - g_source_remove(source_id); - source_id = 0; - } - - if (channel != NULL) { - g_io_channel_unref(channel); - channel = nullptr; - } + client_schedule_expire(); + BufferedSocket::Close(); } static void diff --git a/src/ClientIdle.cxx b/src/ClientIdle.cxx index 98778f52c..f9818b278 100644 --- a/src/ClientIdle.cxx +++ b/src/ClientIdle.cxx @@ -60,10 +60,8 @@ client_idle_add(Client *client, unsigned flags) client->idle_flags |= flags; if (client->idle_waiting - && (client->idle_flags & client->idle_subscriptions)) { + && (client->idle_flags & client->idle_subscriptions)) client_idle_notify(client); - client_write_output(client); - } } static void diff --git a/src/ClientInternal.hxx b/src/ClientInternal.hxx index 009568ed4..dee5d9c71 100644 --- a/src/ClientInternal.hxx +++ b/src/ClientInternal.hxx @@ -24,8 +24,8 @@ #include "Client.hxx" #include "ClientMessage.hxx" #include "CommandListBuilder.hxx" +#include "event/BufferedSocket.hxx" #include "command.h" -#include "util/PeakBuffer.hxx" #include #include @@ -42,20 +42,13 @@ enum { }; struct Partition; -class PeakBuffer; -class Client { +class Client final : private BufferedSocket { public: Partition &partition; struct playlist &playlist; struct player_control *player_control; - GIOChannel *channel; - guint source_id; - - /** the buffer for reading lines from the #channel */ - struct fifo_buffer *input; - unsigned permission; /** the uid of the client process, or -1 if unknown */ @@ -70,8 +63,6 @@ public: unsigned int num; /* client number */ - PeakBuffer output_buffer; - /** is this client waiting for an "idle" response? */ bool idle_waiting; @@ -98,23 +89,35 @@ public: */ std::list messages; - Client(Partition &partition, + Client(EventLoop &loop, Partition &partition, int fd, int uid, int num); ~Client(); + bool IsConnected() const { + return BufferedSocket::IsDefined(); + } + gcc_pure bool IsSubscribed(const char *channel_name) const { return subscriptions.find(channel_name) != subscriptions.end(); } - gcc_pure bool IsExpired() const { - return channel == nullptr; + return !BufferedSocket::IsDefined(); } void Close(); void SetExpired(); + + using BufferedSocket::Write; + +private: + /* virtual methods from class BufferedSocket */ + virtual InputResult OnSocketInput(const void *data, + size_t length) override; + virtual void OnSocketError(GError *error) override; + virtual void OnSocketClosed() override; }; extern unsigned int client_max_connections; @@ -141,9 +144,6 @@ client_read(Client *client); enum command_return client_process_line(Client *client, char *line); -void -client_write_deferred(Client *client); - void client_write_output(Client *client); diff --git a/src/ClientNew.cxx b/src/ClientNew.cxx index 144c339ab..42cc3470d 100644 --- a/src/ClientNew.cxx +++ b/src/ClientNew.cxx @@ -22,7 +22,6 @@ #include "ClientList.hxx" #include "Partition.hxx" #include "fd_util.h" -#include "util/fifo_buffer.h" extern "C" { #include "resolver.h" } @@ -47,45 +46,27 @@ extern "C" { static const char GREETING[] = "OK MPD " PROTOCOL_VERSION "\n"; -Client::Client(Partition &_partition, - int fd, int _uid, int _num) - :partition(_partition), +Client::Client(EventLoop &_loop, Partition &_partition, + int _fd, int _uid, int _num) + :BufferedSocket(_fd, _loop, 16384, client_max_output_buffer_size), + partition(_partition), playlist(partition.playlist), player_control(&partition.pc), - input(fifo_buffer_new(4096)), permission(getDefaultPermissions()), uid(_uid), last_activity(g_timer_new()), num(_num), - output_buffer(16384, client_max_output_buffer_size), idle_waiting(false), idle_flags(0), num_subscriptions(0) { - assert(fd >= 0); - - channel = g_io_channel_new_socket(fd); - /* GLib is responsible for closing the file descriptor */ - g_io_channel_set_close_on_unref(channel, true); - /* NULL encoding means the stream is binary safe; the MPD - protocol is UTF-8 only, but we are doing this call anyway - to prevent GLib from messing around with the stream */ - g_io_channel_set_encoding(channel, NULL, NULL); - /* we prefer to do buffering */ - g_io_channel_set_buffered(channel, false); - - source_id = g_io_add_watch(channel, - GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP), - client_in_event, this); } Client::~Client() { g_timer_destroy(last_activity); - - fifo_buffer_free(input); } void -client_new(Partition &partition, +client_new(EventLoop &loop, Partition &partition, int fd, const struct sockaddr *sa, size_t sa_length, int uid) { static unsigned int next_client_num; @@ -124,7 +105,7 @@ client_new(Partition &partition, return; } - Client *client = new Client(partition, fd, uid, + Client *client = new Client(loop, partition, fd, uid, next_client_num++); (void)send(fd, GREETING, sizeof(GREETING) - 1, 0); diff --git a/src/ClientProcess.cxx b/src/ClientProcess.cxx index e1e8395e7..bcd20d1b7 100644 --- a/src/ClientProcess.cxx +++ b/src/ClientProcess.cxx @@ -61,7 +61,6 @@ client_process_line(Client *client, char *line) /* send empty idle response and leave idle mode */ client->idle_waiting = false; command_success(client); - client_write_output(client); } /* do nothing if the client wasn't idling: the client @@ -97,7 +96,6 @@ client_process_line(Client *client, char *line) if (ret == COMMAND_RETURN_OK) command_success(client); - client_write_output(client); client->cmd_list.Reset(); } else { if (!client->cmd_list.Add(line)) { @@ -130,8 +128,6 @@ client_process_line(Client *client, char *line) if (ret == COMMAND_RETURN_OK) command_success(client); - - client_write_output(client); } } diff --git a/src/ClientRead.cxx b/src/ClientRead.cxx index e58c73dea..363363d1f 100644 --- a/src/ClientRead.cxx +++ b/src/ClientRead.cxx @@ -19,91 +19,48 @@ #include "config.h" #include "ClientInternal.hxx" -#include "util/fifo_buffer.h" +#include "Main.hxx" +#include "event/Loop.hxx" #include #include -static char * -client_read_line(Client *client) +BufferedSocket::InputResult +Client::OnSocketInput(const void *data, size_t length) { - size_t length; - const char *p = (const char *)fifo_buffer_read(client->input, &length); - if (p == NULL) - return NULL; + g_timer_start(last_activity); + const char *p = (const char *)data; const char *newline = (const char *)memchr(p, '\n', length); if (newline == NULL) - return NULL; + return InputResult::MORE; char *line = g_strndup(p, newline - p); - fifo_buffer_consume(client->input, newline - p + 1); + BufferedSocket::ConsumeInput(newline + 1 - p); - return g_strchomp(line); -} - -static enum command_return -client_input_received(Client *client, size_t bytesRead) -{ - char *line; - - fifo_buffer_append(client->input, bytesRead); - - /* process all lines */ - - while ((line = client_read_line(client)) != NULL) { - enum command_return ret = client_process_line(client, line); - g_free(line); - - if (ret == COMMAND_RETURN_KILL || - ret == COMMAND_RETURN_CLOSE) - return ret; - if (client->IsExpired()) - return COMMAND_RETURN_CLOSE; - } - - return COMMAND_RETURN_OK; -} + enum command_return result = client_process_line(this, line); + g_free(line); -enum command_return -client_read(Client *client) -{ - GError *error = NULL; - GIOStatus status; - gsize bytes_read; + switch (result) { + case COMMAND_RETURN_OK: + case COMMAND_RETURN_IDLE: + case COMMAND_RETURN_ERROR: + break; - assert(client != NULL); - assert(client->channel != NULL); + case COMMAND_RETURN_KILL: + Close(); + main_loop->Break(); + return InputResult::CLOSED; - size_t max_length; - char *p = (char *)fifo_buffer_write(client->input, &max_length); - if (p == NULL) { - g_warning("[%u] buffer overflow", client->num); - return COMMAND_RETURN_CLOSE; + case COMMAND_RETURN_CLOSE: + Close(); + return InputResult::CLOSED; } - status = g_io_channel_read_chars(client->channel, p, max_length, - &bytes_read, &error); - switch (status) { - case G_IO_STATUS_NORMAL: - return client_input_received(client, bytes_read); - - case G_IO_STATUS_AGAIN: - /* try again later, after select() */ - return COMMAND_RETURN_OK; - - case G_IO_STATUS_EOF: - /* peer disconnected */ - return COMMAND_RETURN_CLOSE; - - case G_IO_STATUS_ERROR: - /* I/O error */ - g_warning("failed to read from client %d: %s", - client->num, error->message); - g_error_free(error); - return COMMAND_RETURN_CLOSE; + if (IsExpired()) { + Close(); + return InputResult::CLOSED; } - /* unreachable */ - return COMMAND_RETURN_CLOSE; + return InputResult::AGAIN; } diff --git a/src/ClientWrite.cxx b/src/ClientWrite.cxx index 86abc152c..23b515a3d 100644 --- a/src/ClientWrite.cxx +++ b/src/ClientWrite.cxx @@ -20,98 +20,9 @@ #include "config.h" #include "ClientInternal.hxx" -#include #include #include -static size_t -client_write_direct(Client *client, const void *data, size_t length) -{ - assert(client != NULL); - assert(client->channel != NULL); - assert(data != NULL); - assert(length > 0); - - gsize bytes_written; - GError *error = NULL; - GIOStatus status = - g_io_channel_write_chars(client->channel, (const gchar *)data, - length, &bytes_written, &error); - switch (status) { - case G_IO_STATUS_NORMAL: - return bytes_written; - - case G_IO_STATUS_AGAIN: - return 0; - - case G_IO_STATUS_EOF: - /* client has disconnected */ - - client->SetExpired(); - return 0; - - case G_IO_STATUS_ERROR: - /* I/O error */ - - client->SetExpired(); - g_warning("failed to write to %i: %s", - client->num, error->message); - g_error_free(error); - return 0; - } - - /* unreachable */ - assert(false); - return 0; -} - -void -client_write_deferred(Client *client) -{ - assert(!client_is_expired(client)); - - while (true) { - size_t length; - const void *data = client->output_buffer.Read(&length); - if (data == nullptr) - break; - - size_t nbytes = client_write_direct(client, data, length); - if (nbytes == 0) - return; - - client->output_buffer.Consume(nbytes); - - if (nbytes < length) - return; - - g_timer_start(client->last_activity); - } -} - -static void -client_defer_output(Client *client, const void *data, size_t length) -{ - if (!client->output_buffer.Append(data, length)) { - g_warning("[%u] output buffer size is " - "larger than the max (%lu)", - client->num, - (unsigned long)client_max_output_buffer_size); - /* cause client to close */ - client->SetExpired(); - return; - } -} - -void -client_write_output(Client *client) -{ - if (client->IsExpired()) - return; - - client_write_deferred(client); -} - /** * Write a block of data to the client. */ @@ -122,7 +33,7 @@ client_write(Client *client, const char *data, size_t length) if (client->IsExpired() || length == 0) return; - client_defer_output(client, data, length); + client->Write(data, length); } void diff --git a/src/Listen.cxx b/src/Listen.cxx index 1e6333504..157b7992c 100644 --- a/src/Listen.cxx +++ b/src/Listen.cxx @@ -46,7 +46,7 @@ static void listen_callback(int fd, const struct sockaddr *address, size_t address_length, int uid, G_GNUC_UNUSED void *ctx) { - client_new(*global_partition, + client_new(*main_loop, *global_partition, fd, address, address_length, uid); } -- cgit v1.2.3