aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2013-01-14 23:42:06 +0100
committerMax Kellermann <max@duempel.org>2013-01-15 12:15:33 +0100
commit39439b80f5d1574e67f337a047869cf067c396b7 (patch)
tree5e28e61d4895da86189d14cde9f1e238c18079a8
parent396480cf94fbeda581acb6a78c42c7ec610d04a4 (diff)
downloadmpd-39439b80f5d1574e67f337a047869cf067c396b7.tar.gz
mpd-39439b80f5d1574e67f337a047869cf067c396b7.tar.xz
mpd-39439b80f5d1574e67f337a047869cf067c396b7.zip
Client: rebase on the new BufferedSocket class
-rw-r--r--src/Client.hxx4
-rw-r--r--src/ClientEvent.cxx90
-rw-r--r--src/ClientExpire.cxx15
-rw-r--r--src/ClientIdle.cxx4
-rw-r--r--src/ClientInternal.hxx34
-rw-r--r--src/ClientNew.cxx31
-rw-r--r--src/ClientProcess.cxx4
-rw-r--r--src/ClientRead.cxx95
-rw-r--r--src/ClientWrite.cxx91
-rw-r--r--src/Listen.cxx2
10 files changed, 66 insertions, 304 deletions
diff --git a/src/Client.hxx b/src/Client.hxx
index 3ce323d80..bf2a2521f 100644
--- a/src/Client.hxx
+++ b/src/Client.hxx
@@ -22,11 +22,11 @@
#include "gcc.h"
-#include <stdbool.h>
#include <stddef.h>
#include <stdarg.h>
struct sockaddr;
+class EventLoop;
struct Partition;
class Client;
@@ -34,7 +34,7 @@ void client_manager_init(void);
void client_manager_deinit(void);
void
-client_new(Partition &partition,
+client_new(EventLoop &loop, Partition &partition,
int fd, const struct sockaddr *sa, size_t sa_length, int uid);
/**
diff --git a/src/ClientEvent.cxx b/src/ClientEvent.cxx
index 201709c76..905cf0c0a 100644
--- a/src/ClientEvent.cxx
+++ b/src/ClientEvent.cxx
@@ -19,92 +19,18 @@
#include "config.h"
#include "ClientInternal.hxx"
-#include "Main.hxx"
-#include "event/Loop.hxx"
-#include <assert.h>
-
-static gboolean
-client_out_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
- gpointer data)
+void
+Client::OnSocketError(GError *error)
{
- Client *client = (Client *)data;
-
- assert(!client->IsExpired());
-
- if (condition != G_IO_OUT) {
- client->SetExpired();
- return false;
- }
-
- client_write_deferred(client);
+ g_warning("error on client %d: %s", num, error->message);
+ g_error_free(error);
- if (client->IsExpired()) {
- client->Close();
- return false;
- }
-
- g_timer_start(client->last_activity);
-
- if (client->output_buffer.IsEmpty()) {
- /* done sending deferred buffers exist: schedule
- read */
- client->source_id = g_io_add_watch(client->channel,
- GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP),
- client_in_event, client);
- return false;
- }
-
- /* write more */
- return true;
+ SetExpired();
}
-gboolean
-client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
- gpointer data)
+void
+Client::OnSocketClosed()
{
- Client *client = (Client *)data;
- enum command_return ret;
-
- assert(!client->IsExpired());
-
- if (condition != G_IO_IN) {
- client->SetExpired();
- return false;
- }
-
- g_timer_start(client->last_activity);
-
- ret = client_read(client);
- switch (ret) {
- case COMMAND_RETURN_OK:
- case COMMAND_RETURN_IDLE:
- case COMMAND_RETURN_ERROR:
- break;
-
- case COMMAND_RETURN_KILL:
- client->Close();
- main_loop->Break();
- return false;
-
- case COMMAND_RETURN_CLOSE:
- client->Close();
- return false;
- }
-
- if (client->IsExpired()) {
- client->Close();
- return false;
- }
-
- if (!client->output_buffer.IsEmpty()) {
- /* deferred buffers exist: schedule write */
- client->source_id = g_io_add_watch(client->channel,
- GIOCondition(G_IO_OUT|G_IO_ERR|G_IO_HUP),
- client_out_event, client);
- return false;
- }
-
- /* read more */
- return true;
+ SetExpired();
}
diff --git a/src/ClientExpire.cxx b/src/ClientExpire.cxx
index f599e472d..56b003df8 100644
--- a/src/ClientExpire.cxx
+++ b/src/ClientExpire.cxx
@@ -26,18 +26,11 @@ static guint expire_source_id;
void
Client::SetExpired()
{
- if (!IsExpired())
- client_schedule_expire();
+ if (IsExpired())
+ return;
- if (source_id != 0) {
- g_source_remove(source_id);
- source_id = 0;
- }
-
- if (channel != NULL) {
- g_io_channel_unref(channel);
- channel = nullptr;
- }
+ client_schedule_expire();
+ BufferedSocket::Close();
}
static void
diff --git a/src/ClientIdle.cxx b/src/ClientIdle.cxx
index 98778f52c..f9818b278 100644
--- a/src/ClientIdle.cxx
+++ b/src/ClientIdle.cxx
@@ -60,10 +60,8 @@ client_idle_add(Client *client, unsigned flags)
client->idle_flags |= flags;
if (client->idle_waiting
- && (client->idle_flags & client->idle_subscriptions)) {
+ && (client->idle_flags & client->idle_subscriptions))
client_idle_notify(client);
- client_write_output(client);
- }
}
static void
diff --git a/src/ClientInternal.hxx b/src/ClientInternal.hxx
index 009568ed4..dee5d9c71 100644
--- a/src/ClientInternal.hxx
+++ b/src/ClientInternal.hxx
@@ -24,8 +24,8 @@
#include "Client.hxx"
#include "ClientMessage.hxx"
#include "CommandListBuilder.hxx"
+#include "event/BufferedSocket.hxx"
#include "command.h"
-#include "util/PeakBuffer.hxx"
#include <set>
#include <string>
@@ -42,20 +42,13 @@ enum {
};
struct Partition;
-class PeakBuffer;
-class Client {
+class Client final : private BufferedSocket {
public:
Partition &partition;
struct playlist &playlist;
struct player_control *player_control;
- GIOChannel *channel;
- guint source_id;
-
- /** the buffer for reading lines from the #channel */
- struct fifo_buffer *input;
-
unsigned permission;
/** the uid of the client process, or -1 if unknown */
@@ -70,8 +63,6 @@ public:
unsigned int num; /* client number */
- PeakBuffer output_buffer;
-
/** is this client waiting for an "idle" response? */
bool idle_waiting;
@@ -98,23 +89,35 @@ public:
*/
std::list<ClientMessage> messages;
- Client(Partition &partition,
+ Client(EventLoop &loop, Partition &partition,
int fd, int uid, int num);
~Client();
+ bool IsConnected() const {
+ return BufferedSocket::IsDefined();
+ }
+
gcc_pure
bool IsSubscribed(const char *channel_name) const {
return subscriptions.find(channel_name) != subscriptions.end();
}
-
gcc_pure
bool IsExpired() const {
- return channel == nullptr;
+ return !BufferedSocket::IsDefined();
}
void Close();
void SetExpired();
+
+ using BufferedSocket::Write;
+
+private:
+ /* virtual methods from class BufferedSocket */
+ virtual InputResult OnSocketInput(const void *data,
+ size_t length) override;
+ virtual void OnSocketError(GError *error) override;
+ virtual void OnSocketClosed() override;
};
extern unsigned int client_max_connections;
@@ -142,9 +145,6 @@ enum command_return
client_process_line(Client *client, char *line);
void
-client_write_deferred(Client *client);
-
-void
client_write_output(Client *client);
gboolean
diff --git a/src/ClientNew.cxx b/src/ClientNew.cxx
index 144c339ab..42cc3470d 100644
--- a/src/ClientNew.cxx
+++ b/src/ClientNew.cxx
@@ -22,7 +22,6 @@
#include "ClientList.hxx"
#include "Partition.hxx"
#include "fd_util.h"
-#include "util/fifo_buffer.h"
extern "C" {
#include "resolver.h"
}
@@ -47,45 +46,27 @@ extern "C" {
static const char GREETING[] = "OK MPD " PROTOCOL_VERSION "\n";
-Client::Client(Partition &_partition,
- int fd, int _uid, int _num)
- :partition(_partition),
+Client::Client(EventLoop &_loop, Partition &_partition,
+ int _fd, int _uid, int _num)
+ :BufferedSocket(_fd, _loop, 16384, client_max_output_buffer_size),
+ partition(_partition),
playlist(partition.playlist), player_control(&partition.pc),
- input(fifo_buffer_new(4096)),
permission(getDefaultPermissions()),
uid(_uid),
last_activity(g_timer_new()),
num(_num),
- output_buffer(16384, client_max_output_buffer_size),
idle_waiting(false), idle_flags(0),
num_subscriptions(0)
{
- assert(fd >= 0);
-
- channel = g_io_channel_new_socket(fd);
- /* GLib is responsible for closing the file descriptor */
- g_io_channel_set_close_on_unref(channel, true);
- /* NULL encoding means the stream is binary safe; the MPD
- protocol is UTF-8 only, but we are doing this call anyway
- to prevent GLib from messing around with the stream */
- g_io_channel_set_encoding(channel, NULL, NULL);
- /* we prefer to do buffering */
- g_io_channel_set_buffered(channel, false);
-
- source_id = g_io_add_watch(channel,
- GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP),
- client_in_event, this);
}
Client::~Client()
{
g_timer_destroy(last_activity);
-
- fifo_buffer_free(input);
}
void
-client_new(Partition &partition,
+client_new(EventLoop &loop, Partition &partition,
int fd, const struct sockaddr *sa, size_t sa_length, int uid)
{
static unsigned int next_client_num;
@@ -124,7 +105,7 @@ client_new(Partition &partition,
return;
}
- Client *client = new Client(partition, fd, uid,
+ Client *client = new Client(loop, partition, fd, uid,
next_client_num++);
(void)send(fd, GREETING, sizeof(GREETING) - 1, 0);
diff --git a/src/ClientProcess.cxx b/src/ClientProcess.cxx
index e1e8395e7..bcd20d1b7 100644
--- a/src/ClientProcess.cxx
+++ b/src/ClientProcess.cxx
@@ -61,7 +61,6 @@ client_process_line(Client *client, char *line)
/* send empty idle response and leave idle mode */
client->idle_waiting = false;
command_success(client);
- client_write_output(client);
}
/* do nothing if the client wasn't idling: the client
@@ -97,7 +96,6 @@ client_process_line(Client *client, char *line)
if (ret == COMMAND_RETURN_OK)
command_success(client);
- client_write_output(client);
client->cmd_list.Reset();
} else {
if (!client->cmd_list.Add(line)) {
@@ -130,8 +128,6 @@ client_process_line(Client *client, char *line)
if (ret == COMMAND_RETURN_OK)
command_success(client);
-
- client_write_output(client);
}
}
diff --git a/src/ClientRead.cxx b/src/ClientRead.cxx
index e58c73dea..363363d1f 100644
--- a/src/ClientRead.cxx
+++ b/src/ClientRead.cxx
@@ -19,91 +19,48 @@
#include "config.h"
#include "ClientInternal.hxx"
-#include "util/fifo_buffer.h"
+#include "Main.hxx"
+#include "event/Loop.hxx"
#include <assert.h>
#include <string.h>
-static char *
-client_read_line(Client *client)
+BufferedSocket::InputResult
+Client::OnSocketInput(const void *data, size_t length)
{
- size_t length;
- const char *p = (const char *)fifo_buffer_read(client->input, &length);
- if (p == NULL)
- return NULL;
+ g_timer_start(last_activity);
+ const char *p = (const char *)data;
const char *newline = (const char *)memchr(p, '\n', length);
if (newline == NULL)
- return NULL;
+ return InputResult::MORE;
char *line = g_strndup(p, newline - p);
- fifo_buffer_consume(client->input, newline - p + 1);
+ BufferedSocket::ConsumeInput(newline + 1 - p);
- return g_strchomp(line);
-}
-
-static enum command_return
-client_input_received(Client *client, size_t bytesRead)
-{
- char *line;
-
- fifo_buffer_append(client->input, bytesRead);
-
- /* process all lines */
-
- while ((line = client_read_line(client)) != NULL) {
- enum command_return ret = client_process_line(client, line);
- g_free(line);
-
- if (ret == COMMAND_RETURN_KILL ||
- ret == COMMAND_RETURN_CLOSE)
- return ret;
- if (client->IsExpired())
- return COMMAND_RETURN_CLOSE;
- }
-
- return COMMAND_RETURN_OK;
-}
+ enum command_return result = client_process_line(this, line);
+ g_free(line);
-enum command_return
-client_read(Client *client)
-{
- GError *error = NULL;
- GIOStatus status;
- gsize bytes_read;
+ switch (result) {
+ case COMMAND_RETURN_OK:
+ case COMMAND_RETURN_IDLE:
+ case COMMAND_RETURN_ERROR:
+ break;
- assert(client != NULL);
- assert(client->channel != NULL);
+ case COMMAND_RETURN_KILL:
+ Close();
+ main_loop->Break();
+ return InputResult::CLOSED;
- size_t max_length;
- char *p = (char *)fifo_buffer_write(client->input, &max_length);
- if (p == NULL) {
- g_warning("[%u] buffer overflow", client->num);
- return COMMAND_RETURN_CLOSE;
+ case COMMAND_RETURN_CLOSE:
+ Close();
+ return InputResult::CLOSED;
}
- status = g_io_channel_read_chars(client->channel, p, max_length,
- &bytes_read, &error);
- switch (status) {
- case G_IO_STATUS_NORMAL:
- return client_input_received(client, bytes_read);
-
- case G_IO_STATUS_AGAIN:
- /* try again later, after select() */
- return COMMAND_RETURN_OK;
-
- case G_IO_STATUS_EOF:
- /* peer disconnected */
- return COMMAND_RETURN_CLOSE;
-
- case G_IO_STATUS_ERROR:
- /* I/O error */
- g_warning("failed to read from client %d: %s",
- client->num, error->message);
- g_error_free(error);
- return COMMAND_RETURN_CLOSE;
+ if (IsExpired()) {
+ Close();
+ return InputResult::CLOSED;
}
- /* unreachable */
- return COMMAND_RETURN_CLOSE;
+ return InputResult::AGAIN;
}
diff --git a/src/ClientWrite.cxx b/src/ClientWrite.cxx
index 86abc152c..23b515a3d 100644
--- a/src/ClientWrite.cxx
+++ b/src/ClientWrite.cxx
@@ -20,98 +20,9 @@
#include "config.h"
#include "ClientInternal.hxx"
-#include <assert.h>
#include <string.h>
#include <stdio.h>
-static size_t
-client_write_direct(Client *client, const void *data, size_t length)
-{
- assert(client != NULL);
- assert(client->channel != NULL);
- assert(data != NULL);
- assert(length > 0);
-
- gsize bytes_written;
- GError *error = NULL;
- GIOStatus status =
- g_io_channel_write_chars(client->channel, (const gchar *)data,
- length, &bytes_written, &error);
- switch (status) {
- case G_IO_STATUS_NORMAL:
- return bytes_written;
-
- case G_IO_STATUS_AGAIN:
- return 0;
-
- case G_IO_STATUS_EOF:
- /* client has disconnected */
-
- client->SetExpired();
- return 0;
-
- case G_IO_STATUS_ERROR:
- /* I/O error */
-
- client->SetExpired();
- g_warning("failed to write to %i: %s",
- client->num, error->message);
- g_error_free(error);
- return 0;
- }
-
- /* unreachable */
- assert(false);
- return 0;
-}
-
-void
-client_write_deferred(Client *client)
-{
- assert(!client_is_expired(client));
-
- while (true) {
- size_t length;
- const void *data = client->output_buffer.Read(&length);
- if (data == nullptr)
- break;
-
- size_t nbytes = client_write_direct(client, data, length);
- if (nbytes == 0)
- return;
-
- client->output_buffer.Consume(nbytes);
-
- if (nbytes < length)
- return;
-
- g_timer_start(client->last_activity);
- }
-}
-
-static void
-client_defer_output(Client *client, const void *data, size_t length)
-{
- if (!client->output_buffer.Append(data, length)) {
- g_warning("[%u] output buffer size is "
- "larger than the max (%lu)",
- client->num,
- (unsigned long)client_max_output_buffer_size);
- /* cause client to close */
- client->SetExpired();
- return;
- }
-}
-
-void
-client_write_output(Client *client)
-{
- if (client->IsExpired())
- return;
-
- client_write_deferred(client);
-}
-
/**
* Write a block of data to the client.
*/
@@ -122,7 +33,7 @@ client_write(Client *client, const char *data, size_t length)
if (client->IsExpired() || length == 0)
return;
- client_defer_output(client, data, length);
+ client->Write(data, length);
}
void
diff --git a/src/Listen.cxx b/src/Listen.cxx
index 1e6333504..157b7992c 100644
--- a/src/Listen.cxx
+++ b/src/Listen.cxx
@@ -46,7 +46,7 @@ static void
listen_callback(int fd, const struct sockaddr *address,
size_t address_length, int uid, G_GNUC_UNUSED void *ctx)
{
- client_new(*global_partition,
+ client_new(*main_loop, *global_partition,
fd, address, address_length, uid);
}