aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2011-08-30 08:13:28 +0200
committerMax Kellermann <max@duempel.org>2011-08-31 08:32:09 +0200
commit395191bd75541c1709ee1d1bad4af5140317ddea (patch)
tree7285f02a4a157a025646bae53ea613fc7616bd21 /src
parentec7d8fb6bd7b9a83e354aa5cb1c812435d587fab (diff)
downloadmpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.gz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.xz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.zip
rtsp_client: use the I/O thread
Make the code portable.
Diffstat (limited to '')
-rw-r--r--src/rtsp_client.c221
-rw-r--r--src/rtsp_client.h8
-rw-r--r--src/tcp_socket.c381
-rw-r--r--src/tcp_socket.h61
4 files changed, 615 insertions, 56 deletions
diff --git a/src/rtsp_client.c b/src/rtsp_client.c
index 093b44710..d05efa619 100644
--- a/src/rtsp_client.c
+++ b/src/rtsp_client.c
@@ -22,13 +22,16 @@
*/
#include "rtsp_client.h"
+#include "tcp_socket.h"
#include "glib_compat.h"
+#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
+#include <sys/time.h>
#ifdef WIN32
#define WINVER 0x0501
@@ -37,8 +40,6 @@
#else
#include <arpa/inet.h>
#include <sys/socket.h>
-#include <sys/select.h>
-#include <sys/poll.h>
#include <netdb.h>
#endif
@@ -78,6 +79,9 @@ rtspcl_open(void)
{
struct rtspcl_data *rtspcld;
rtspcld = g_new0(struct rtspcl_data, 1);
+ rtspcld->mutex = g_mutex_new();
+ rtspcld->cond = g_cond_new();
+ rtspcld->received_lines = g_queue_new();
rtspcld->useragent = "RTSPClient";
return rtspcld;
}
@@ -223,33 +227,141 @@ get_tcp_connect_by_host(int sd, const char *host, short destport,
get_tcp_connect(sd, addr, error_r);
}
+static void
+rtsp_client_flush_received(struct rtspcl_data *rtspcld)
+{
+ char *line;
+ while ((line = g_queue_pop_head(rtspcld->received_lines)) != NULL)
+ g_free(line);
+}
+
+static size_t
+rtsp_client_socket_data(const void *_data, size_t length, void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_mutex_lock(rtspcld->mutex);
+
+ if (rtspcld->tcp_socket == NULL) {
+ g_mutex_unlock(rtspcld->mutex);
+ return 0;
+ }
+
+ const bool was_empty = g_queue_is_empty(rtspcld->received_lines);
+ bool added = false;
+ const char *data = _data, *end = data + length, *p = data, *eol;
+ while ((eol = memchr(p, '\n', end - p)) != NULL) {
+ const char *next = eol + 1;
+
+ if (rtspcld->received_lines->length < 64) {
+ if (eol > p && eol[-1] == '\r')
+ --eol;
+
+ g_queue_push_tail(rtspcld->received_lines,
+ g_strndup(p, eol - p));
+ added = true;
+ }
+
+ p = next;
+ }
+
+ if (was_empty && added)
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ return p - data;
+}
+
+static void
+rtsp_client_socket_error(GError *error, void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_warning("%s", error->message);
+ g_error_free(error);
+
+ g_mutex_lock(rtspcld->mutex);
+
+ rtsp_client_flush_received(rtspcld);
+
+ struct tcp_socket *s = rtspcld->tcp_socket;
+ rtspcld->tcp_socket = NULL;
+
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (s != NULL)
+ tcp_socket_free(s);
+}
+
+static void
+rtsp_client_socket_disconnected(void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_mutex_lock(rtspcld->mutex);
+
+ rtsp_client_flush_received(rtspcld);
+
+ struct tcp_socket *s = rtspcld->tcp_socket;
+ rtspcld->tcp_socket = NULL;
+
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (s != NULL)
+ tcp_socket_free(s);
+}
+
+static const struct tcp_socket_handler rtsp_client_socket_handler = {
+ .data = rtsp_client_socket_data,
+ .error = rtsp_client_socket_error,
+ .disconnected = rtsp_client_socket_disconnected,
+};
+
bool
rtspcl_connect(struct rtspcl_data *rtspcld, const char *host, short destport,
const char *sid, GError **error_r)
{
+ assert(rtspcld->tcp_socket == NULL);
+
unsigned short myport = 0;
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
- if ((rtspcld->fd = open_tcp_socket(NULL, &myport, error_r)) == -1)
+ int fd = open_tcp_socket(NULL, &myport, error_r);
+ if (fd < 0)
return false;
- if (!get_tcp_connect_by_host(rtspcld->fd, host, destport, error_r))
+ if (!get_tcp_connect_by_host(fd, host, destport, error_r))
return false;
- getsockname(rtspcld->fd, (struct sockaddr*)&name, &namelen);
+ getsockname(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->local_addr, &name.sin_addr,sizeof(struct in_addr));
sprintf(rtspcld->url, "rtsp://%s/%s", inet_ntoa(name.sin_addr), sid);
- getpeername(rtspcld->fd, (struct sockaddr*)&name, &namelen);
+ getpeername(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->host_addr, &name.sin_addr, sizeof(struct in_addr));
+
+ rtspcld->tcp_socket = tcp_socket_new(fd, &rtsp_client_socket_handler,
+ rtspcld);
+
return true;
}
static void
rtspcl_disconnect(struct rtspcl_data *rtspcld)
{
- if (rtspcld->fd > 0) close(rtspcld->fd);
- rtspcld->fd = 0;
+ g_mutex_lock(rtspcld->mutex);
+ rtsp_client_flush_received(rtspcld);
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (rtspcld->tcp_socket != NULL) {
+ tcp_socket_free(rtspcld->tcp_socket);
+ rtspcld->tcp_socket = NULL;
+ }
}
static void
@@ -263,8 +375,11 @@ void
rtspcl_close(struct rtspcl_data *rtspcld)
{
rtspcl_disconnect(rtspcld);
+ g_queue_free(rtspcld->received_lines);
rtspcl_remove_all_exthds(rtspcld);
g_free(rtspcld->session);
+ g_cond_free(rtspcld->cond);
+ g_mutex_free(rtspcld->mutex);
g_free(rtspcld);
}
@@ -294,40 +409,51 @@ rtspcl_add_exthds(struct rtspcl_data *rtspcld, const char *key, char *data)
* returned string in line is always null terminated, maxlen-1 is maximum string length
*/
static int
-read_line(int fd, char *line, int maxlen, int timeout, int no_poll)
+read_line(struct rtspcl_data *rtspcld, char *line, int maxlen,
+ int timeout)
{
- int i, rval;
- int count = 0;
- struct pollfd pfds;
- char ch;
- *line = 0;
- pfds.events = POLLIN;
- pfds.fd = fd;
- for (i = 0;i < maxlen; i++) {
- if (no_poll || poll(&pfds, 1, timeout))
- rval=read(fd,&ch,1);
- else return 0;
-
- if (rval == -1) {
- if (errno == EAGAIN) return 0;
- g_warning("%s:read error: %s\n", __func__, strerror(errno));
- return -1;
+ g_mutex_lock(rtspcld->mutex);
+
+ GTimeVal end_time;
+ if (timeout >= 0) {
+ g_get_current_time(&end_time);
+
+ end_time.tv_sec += timeout / 1000;
+ timeout %= 1000;
+ end_time.tv_usec = timeout * 1000;
+ if (end_time.tv_usec > 1000000) {
+ end_time.tv_usec -= 1000000;
+ ++end_time.tv_sec;
+ }
+ }
+
+ while (true) {
+ if (!g_queue_is_empty(rtspcld->received_lines)) {
+ /* success, copy to buffer */
+
+ char *p = g_queue_pop_head(rtspcld->received_lines);
+ g_mutex_unlock(rtspcld->mutex);
+
+ g_strlcpy(line, p, maxlen);
+ g_free(p);
+
+ return strlen(line);
}
- if (rval == 0) {
- g_debug("%s:disconnected on the other end\n", __func__);
+
+ if (rtspcld->tcp_socket == NULL) {
+ /* error */
+ g_mutex_unlock(rtspcld->mutex);
return -1;
}
- if(ch == '\n') {
- *line = 0;
- return count;
+
+ if (timeout < 0) {
+ g_cond_wait(rtspcld->cond, rtspcld->mutex);
+ } else if (!g_cond_timed_wait(rtspcld->cond, rtspcld->mutex,
+ &end_time)) {
+ g_mutex_unlock(rtspcld->mutex);
+ return 0;
}
- if (ch == '\r') continue;
- *line++ = ch;
- count++;
- if (count >= maxlen - 1) break;
}
- *line = 0;
- return count;
}
/*
@@ -346,13 +472,9 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
char reql[128];
const char delimiters[] = " ";
char *token, *dp;
- int dsize = 0,rval;
+ int dsize = 0;
int timeout = 5000; // msec unit
- fd_set rdfds;
- int fdmax = 0;
- struct timeval tout = {.tv_sec=10, .tv_usec=0};
-
if (!rtspcld) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"not connected");
@@ -393,8 +515,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (content_type && content)
strncat(req, content, sizeof(req));
- rval = write(rtspcld->fd, req, strlen(req));
- if (rval < 0) {
+ if (!tcp_socket_send(rtspcld->tcp_socket, req, strlen(req))) {
g_set_error(error_r, rtsp_client_quark(), errno,
"write error: %s",
g_strerror(errno));
@@ -403,17 +524,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (!get_response) return true;
- while (true) {
- FD_ZERO(&rdfds);
- FD_SET(rtspcld->fd, &rdfds);
- fdmax = rtspcld->fd;
- select(fdmax + 1, &rdfds, NULL, NULL, &tout);
- if (FD_ISSET(rtspcld->fd, &rdfds)) {
- break;
- }
- }
-
- if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) {
+ if (read_line(rtspcld, line, sizeof(line), timeout) <= 0) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"request failed");
return false;
@@ -443,7 +554,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
struct key_data *cur_kd = *kd;
struct key_data *new_kd = NULL;
- while (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) > 0) {
+ while (read_line(rtspcld, line, sizeof(line), timeout) > 0) {
timeout = 1000; // once it started, it shouldn't take a long time
if (new_kd != NULL && line[0] == ' ') {
const char *j = line;
diff --git a/src/rtsp_client.h b/src/rtsp_client.h
index 5b56af9da..5c8425248 100644
--- a/src/rtsp_client.h
+++ b/src/rtsp_client.h
@@ -42,7 +42,13 @@ struct key_data {
};
struct rtspcl_data {
- int fd;
+ GMutex *mutex;
+ GCond *cond;
+
+ GQueue *received_lines;
+
+ struct tcp_socket *tcp_socket;
+
char url[128];
int cseq;
struct key_data *exthds;
diff --git a/src/tcp_socket.c b/src/tcp_socket.c
new file mode 100644
index 000000000..f65b9c07c
--- /dev/null
+++ b/src/tcp_socket.c
@@ -0,0 +1,381 @@
+/*
+ * Copyright (C) 2003-2011 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "tcp_socket.h"
+#include "fifo_buffer.h"
+#include "io_thread.h"
+
+#include <assert.h>
+#include <string.h>
+
+#ifdef WIN32
+#define WINVER 0x0501
+#include <ws2tcpip.h>
+#include <winsock.h>
+#else
+#include <sys/socket.h>
+#include <netinet/in.h>
+#endif
+
+struct tcp_socket {
+ const struct tcp_socket_handler *handler;
+ void *handler_ctx;
+
+ GMutex *mutex;
+
+ GIOChannel *channel;
+ GSource *in_source, *out_source;
+
+ struct fifo_buffer *input, *output;
+};
+
+static gboolean
+tcp_event(GIOChannel *source, GIOCondition condition, gpointer data);
+
+static void
+tcp_socket_schedule_read(struct tcp_socket *s)
+{
+ assert(s->input != NULL);
+ assert(!fifo_buffer_is_full(s->input));
+
+ if (s->in_source != NULL)
+ return;
+
+ s->in_source = g_io_create_watch(s->channel,
+ G_IO_IN|G_IO_ERR|G_IO_HUP);
+ g_source_set_callback(s->in_source, (GSourceFunc)tcp_event, s, NULL);
+ g_source_attach(s->in_source, io_thread_context());
+}
+
+static void
+tcp_socket_unschedule_read(struct tcp_socket *s)
+{
+ if (s->in_source == NULL)
+ return;
+
+ g_source_destroy(s->in_source);
+ g_source_unref(s->in_source);
+ s->in_source = NULL;
+}
+
+static void
+tcp_socket_schedule_write(struct tcp_socket *s)
+{
+ assert(s->output != NULL);
+ assert(!fifo_buffer_is_empty(s->output));
+
+ if (s->out_source != NULL)
+ return;
+
+ s->out_source = g_io_create_watch(s->channel, G_IO_OUT);
+ g_source_set_callback(s->out_source, (GSourceFunc)tcp_event, s, NULL);
+ g_source_attach(s->out_source, io_thread_context());
+}
+
+static void
+tcp_socket_unschedule_write(struct tcp_socket *s)
+{
+ if (s->out_source == NULL)
+ return;
+
+ g_source_destroy(s->out_source);
+ g_source_unref(s->out_source);
+ s->out_source = NULL;
+}
+
+/**
+ * Close the socket. Caller must lock the mutex.
+ */
+static void
+tcp_socket_close(struct tcp_socket *s)
+{
+ tcp_socket_unschedule_read(s);
+ tcp_socket_unschedule_write(s);
+
+ if (s->channel != NULL) {
+ g_io_channel_unref(s->channel);
+ s->channel = NULL;
+ }
+
+ if (s->input != NULL) {
+ fifo_buffer_free(s->input);
+ s->input = NULL;
+ }
+
+ if (s->output != NULL) {
+ fifo_buffer_free(s->output);
+ s->output = NULL;
+ }
+}
+
+static gpointer
+tcp_socket_close_callback(gpointer data)
+{
+ struct tcp_socket *s = data;
+
+ g_mutex_lock(s->mutex);
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+
+ return NULL;
+}
+
+static void
+tcp_socket_close_indirect(struct tcp_socket *s)
+{
+ io_thread_call(tcp_socket_close_callback, s);
+
+ assert(s->channel == NULL);
+ assert(s->in_source == NULL);
+ assert(s->out_source == NULL);
+}
+
+static void
+tcp_handle_input(struct tcp_socket *s)
+{
+ size_t length;
+ const void *p = fifo_buffer_read(s->input, &length);
+ if (p == NULL)
+ return;
+
+ g_mutex_unlock(s->mutex);
+ size_t consumed = s->handler->data(p, length, s->handler_ctx);
+ g_mutex_lock(s->mutex);
+ if (consumed > 0 && s->input != NULL)
+ fifo_buffer_consume(s->input, consumed);
+}
+
+static bool
+tcp_in_event(struct tcp_socket *s)
+{
+ assert(s != NULL);
+ assert(s->channel != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ size_t max_length;
+ void *p = fifo_buffer_write(s->input, &max_length);
+ if (p == NULL) {
+ GError *error = g_error_new_literal(tcp_socket_quark(), 0,
+ "buffer overflow");
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ gsize bytes_read;
+ GError *error = NULL;
+ GIOStatus status = g_io_channel_read_chars(s->channel,
+ p, max_length,
+ &bytes_read, &error);
+ switch (status) {
+ case G_IO_STATUS_NORMAL:
+ fifo_buffer_append(s->input, bytes_read);
+ tcp_handle_input(s);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_AGAIN:
+ /* try again later */
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_EOF:
+ /* peer disconnected */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+
+ case G_IO_STATUS_ERROR:
+ /* I/O error */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ assert(false);
+ return true;
+}
+
+static bool
+tcp_out_event(struct tcp_socket *s)
+{
+ assert(s != NULL);
+ assert(s->channel != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ size_t length;
+ const void *p = fifo_buffer_read(s->output, &length);
+ if (p == NULL) {
+ /* no more data in the output buffer, remove the
+ output event */
+ tcp_socket_unschedule_write(s);
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ gsize bytes_written;
+ GError *error = NULL;
+ GIOStatus status = g_io_channel_write_chars(s->channel, p, length,
+ &bytes_written, &error);
+ switch (status) {
+ case G_IO_STATUS_NORMAL:
+ fifo_buffer_consume(s->output, bytes_written);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_AGAIN:
+ tcp_socket_schedule_write(s);
+ g_mutex_unlock(s->mutex);
+ return true;
+
+ case G_IO_STATUS_EOF:
+ /* peer disconnected */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+
+ case G_IO_STATUS_ERROR:
+ /* I/O error */
+ tcp_socket_close(s);
+ g_mutex_unlock(s->mutex);
+ s->handler->error(error, s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ g_mutex_unlock(s->mutex);
+ assert(false);
+ return true;
+}
+
+static gboolean
+tcp_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition,
+ gpointer data)
+{
+ struct tcp_socket *s = data;
+
+ assert(source == s->channel);
+
+ switch (condition) {
+ case G_IO_IN:
+ case G_IO_PRI:
+ return tcp_in_event(s);
+
+ case G_IO_OUT:
+ return tcp_out_event(s);
+
+ case G_IO_ERR:
+ case G_IO_HUP:
+ case G_IO_NVAL:
+ tcp_socket_close(s);
+ s->handler->disconnected(s->handler_ctx);
+ return false;
+ }
+
+ /* unreachable */
+ assert(false);
+ return false;
+}
+
+struct tcp_socket *
+tcp_socket_new(int fd,
+ const struct tcp_socket_handler *handler, void *ctx)
+{
+ assert(fd >= 0);
+ assert(handler != NULL);
+ assert(handler->data != NULL);
+ assert(handler->error != NULL);
+ assert(handler->disconnected != NULL);
+
+ struct tcp_socket *s = g_new(struct tcp_socket, 1);
+ s->handler = handler;
+ s->handler_ctx = ctx;
+ s->mutex = g_mutex_new();
+
+ g_mutex_lock(s->mutex);
+
+#ifndef G_OS_WIN32
+ s->channel = g_io_channel_unix_new(fd);
+#else
+ s->channel = g_io_channel_win32_new_socket(fd);
+#endif
+ /* GLib is responsible for closing the file descriptor */
+ g_io_channel_set_close_on_unref(s->channel, true);
+ /* NULL encoding means the stream is binary safe */
+ g_io_channel_set_encoding(s->channel, NULL, NULL);
+ /* no buffering */
+ g_io_channel_set_buffered(s->channel, false);
+
+ s->input = fifo_buffer_new(4096);
+ s->output = fifo_buffer_new(4096);
+
+ s->in_source = NULL;
+ s->out_source = NULL;
+
+ tcp_socket_schedule_read(s);
+
+ g_mutex_unlock(s->mutex);
+
+ return s;
+}
+
+void
+tcp_socket_free(struct tcp_socket *s)
+{
+ tcp_socket_close_indirect(s);
+ g_mutex_free(s->mutex);
+ g_free(s);
+}
+
+bool
+tcp_socket_send(struct tcp_socket *s, const void *data, size_t length)
+{
+ assert(s != NULL);
+
+ g_mutex_lock(s->mutex);
+
+ if (s->output == NULL || s->channel == NULL) {
+ /* already disconnected */
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ size_t max_length;
+ void *p = fifo_buffer_write(s->output, &max_length);
+ if (p == NULL || max_length < length) {
+ /* buffer is full */
+ g_mutex_unlock(s->mutex);
+ return false;
+ }
+
+ memcpy(p, data, length);
+ fifo_buffer_append(s->output, length);
+ tcp_socket_schedule_write(s);
+
+ g_mutex_unlock(s->mutex);
+ return true;
+}
+
diff --git a/src/tcp_socket.h b/src/tcp_socket.h
new file mode 100644
index 000000000..b6b367b86
--- /dev/null
+++ b/src/tcp_socket.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2003-2011 The Music Player Daemon Project
+ * http://www.musicpd.org
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef MPD_TCP_SOCKET_H
+#define MPD_TCP_SOCKET_H
+
+#include <glib.h>
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct sockaddr;
+
+struct tcp_socket_handler {
+ /**
+ * New data has arrived.
+ *
+ * @return the number of bytes consumed; 0 if more data is
+ * needed
+ */
+ size_t (*data)(const void *data, size_t length, void *ctx);
+
+ void (*error)(GError *error, void *ctx);
+
+ void (*disconnected)(void *ctx);
+};
+
+static inline GQuark
+tcp_socket_quark(void)
+{
+ return g_quark_from_static_string("tcp_socket");
+}
+
+G_GNUC_MALLOC
+struct tcp_socket *
+tcp_socket_new(int fd,
+ const struct tcp_socket_handler *handler, void *ctx);
+
+void
+tcp_socket_free(struct tcp_socket *s);
+
+bool
+tcp_socket_send(struct tcp_socket *s, const void *data, size_t length);
+
+#endif