From 395191bd75541c1709ee1d1bad4af5140317ddea Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 30 Aug 2011 08:13:28 +0200 Subject: rtsp_client: use the I/O thread Make the code portable. --- Makefile.am | 2 + src/rtsp_client.c | 221 +++++++++++++++++++++++-------- src/rtsp_client.h | 8 +- src/tcp_socket.c | 381 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/tcp_socket.h | 61 +++++++++ 5 files changed, 617 insertions(+), 56 deletions(-) create mode 100644 src/tcp_socket.c create mode 100644 src/tcp_socket.h diff --git a/Makefile.am b/Makefile.am index 5baffd018..764dee3eb 100644 --- a/Makefile.am +++ b/Makefile.am @@ -294,6 +294,7 @@ src_mpd_SOURCES = \ src/client_message.c \ src/client_subscribe.h \ src/client_subscribe.c \ + src/tcp_socket.c src/tcp_socket.h \ src/udp_server.c src/udp_server.h \ src/server_socket.c \ src/listen.c \ @@ -1126,6 +1127,7 @@ test_run_output_SOURCES = test/run_output.c \ src/conf.c src/tokenizer.c src/utils.c src/string_util.c src/log.c \ src/io_thread.c src/io_thread.h \ src/udp_server.c src/udp_server.h \ + src/tcp_socket.c src/tcp_socket.h \ src/audio_check.c \ src/audio_format.c \ src/audio_parser.c \ diff --git a/src/rtsp_client.c b/src/rtsp_client.c index 093b44710..d05efa619 100644 --- a/src/rtsp_client.c +++ b/src/rtsp_client.c @@ -22,13 +22,16 @@ */ #include "rtsp_client.h" +#include "tcp_socket.h" #include "glib_compat.h" +#include #include #include #include #include #include +#include #ifdef WIN32 #define WINVER 0x0501 @@ -37,8 +40,6 @@ #else #include #include -#include -#include #include #endif @@ -78,6 +79,9 @@ rtspcl_open(void) { struct rtspcl_data *rtspcld; rtspcld = g_new0(struct rtspcl_data, 1); + rtspcld->mutex = g_mutex_new(); + rtspcld->cond = g_cond_new(); + rtspcld->received_lines = g_queue_new(); rtspcld->useragent = "RTSPClient"; return rtspcld; } @@ -223,33 +227,141 @@ get_tcp_connect_by_host(int sd, const char *host, short destport, get_tcp_connect(sd, addr, error_r); } +static void +rtsp_client_flush_received(struct rtspcl_data *rtspcld) +{ + char *line; + while ((line = g_queue_pop_head(rtspcld->received_lines)) != NULL) + g_free(line); +} + +static size_t +rtsp_client_socket_data(const void *_data, size_t length, void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_mutex_lock(rtspcld->mutex); + + if (rtspcld->tcp_socket == NULL) { + g_mutex_unlock(rtspcld->mutex); + return 0; + } + + const bool was_empty = g_queue_is_empty(rtspcld->received_lines); + bool added = false; + const char *data = _data, *end = data + length, *p = data, *eol; + while ((eol = memchr(p, '\n', end - p)) != NULL) { + const char *next = eol + 1; + + if (rtspcld->received_lines->length < 64) { + if (eol > p && eol[-1] == '\r') + --eol; + + g_queue_push_tail(rtspcld->received_lines, + g_strndup(p, eol - p)); + added = true; + } + + p = next; + } + + if (was_empty && added) + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + return p - data; +} + +static void +rtsp_client_socket_error(GError *error, void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_warning("%s", error->message); + g_error_free(error); + + g_mutex_lock(rtspcld->mutex); + + rtsp_client_flush_received(rtspcld); + + struct tcp_socket *s = rtspcld->tcp_socket; + rtspcld->tcp_socket = NULL; + + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + if (s != NULL) + tcp_socket_free(s); +} + +static void +rtsp_client_socket_disconnected(void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_mutex_lock(rtspcld->mutex); + + rtsp_client_flush_received(rtspcld); + + struct tcp_socket *s = rtspcld->tcp_socket; + rtspcld->tcp_socket = NULL; + + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + if (s != NULL) + tcp_socket_free(s); +} + +static const struct tcp_socket_handler rtsp_client_socket_handler = { + .data = rtsp_client_socket_data, + .error = rtsp_client_socket_error, + .disconnected = rtsp_client_socket_disconnected, +}; + bool rtspcl_connect(struct rtspcl_data *rtspcld, const char *host, short destport, const char *sid, GError **error_r) { + assert(rtspcld->tcp_socket == NULL); + unsigned short myport = 0; struct sockaddr_in name; socklen_t namelen = sizeof(name); - if ((rtspcld->fd = open_tcp_socket(NULL, &myport, error_r)) == -1) + int fd = open_tcp_socket(NULL, &myport, error_r); + if (fd < 0) return false; - if (!get_tcp_connect_by_host(rtspcld->fd, host, destport, error_r)) + if (!get_tcp_connect_by_host(fd, host, destport, error_r)) return false; - getsockname(rtspcld->fd, (struct sockaddr*)&name, &namelen); + getsockname(fd, (struct sockaddr*)&name, &namelen); memcpy(&rtspcld->local_addr, &name.sin_addr,sizeof(struct in_addr)); sprintf(rtspcld->url, "rtsp://%s/%s", inet_ntoa(name.sin_addr), sid); - getpeername(rtspcld->fd, (struct sockaddr*)&name, &namelen); + getpeername(fd, (struct sockaddr*)&name, &namelen); memcpy(&rtspcld->host_addr, &name.sin_addr, sizeof(struct in_addr)); + + rtspcld->tcp_socket = tcp_socket_new(fd, &rtsp_client_socket_handler, + rtspcld); + return true; } static void rtspcl_disconnect(struct rtspcl_data *rtspcld) { - if (rtspcld->fd > 0) close(rtspcld->fd); - rtspcld->fd = 0; + g_mutex_lock(rtspcld->mutex); + rtsp_client_flush_received(rtspcld); + g_mutex_unlock(rtspcld->mutex); + + if (rtspcld->tcp_socket != NULL) { + tcp_socket_free(rtspcld->tcp_socket); + rtspcld->tcp_socket = NULL; + } } static void @@ -263,8 +375,11 @@ void rtspcl_close(struct rtspcl_data *rtspcld) { rtspcl_disconnect(rtspcld); + g_queue_free(rtspcld->received_lines); rtspcl_remove_all_exthds(rtspcld); g_free(rtspcld->session); + g_cond_free(rtspcld->cond); + g_mutex_free(rtspcld->mutex); g_free(rtspcld); } @@ -294,40 +409,51 @@ rtspcl_add_exthds(struct rtspcl_data *rtspcld, const char *key, char *data) * returned string in line is always null terminated, maxlen-1 is maximum string length */ static int -read_line(int fd, char *line, int maxlen, int timeout, int no_poll) +read_line(struct rtspcl_data *rtspcld, char *line, int maxlen, + int timeout) { - int i, rval; - int count = 0; - struct pollfd pfds; - char ch; - *line = 0; - pfds.events = POLLIN; - pfds.fd = fd; - for (i = 0;i < maxlen; i++) { - if (no_poll || poll(&pfds, 1, timeout)) - rval=read(fd,&ch,1); - else return 0; - - if (rval == -1) { - if (errno == EAGAIN) return 0; - g_warning("%s:read error: %s\n", __func__, strerror(errno)); - return -1; + g_mutex_lock(rtspcld->mutex); + + GTimeVal end_time; + if (timeout >= 0) { + g_get_current_time(&end_time); + + end_time.tv_sec += timeout / 1000; + timeout %= 1000; + end_time.tv_usec = timeout * 1000; + if (end_time.tv_usec > 1000000) { + end_time.tv_usec -= 1000000; + ++end_time.tv_sec; + } + } + + while (true) { + if (!g_queue_is_empty(rtspcld->received_lines)) { + /* success, copy to buffer */ + + char *p = g_queue_pop_head(rtspcld->received_lines); + g_mutex_unlock(rtspcld->mutex); + + g_strlcpy(line, p, maxlen); + g_free(p); + + return strlen(line); } - if (rval == 0) { - g_debug("%s:disconnected on the other end\n", __func__); + + if (rtspcld->tcp_socket == NULL) { + /* error */ + g_mutex_unlock(rtspcld->mutex); return -1; } - if(ch == '\n') { - *line = 0; - return count; + + if (timeout < 0) { + g_cond_wait(rtspcld->cond, rtspcld->mutex); + } else if (!g_cond_timed_wait(rtspcld->cond, rtspcld->mutex, + &end_time)) { + g_mutex_unlock(rtspcld->mutex); + return 0; } - if (ch == '\r') continue; - *line++ = ch; - count++; - if (count >= maxlen - 1) break; } - *line = 0; - return count; } /* @@ -346,13 +472,9 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, char reql[128]; const char delimiters[] = " "; char *token, *dp; - int dsize = 0,rval; + int dsize = 0; int timeout = 5000; // msec unit - fd_set rdfds; - int fdmax = 0; - struct timeval tout = {.tv_sec=10, .tv_usec=0}; - if (!rtspcld) { g_set_error_literal(error_r, rtsp_client_quark(), 0, "not connected"); @@ -393,8 +515,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, if (content_type && content) strncat(req, content, sizeof(req)); - rval = write(rtspcld->fd, req, strlen(req)); - if (rval < 0) { + if (!tcp_socket_send(rtspcld->tcp_socket, req, strlen(req))) { g_set_error(error_r, rtsp_client_quark(), errno, "write error: %s", g_strerror(errno)); @@ -403,17 +524,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, if (!get_response) return true; - while (true) { - FD_ZERO(&rdfds); - FD_SET(rtspcld->fd, &rdfds); - fdmax = rtspcld->fd; - select(fdmax + 1, &rdfds, NULL, NULL, &tout); - if (FD_ISSET(rtspcld->fd, &rdfds)) { - break; - } - } - - if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) { + if (read_line(rtspcld, line, sizeof(line), timeout) <= 0) { g_set_error_literal(error_r, rtsp_client_quark(), 0, "request failed"); return false; @@ -443,7 +554,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, struct key_data *cur_kd = *kd; struct key_data *new_kd = NULL; - while (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) > 0) { + while (read_line(rtspcld, line, sizeof(line), timeout) > 0) { timeout = 1000; // once it started, it shouldn't take a long time if (new_kd != NULL && line[0] == ' ') { const char *j = line; diff --git a/src/rtsp_client.h b/src/rtsp_client.h index 5b56af9da..5c8425248 100644 --- a/src/rtsp_client.h +++ b/src/rtsp_client.h @@ -42,7 +42,13 @@ struct key_data { }; struct rtspcl_data { - int fd; + GMutex *mutex; + GCond *cond; + + GQueue *received_lines; + + struct tcp_socket *tcp_socket; + char url[128]; int cseq; struct key_data *exthds; diff --git a/src/tcp_socket.c b/src/tcp_socket.c new file mode 100644 index 000000000..f65b9c07c --- /dev/null +++ b/src/tcp_socket.c @@ -0,0 +1,381 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "tcp_socket.h" +#include "fifo_buffer.h" +#include "io_thread.h" + +#include +#include + +#ifdef WIN32 +#define WINVER 0x0501 +#include +#include +#else +#include +#include +#endif + +struct tcp_socket { + const struct tcp_socket_handler *handler; + void *handler_ctx; + + GMutex *mutex; + + GIOChannel *channel; + GSource *in_source, *out_source; + + struct fifo_buffer *input, *output; +}; + +static gboolean +tcp_event(GIOChannel *source, GIOCondition condition, gpointer data); + +static void +tcp_socket_schedule_read(struct tcp_socket *s) +{ + assert(s->input != NULL); + assert(!fifo_buffer_is_full(s->input)); + + if (s->in_source != NULL) + return; + + s->in_source = g_io_create_watch(s->channel, + G_IO_IN|G_IO_ERR|G_IO_HUP); + g_source_set_callback(s->in_source, (GSourceFunc)tcp_event, s, NULL); + g_source_attach(s->in_source, io_thread_context()); +} + +static void +tcp_socket_unschedule_read(struct tcp_socket *s) +{ + if (s->in_source == NULL) + return; + + g_source_destroy(s->in_source); + g_source_unref(s->in_source); + s->in_source = NULL; +} + +static void +tcp_socket_schedule_write(struct tcp_socket *s) +{ + assert(s->output != NULL); + assert(!fifo_buffer_is_empty(s->output)); + + if (s->out_source != NULL) + return; + + s->out_source = g_io_create_watch(s->channel, G_IO_OUT); + g_source_set_callback(s->out_source, (GSourceFunc)tcp_event, s, NULL); + g_source_attach(s->out_source, io_thread_context()); +} + +static void +tcp_socket_unschedule_write(struct tcp_socket *s) +{ + if (s->out_source == NULL) + return; + + g_source_destroy(s->out_source); + g_source_unref(s->out_source); + s->out_source = NULL; +} + +/** + * Close the socket. Caller must lock the mutex. + */ +static void +tcp_socket_close(struct tcp_socket *s) +{ + tcp_socket_unschedule_read(s); + tcp_socket_unschedule_write(s); + + if (s->channel != NULL) { + g_io_channel_unref(s->channel); + s->channel = NULL; + } + + if (s->input != NULL) { + fifo_buffer_free(s->input); + s->input = NULL; + } + + if (s->output != NULL) { + fifo_buffer_free(s->output); + s->output = NULL; + } +} + +static gpointer +tcp_socket_close_callback(gpointer data) +{ + struct tcp_socket *s = data; + + g_mutex_lock(s->mutex); + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + + return NULL; +} + +static void +tcp_socket_close_indirect(struct tcp_socket *s) +{ + io_thread_call(tcp_socket_close_callback, s); + + assert(s->channel == NULL); + assert(s->in_source == NULL); + assert(s->out_source == NULL); +} + +static void +tcp_handle_input(struct tcp_socket *s) +{ + size_t length; + const void *p = fifo_buffer_read(s->input, &length); + if (p == NULL) + return; + + g_mutex_unlock(s->mutex); + size_t consumed = s->handler->data(p, length, s->handler_ctx); + g_mutex_lock(s->mutex); + if (consumed > 0 && s->input != NULL) + fifo_buffer_consume(s->input, consumed); +} + +static bool +tcp_in_event(struct tcp_socket *s) +{ + assert(s != NULL); + assert(s->channel != NULL); + + g_mutex_lock(s->mutex); + + size_t max_length; + void *p = fifo_buffer_write(s->input, &max_length); + if (p == NULL) { + GError *error = g_error_new_literal(tcp_socket_quark(), 0, + "buffer overflow"); + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + s->handler->error(error, s->handler_ctx); + return false; + } + + gsize bytes_read; + GError *error = NULL; + GIOStatus status = g_io_channel_read_chars(s->channel, + p, max_length, + &bytes_read, &error); + switch (status) { + case G_IO_STATUS_NORMAL: + fifo_buffer_append(s->input, bytes_read); + tcp_handle_input(s); + g_mutex_unlock(s->mutex); + return true; + + case G_IO_STATUS_AGAIN: + /* try again later */ + g_mutex_unlock(s->mutex); + return true; + + case G_IO_STATUS_EOF: + /* peer disconnected */ + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + s->handler->disconnected(s->handler_ctx); + return false; + + case G_IO_STATUS_ERROR: + /* I/O error */ + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + s->handler->error(error, s->handler_ctx); + return false; + } + + /* unreachable */ + assert(false); + return true; +} + +static bool +tcp_out_event(struct tcp_socket *s) +{ + assert(s != NULL); + assert(s->channel != NULL); + + g_mutex_lock(s->mutex); + + size_t length; + const void *p = fifo_buffer_read(s->output, &length); + if (p == NULL) { + /* no more data in the output buffer, remove the + output event */ + tcp_socket_unschedule_write(s); + g_mutex_unlock(s->mutex); + return false; + } + + gsize bytes_written; + GError *error = NULL; + GIOStatus status = g_io_channel_write_chars(s->channel, p, length, + &bytes_written, &error); + switch (status) { + case G_IO_STATUS_NORMAL: + fifo_buffer_consume(s->output, bytes_written); + g_mutex_unlock(s->mutex); + return true; + + case G_IO_STATUS_AGAIN: + tcp_socket_schedule_write(s); + g_mutex_unlock(s->mutex); + return true; + + case G_IO_STATUS_EOF: + /* peer disconnected */ + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + s->handler->disconnected(s->handler_ctx); + return false; + + case G_IO_STATUS_ERROR: + /* I/O error */ + tcp_socket_close(s); + g_mutex_unlock(s->mutex); + s->handler->error(error, s->handler_ctx); + return false; + } + + /* unreachable */ + g_mutex_unlock(s->mutex); + assert(false); + return true; +} + +static gboolean +tcp_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, + gpointer data) +{ + struct tcp_socket *s = data; + + assert(source == s->channel); + + switch (condition) { + case G_IO_IN: + case G_IO_PRI: + return tcp_in_event(s); + + case G_IO_OUT: + return tcp_out_event(s); + + case G_IO_ERR: + case G_IO_HUP: + case G_IO_NVAL: + tcp_socket_close(s); + s->handler->disconnected(s->handler_ctx); + return false; + } + + /* unreachable */ + assert(false); + return false; +} + +struct tcp_socket * +tcp_socket_new(int fd, + const struct tcp_socket_handler *handler, void *ctx) +{ + assert(fd >= 0); + assert(handler != NULL); + assert(handler->data != NULL); + assert(handler->error != NULL); + assert(handler->disconnected != NULL); + + struct tcp_socket *s = g_new(struct tcp_socket, 1); + s->handler = handler; + s->handler_ctx = ctx; + s->mutex = g_mutex_new(); + + g_mutex_lock(s->mutex); + +#ifndef G_OS_WIN32 + s->channel = g_io_channel_unix_new(fd); +#else + s->channel = g_io_channel_win32_new_socket(fd); +#endif + /* GLib is responsible for closing the file descriptor */ + g_io_channel_set_close_on_unref(s->channel, true); + /* NULL encoding means the stream is binary safe */ + g_io_channel_set_encoding(s->channel, NULL, NULL); + /* no buffering */ + g_io_channel_set_buffered(s->channel, false); + + s->input = fifo_buffer_new(4096); + s->output = fifo_buffer_new(4096); + + s->in_source = NULL; + s->out_source = NULL; + + tcp_socket_schedule_read(s); + + g_mutex_unlock(s->mutex); + + return s; +} + +void +tcp_socket_free(struct tcp_socket *s) +{ + tcp_socket_close_indirect(s); + g_mutex_free(s->mutex); + g_free(s); +} + +bool +tcp_socket_send(struct tcp_socket *s, const void *data, size_t length) +{ + assert(s != NULL); + + g_mutex_lock(s->mutex); + + if (s->output == NULL || s->channel == NULL) { + /* already disconnected */ + g_mutex_unlock(s->mutex); + return false; + } + + size_t max_length; + void *p = fifo_buffer_write(s->output, &max_length); + if (p == NULL || max_length < length) { + /* buffer is full */ + g_mutex_unlock(s->mutex); + return false; + } + + memcpy(p, data, length); + fifo_buffer_append(s->output, length); + tcp_socket_schedule_write(s); + + g_mutex_unlock(s->mutex); + return true; +} + diff --git a/src/tcp_socket.h b/src/tcp_socket.h new file mode 100644 index 000000000..b6b367b86 --- /dev/null +++ b/src/tcp_socket.h @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_TCP_SOCKET_H +#define MPD_TCP_SOCKET_H + +#include + +#include +#include + +struct sockaddr; + +struct tcp_socket_handler { + /** + * New data has arrived. + * + * @return the number of bytes consumed; 0 if more data is + * needed + */ + size_t (*data)(const void *data, size_t length, void *ctx); + + void (*error)(GError *error, void *ctx); + + void (*disconnected)(void *ctx); +}; + +static inline GQuark +tcp_socket_quark(void) +{ + return g_quark_from_static_string("tcp_socket"); +} + +G_GNUC_MALLOC +struct tcp_socket * +tcp_socket_new(int fd, + const struct tcp_socket_handler *handler, void *ctx); + +void +tcp_socket_free(struct tcp_socket *s); + +bool +tcp_socket_send(struct tcp_socket *s, const void *data, size_t length); + +#endif -- cgit v1.2.3