From 395191bd75541c1709ee1d1bad4af5140317ddea Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Tue, 30 Aug 2011 08:13:28 +0200 Subject: rtsp_client: use the I/O thread Make the code portable. --- src/rtsp_client.c | 221 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 166 insertions(+), 55 deletions(-) (limited to 'src/rtsp_client.c') diff --git a/src/rtsp_client.c b/src/rtsp_client.c index 093b44710..d05efa619 100644 --- a/src/rtsp_client.c +++ b/src/rtsp_client.c @@ -22,13 +22,16 @@ */ #include "rtsp_client.h" +#include "tcp_socket.h" #include "glib_compat.h" +#include #include #include #include #include #include +#include #ifdef WIN32 #define WINVER 0x0501 @@ -37,8 +40,6 @@ #else #include #include -#include -#include #include #endif @@ -78,6 +79,9 @@ rtspcl_open(void) { struct rtspcl_data *rtspcld; rtspcld = g_new0(struct rtspcl_data, 1); + rtspcld->mutex = g_mutex_new(); + rtspcld->cond = g_cond_new(); + rtspcld->received_lines = g_queue_new(); rtspcld->useragent = "RTSPClient"; return rtspcld; } @@ -223,33 +227,141 @@ get_tcp_connect_by_host(int sd, const char *host, short destport, get_tcp_connect(sd, addr, error_r); } +static void +rtsp_client_flush_received(struct rtspcl_data *rtspcld) +{ + char *line; + while ((line = g_queue_pop_head(rtspcld->received_lines)) != NULL) + g_free(line); +} + +static size_t +rtsp_client_socket_data(const void *_data, size_t length, void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_mutex_lock(rtspcld->mutex); + + if (rtspcld->tcp_socket == NULL) { + g_mutex_unlock(rtspcld->mutex); + return 0; + } + + const bool was_empty = g_queue_is_empty(rtspcld->received_lines); + bool added = false; + const char *data = _data, *end = data + length, *p = data, *eol; + while ((eol = memchr(p, '\n', end - p)) != NULL) { + const char *next = eol + 1; + + if (rtspcld->received_lines->length < 64) { + if (eol > p && eol[-1] == '\r') + --eol; + + g_queue_push_tail(rtspcld->received_lines, + g_strndup(p, eol - p)); + added = true; + } + + p = next; + } + + if (was_empty && added) + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + return p - data; +} + +static void +rtsp_client_socket_error(GError *error, void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_warning("%s", error->message); + g_error_free(error); + + g_mutex_lock(rtspcld->mutex); + + rtsp_client_flush_received(rtspcld); + + struct tcp_socket *s = rtspcld->tcp_socket; + rtspcld->tcp_socket = NULL; + + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + if (s != NULL) + tcp_socket_free(s); +} + +static void +rtsp_client_socket_disconnected(void *ctx) +{ + struct rtspcl_data *rtspcld = ctx; + + g_mutex_lock(rtspcld->mutex); + + rtsp_client_flush_received(rtspcld); + + struct tcp_socket *s = rtspcld->tcp_socket; + rtspcld->tcp_socket = NULL; + + g_cond_broadcast(rtspcld->cond); + + g_mutex_unlock(rtspcld->mutex); + + if (s != NULL) + tcp_socket_free(s); +} + +static const struct tcp_socket_handler rtsp_client_socket_handler = { + .data = rtsp_client_socket_data, + .error = rtsp_client_socket_error, + .disconnected = rtsp_client_socket_disconnected, +}; + bool rtspcl_connect(struct rtspcl_data *rtspcld, const char *host, short destport, const char *sid, GError **error_r) { + assert(rtspcld->tcp_socket == NULL); + unsigned short myport = 0; struct sockaddr_in name; socklen_t namelen = sizeof(name); - if ((rtspcld->fd = open_tcp_socket(NULL, &myport, error_r)) == -1) + int fd = open_tcp_socket(NULL, &myport, error_r); + if (fd < 0) return false; - if (!get_tcp_connect_by_host(rtspcld->fd, host, destport, error_r)) + if (!get_tcp_connect_by_host(fd, host, destport, error_r)) return false; - getsockname(rtspcld->fd, (struct sockaddr*)&name, &namelen); + getsockname(fd, (struct sockaddr*)&name, &namelen); memcpy(&rtspcld->local_addr, &name.sin_addr,sizeof(struct in_addr)); sprintf(rtspcld->url, "rtsp://%s/%s", inet_ntoa(name.sin_addr), sid); - getpeername(rtspcld->fd, (struct sockaddr*)&name, &namelen); + getpeername(fd, (struct sockaddr*)&name, &namelen); memcpy(&rtspcld->host_addr, &name.sin_addr, sizeof(struct in_addr)); + + rtspcld->tcp_socket = tcp_socket_new(fd, &rtsp_client_socket_handler, + rtspcld); + return true; } static void rtspcl_disconnect(struct rtspcl_data *rtspcld) { - if (rtspcld->fd > 0) close(rtspcld->fd); - rtspcld->fd = 0; + g_mutex_lock(rtspcld->mutex); + rtsp_client_flush_received(rtspcld); + g_mutex_unlock(rtspcld->mutex); + + if (rtspcld->tcp_socket != NULL) { + tcp_socket_free(rtspcld->tcp_socket); + rtspcld->tcp_socket = NULL; + } } static void @@ -263,8 +375,11 @@ void rtspcl_close(struct rtspcl_data *rtspcld) { rtspcl_disconnect(rtspcld); + g_queue_free(rtspcld->received_lines); rtspcl_remove_all_exthds(rtspcld); g_free(rtspcld->session); + g_cond_free(rtspcld->cond); + g_mutex_free(rtspcld->mutex); g_free(rtspcld); } @@ -294,40 +409,51 @@ rtspcl_add_exthds(struct rtspcl_data *rtspcld, const char *key, char *data) * returned string in line is always null terminated, maxlen-1 is maximum string length */ static int -read_line(int fd, char *line, int maxlen, int timeout, int no_poll) +read_line(struct rtspcl_data *rtspcld, char *line, int maxlen, + int timeout) { - int i, rval; - int count = 0; - struct pollfd pfds; - char ch; - *line = 0; - pfds.events = POLLIN; - pfds.fd = fd; - for (i = 0;i < maxlen; i++) { - if (no_poll || poll(&pfds, 1, timeout)) - rval=read(fd,&ch,1); - else return 0; - - if (rval == -1) { - if (errno == EAGAIN) return 0; - g_warning("%s:read error: %s\n", __func__, strerror(errno)); - return -1; + g_mutex_lock(rtspcld->mutex); + + GTimeVal end_time; + if (timeout >= 0) { + g_get_current_time(&end_time); + + end_time.tv_sec += timeout / 1000; + timeout %= 1000; + end_time.tv_usec = timeout * 1000; + if (end_time.tv_usec > 1000000) { + end_time.tv_usec -= 1000000; + ++end_time.tv_sec; + } + } + + while (true) { + if (!g_queue_is_empty(rtspcld->received_lines)) { + /* success, copy to buffer */ + + char *p = g_queue_pop_head(rtspcld->received_lines); + g_mutex_unlock(rtspcld->mutex); + + g_strlcpy(line, p, maxlen); + g_free(p); + + return strlen(line); } - if (rval == 0) { - g_debug("%s:disconnected on the other end\n", __func__); + + if (rtspcld->tcp_socket == NULL) { + /* error */ + g_mutex_unlock(rtspcld->mutex); return -1; } - if(ch == '\n') { - *line = 0; - return count; + + if (timeout < 0) { + g_cond_wait(rtspcld->cond, rtspcld->mutex); + } else if (!g_cond_timed_wait(rtspcld->cond, rtspcld->mutex, + &end_time)) { + g_mutex_unlock(rtspcld->mutex); + return 0; } - if (ch == '\r') continue; - *line++ = ch; - count++; - if (count >= maxlen - 1) break; } - *line = 0; - return count; } /* @@ -346,13 +472,9 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, char reql[128]; const char delimiters[] = " "; char *token, *dp; - int dsize = 0,rval; + int dsize = 0; int timeout = 5000; // msec unit - fd_set rdfds; - int fdmax = 0; - struct timeval tout = {.tv_sec=10, .tv_usec=0}; - if (!rtspcld) { g_set_error_literal(error_r, rtsp_client_quark(), 0, "not connected"); @@ -393,8 +515,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, if (content_type && content) strncat(req, content, sizeof(req)); - rval = write(rtspcld->fd, req, strlen(req)); - if (rval < 0) { + if (!tcp_socket_send(rtspcld->tcp_socket, req, strlen(req))) { g_set_error(error_r, rtsp_client_quark(), errno, "write error: %s", g_strerror(errno)); @@ -403,17 +524,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, if (!get_response) return true; - while (true) { - FD_ZERO(&rdfds); - FD_SET(rtspcld->fd, &rdfds); - fdmax = rtspcld->fd; - select(fdmax + 1, &rdfds, NULL, NULL, &tout); - if (FD_ISSET(rtspcld->fd, &rdfds)) { - break; - } - } - - if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) { + if (read_line(rtspcld, line, sizeof(line), timeout) <= 0) { g_set_error_literal(error_r, rtsp_client_quark(), 0, "request failed"); return false; @@ -443,7 +554,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd, struct key_data *cur_kd = *kd; struct key_data *new_kd = NULL; - while (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) > 0) { + while (read_line(rtspcld, line, sizeof(line), timeout) > 0) { timeout = 1000; // once it started, it shouldn't take a long time if (new_kd != NULL && line[0] == ' ') { const char *j = line; -- cgit v1.2.3