aboutsummaryrefslogtreecommitdiffstats
path: root/src/tcp_socket.c
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2011-08-30 08:13:28 +0200
committerMax Kellermann <max@duempel.org>2011-08-31 08:32:09 +0200
commit395191bd75541c1709ee1d1bad4af5140317ddea (patch)
tree7285f02a4a157a025646bae53ea613fc7616bd21 /src/tcp_socket.c
parentec7d8fb6bd7b9a83e354aa5cb1c812435d587fab (diff)
downloadmpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.gz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.xz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.zip
rtsp_client: use the I/O thread
Make the code portable.
Diffstat (limited to '')
-rw-r--r--src/tcp_socket.c381
1 files changed, 381 insertions, 0 deletions
diff --git a/src/tcp_socket.c b/src/tcp_socket.c
new file mode 100644
index 000000000..f65b9c07c
--- /dev/null
+++ b/src/tcp_socket.c
@@ -0,0 +1,381 @@
+/*
+ * Copyright (C) 2003-2011 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "tcp_socket.h"
+#include "fifo_buffer.h"
+#include "io_thread.h"
+
+#include <assert.h>
+#include <string.h>
+
+#ifdef WIN32
+#define WINVER 0x0501
+#include <ws2tcpip.h>
+#include <winsock.h>
+#else
+#include <sys/socket.h>
+#include <netinet/in.h>
+#endif
+
+struct tcp_socket {
+ const struct tcp_socket_handler *handler;
+ void *handler_ctx;
+
+ GMutex *mutex;
+
+ GIOChannel *channel;
+ GSource *in_source, *out_source;
+
+ struct fifo_buffer *input, *output;
+};
+
+static gboolean
+tcp_event(GIOChannel *source, GIOCondition condition, gpointer data);
+
+static void
+tcp_socket_schedule_read(struct tcp_socket *s)
+{
+ assert(s->input != NULL);
+ assert(!fifo_buffer_is_full(s->input));
+
+ if (s->in_source != NULL)
+ return;
+
+ s->in_source = g_io_create_watch(s->channel,
+ G_IO_IN|G_IO_ERR|G_IO_HUP);
+ g_source_set_callback(s->in_source, (GSourceFunc)tcp_event, s, NULL);
+ g_source_attach(s->in_source, io_thread_context());
+}
+
+static void
+tcp_socket_unschedule_read(struct tcp_socket *s)
+{
+ if (s->in_source == NULL)
+ return;
+
+ g_source_destroy(s->in_source);
+ g_source_unref(s->in_source);
+ s->in_source = NULL;
+}
+
+static void
+tcp_socket_schedule_write(struct tcp_socket *s)
+{
+ assert(s->output != NULL);
+ assert(!fifo_buffer_is_empty(s->output));
+
+ if (s->out_source != NULL)
+ return;
+
+ s->out_source = g_io_create_watch(s->channel, G_IO_OUT);
+ g_source_set_callback(s->out_source, (GSourceFunc)tcp_event, s, NULL);
+ g_source_attach(s->out_source, io_thread_context());
+}
+
+static void
+tcp_socket_unschedule_write(struct tcp_socket *s)
+{
+ if (s->out_source == NULL)
+ return;
+
+ g_source_destroy(s->out_source);
+ g_source_unref(s->out_source);
+ s->out_source = NULL;
+}
+
+/**
+ * Close the socket. Caller must lock the mutex.
+ */
+static void
+tcp_socket_close(struct tcp_socket *s)
+{
+ tcp_socket_unschedule_read(s);
+ tcp_socket_unschedule_write(s);
+
+ if (s->channel != NULL) {
+ g_io_channel_unref(s->channel);
+ s->channel = NULL;
+ }
+
+ if (s->input != NULL) {
+ fifo_buffer_free(s->input);
+ s->input = NULL;
+ }
+
+ if (s->output != NULL) {
+ fifo_buffer_free(s->output);
+ s->output = NULL;
+ }
+}
+
+static gpointer
+tcp_socket_close_callback(gpointer data)
+{
+ struct tcp_socket *s = data;
+
+ g_mutex_lock(s->mutex);
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+
+ return NULL;
+}
+
+static void
+tcp_socket_close_indirect(struct tcp_socket *s)
+{
+ io_thread_call(tcp_socket_close_callback, s);
+
+ assert(s->channel == NULL);
+ assert(s->in_source == NULL);
+ assert(s->out_source == NULL);
+}
+
+static void
+tcp_handle_input(struct tcp_socket *s)
+{
+ size_t length;
+ const void *p = fifo_buffer_read(s->input, &length);
+ if (p == NULL)
+ return;
+
+ g_mutex_unlock(s->mutex);
+ size_t consumed = s->handler->data(p, length, s->handler_ctx);
+ g_mutex_lock(s->mutex);
+ if (consumed > 0 && s->input != NULL)
+ fifo_buffer_consume(s->input, consumed);
+}
+
+static bool
+tcp_in_event(struct tcp_socket *s)
+{
+ assert(s != NULL);
+ assert(s->channel != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ size_t max_length;
+ void *p = fifo_buffer_write(s->input, &max_length);
+ if (p == NULL) {
+ GError *error = g_error_new_literal(tcp_socket_quark(), 0,
+ "buffer overflow");
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ gsize bytes_read;
+ GError *error = NULL;
+ GIOStatus status = g_io_channel_read_chars(s->channel,
+ p, max_length,
+ &bytes_read, &error);
+ switch (status) {
+ case G_IO_STATUS_NORMAL:
+ fifo_buffer_append(s->input, bytes_read);
+ tcp_handle_input(s);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_AGAIN:
+ /* try again later */
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_EOF:
+ /* peer disconnected */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+
+ case G_IO_STATUS_ERROR:
+ /* I/O error */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ assert(false);
+ return true;
+}
+
+static bool
+tcp_out_event(struct tcp_socket *s)
+{
+ assert(s != NULL);
+ assert(s->channel != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ size_t length;
+ const void *p = fifo_buffer_read(s->output, &length);
+ if (p == NULL) {
+ /* no more data in the output buffer, remove the
+ output event */
+ tcp_socket_unschedule_write(s);
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ gsize bytes_written;
+ GError *error = NULL;
+ GIOStatus status = g_io_channel_write_chars(s->channel, p, length,
+ &bytes_written, &error);
+ switch (status) {
+ case G_IO_STATUS_NORMAL:
+ fifo_buffer_consume(s->output, bytes_written);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_AGAIN:
+ tcp_socket_schedule_write(s);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_EOF:
+ /* peer disconnected */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+
+ case G_IO_STATUS_ERROR:
+ /* I/O error */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ g_mutex_unlock(s->mutex);
+ assert(false);
+ return true;
+}
+
+static gboolean
+tcp_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
+ gpointer data)
+{
+ struct tcp_socket *s = data;
+
+ assert(source == s->channel);
+
+ switch (condition) {
+ case G_IO_IN:
+ case G_IO_PRI:
+ return tcp_in_event(s);
+
+ case G_IO_OUT:
+ return tcp_out_event(s);
+
+ case G_IO_ERR:
+ case G_IO_HUP:
+ case G_IO_NVAL:
+ tcp_socket_close(s);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ assert(false);
+ return false;
+}
+
+struct tcp_socket *
+tcp_socket_new(int fd,
+ const struct tcp_socket_handler *handler, void *ctx)
+{
+ assert(fd >= 0);
+ assert(handler != NULL);
+ assert(handler->data != NULL);
+ assert(handler->error != NULL);
+ assert(handler->disconnected != NULL);
+
+ struct tcp_socket *s = g_new(struct tcp_socket, 1);
+ s->handler = handler;
+ s->handler_ctx = ctx;
+ s->mutex = g_mutex_new();
+
+ g_mutex_lock(s->mutex);
+
+#ifndef G_OS_WIN32
+ s->channel = g_io_channel_unix_new(fd);
+#else
+ s->channel = g_io_channel_win32_new_socket(fd);
+#endif
+ /* GLib is responsible for closing the file descriptor */
+ g_io_channel_set_close_on_unref(s->channel, true);
+ /* NULL encoding means the stream is binary safe */
+ g_io_channel_set_encoding(s->channel, NULL, NULL);
+ /* no buffering */
+ g_io_channel_set_buffered(s->channel, false);
+
+ s->input = fifo_buffer_new(4096);
+ s->output = fifo_buffer_new(4096);
+
+ s->in_source = NULL;
+ s->out_source = NULL;
+
+ tcp_socket_schedule_read(s);
+
+ g_mutex_unlock(s->mutex);
+
+ return s;
+}
+
+void
+tcp_socket_free(struct tcp_socket *s)
+{
+ tcp_socket_close_indirect(s);
+ g_mutex_free(s->mutex);
+ g_free(s);
+}
+
+bool
+tcp_socket_send(struct tcp_socket *s, const void *data, size_t length)
+{
+ assert(s != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ if (s->output == NULL || s->channel == NULL) {
+ /* already disconnected */
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ size_t max_length;
+ void *p = fifo_buffer_write(s->output, &max_length);
+ if (p == NULL || max_length < length) {
+ /* buffer is full */
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ memcpy(p, data, length);
+ fifo_buffer_append(s->output, length);
+ tcp_socket_schedule_write(s);
+
+ g_mutex_unlock(s->mutex);
+ return true;
+}
+