aboutsummaryrefslogtreecommitdiffstats
path: root/src/rtsp_client.c
diff options
context:
space:
mode:
authorMax Kellermann <max@duempel.org>2011-08-30 08:13:28 +0200
committerMax Kellermann <max@duempel.org>2011-08-31 08:32:09 +0200
commit395191bd75541c1709ee1d1bad4af5140317ddea (patch)
tree7285f02a4a157a025646bae53ea613fc7616bd21 /src/rtsp_client.c
parentec7d8fb6bd7b9a83e354aa5cb1c812435d587fab (diff)
downloadmpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.gz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.tar.xz
mpd-395191bd75541c1709ee1d1bad4af5140317ddea.zip
rtsp_client: use the I/O thread
Make the code portable.
Diffstat (limited to 'src/rtsp_client.c')
-rw-r--r--src/rtsp_client.c221
1 files changed, 166 insertions, 55 deletions
diff --git a/src/rtsp_client.c b/src/rtsp_client.c
index 093b44710..d05efa619 100644
--- a/src/rtsp_client.c
+++ b/src/rtsp_client.c
@@ -22,13 +22,16 @@
*/
#include "rtsp_client.h"
+#include "tcp_socket.h"
#include "glib_compat.h"
+#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
+#include <sys/time.h>
#ifdef WIN32
#define WINVER 0x0501
@@ -37,8 +40,6 @@
#else
#include <arpa/inet.h>
#include <sys/socket.h>
-#include <sys/select.h>
-#include <sys/poll.h>
#include <netdb.h>
#endif
@@ -78,6 +79,9 @@ rtspcl_open(void)
{
struct rtspcl_data *rtspcld;
rtspcld = g_new0(struct rtspcl_data, 1);
+ rtspcld->mutex = g_mutex_new();
+ rtspcld->cond = g_cond_new();
+ rtspcld->received_lines = g_queue_new();
rtspcld->useragent = "RTSPClient";
return rtspcld;
}
@@ -223,33 +227,141 @@ get_tcp_connect_by_host(int sd, const char *host, short destport,
get_tcp_connect(sd, addr, error_r);
}
+static void
+rtsp_client_flush_received(struct rtspcl_data *rtspcld)
+{
+ char *line;
+ while ((line = g_queue_pop_head(rtspcld->received_lines)) != NULL)
+ g_free(line);
+}
+
+static size_t
+rtsp_client_socket_data(const void *_data, size_t length, void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_mutex_lock(rtspcld->mutex);
+
+ if (rtspcld->tcp_socket == NULL) {
+ g_mutex_unlock(rtspcld->mutex);
+ return 0;
+ }
+
+ const bool was_empty = g_queue_is_empty(rtspcld->received_lines);
+ bool added = false;
+ const char *data = _data, *end = data + length, *p = data, *eol;
+ while ((eol = memchr(p, '\n', end - p)) != NULL) {
+ const char *next = eol + 1;
+
+ if (rtspcld->received_lines->length < 64) {
+ if (eol > p && eol[-1] == '\r')
+ --eol;
+
+ g_queue_push_tail(rtspcld->received_lines,
+ g_strndup(p, eol - p));
+ added = true;
+ }
+
+ p = next;
+ }
+
+ if (was_empty && added)
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ return p - data;
+}
+
+static void
+rtsp_client_socket_error(GError *error, void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_warning("%s", error->message);
+ g_error_free(error);
+
+ g_mutex_lock(rtspcld->mutex);
+
+ rtsp_client_flush_received(rtspcld);
+
+ struct tcp_socket *s = rtspcld->tcp_socket;
+ rtspcld->tcp_socket = NULL;
+
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (s != NULL)
+ tcp_socket_free(s);
+}
+
+static void
+rtsp_client_socket_disconnected(void *ctx)
+{
+ struct rtspcl_data *rtspcld = ctx;
+
+ g_mutex_lock(rtspcld->mutex);
+
+ rtsp_client_flush_received(rtspcld);
+
+ struct tcp_socket *s = rtspcld->tcp_socket;
+ rtspcld->tcp_socket = NULL;
+
+ g_cond_broadcast(rtspcld->cond);
+
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (s != NULL)
+ tcp_socket_free(s);
+}
+
+static const struct tcp_socket_handler rtsp_client_socket_handler = {
+ .data = rtsp_client_socket_data,
+ .error = rtsp_client_socket_error,
+ .disconnected = rtsp_client_socket_disconnected,
+};
+
bool
rtspcl_connect(struct rtspcl_data *rtspcld, const char *host, short destport,
const char *sid, GError **error_r)
{
+ assert(rtspcld->tcp_socket == NULL);
+
unsigned short myport = 0;
struct sockaddr_in name;
socklen_t namelen = sizeof(name);
- if ((rtspcld->fd = open_tcp_socket(NULL, &myport, error_r)) == -1)
+ int fd = open_tcp_socket(NULL, &myport, error_r);
+ if (fd < 0)
return false;
- if (!get_tcp_connect_by_host(rtspcld->fd, host, destport, error_r))
+ if (!get_tcp_connect_by_host(fd, host, destport, error_r))
return false;
- getsockname(rtspcld->fd, (struct sockaddr*)&name, &namelen);
+ getsockname(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->local_addr, &name.sin_addr,sizeof(struct in_addr));
sprintf(rtspcld->url, "rtsp://%s/%s", inet_ntoa(name.sin_addr), sid);
- getpeername(rtspcld->fd, (struct sockaddr*)&name, &namelen);
+ getpeername(fd, (struct sockaddr*)&name, &namelen);
memcpy(&rtspcld->host_addr, &name.sin_addr, sizeof(struct in_addr));
+
+ rtspcld->tcp_socket = tcp_socket_new(fd, &rtsp_client_socket_handler,
+ rtspcld);
+
return true;
}
static void
rtspcl_disconnect(struct rtspcl_data *rtspcld)
{
- if (rtspcld->fd > 0) close(rtspcld->fd);
- rtspcld->fd = 0;
+ g_mutex_lock(rtspcld->mutex);
+ rtsp_client_flush_received(rtspcld);
+ g_mutex_unlock(rtspcld->mutex);
+
+ if (rtspcld->tcp_socket != NULL) {
+ tcp_socket_free(rtspcld->tcp_socket);
+ rtspcld->tcp_socket = NULL;
+ }
}
static void
@@ -263,8 +375,11 @@ void
rtspcl_close(struct rtspcl_data *rtspcld)
{
rtspcl_disconnect(rtspcld);
+ g_queue_free(rtspcld->received_lines);
rtspcl_remove_all_exthds(rtspcld);
g_free(rtspcld->session);
+ g_cond_free(rtspcld->cond);
+ g_mutex_free(rtspcld->mutex);
g_free(rtspcld);
}
@@ -294,40 +409,51 @@ rtspcl_add_exthds(struct rtspcl_data *rtspcld, const char *key, char *data)
* returned string in line is always null terminated, maxlen-1 is maximum string length
*/
static int
-read_line(int fd, char *line, int maxlen, int timeout, int no_poll)
+read_line(struct rtspcl_data *rtspcld, char *line, int maxlen,
+ int timeout)
{
- int i, rval;
- int count = 0;
- struct pollfd pfds;
- char ch;
- *line = 0;
- pfds.events = POLLIN;
- pfds.fd = fd;
- for (i = 0;i < maxlen; i++) {
- if (no_poll || poll(&pfds, 1, timeout))
- rval=read(fd,&ch,1);
- else return 0;
-
- if (rval == -1) {
- if (errno == EAGAIN) return 0;
- g_warning("%s:read error: %s\n", __func__, strerror(errno));
- return -1;
+ g_mutex_lock(rtspcld->mutex);
+
+ GTimeVal end_time;
+ if (timeout >= 0) {
+ g_get_current_time(&end_time);
+
+ end_time.tv_sec += timeout / 1000;
+ timeout %= 1000;
+ end_time.tv_usec = timeout * 1000;
+ if (end_time.tv_usec > 1000000) {
+ end_time.tv_usec -= 1000000;
+ ++end_time.tv_sec;
+ }
+ }
+
+ while (true) {
+ if (!g_queue_is_empty(rtspcld->received_lines)) {
+ /* success, copy to buffer */
+
+ char *p = g_queue_pop_head(rtspcld->received_lines);
+ g_mutex_unlock(rtspcld->mutex);
+
+ g_strlcpy(line, p, maxlen);
+ g_free(p);
+
+ return strlen(line);
}
- if (rval == 0) {
- g_debug("%s:disconnected on the other end\n", __func__);
+
+ if (rtspcld->tcp_socket == NULL) {
+ /* error */
+ g_mutex_unlock(rtspcld->mutex);
return -1;
}
- if(ch == '\n') {
- *line = 0;
- return count;
+
+ if (timeout < 0) {
+ g_cond_wait(rtspcld->cond, rtspcld->mutex);
+ } else if (!g_cond_timed_wait(rtspcld->cond, rtspcld->mutex,
+ &end_time)) {
+ g_mutex_unlock(rtspcld->mutex);
+ return 0;
}
- if (ch == '\r') continue;
- *line++ = ch;
- count++;
- if (count >= maxlen - 1) break;
}
- *line = 0;
- return count;
}
/*
@@ -346,13 +472,9 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
char reql[128];
const char delimiters[] = " ";
char *token, *dp;
- int dsize = 0,rval;
+ int dsize = 0;
int timeout = 5000; // msec unit
- fd_set rdfds;
- int fdmax = 0;
- struct timeval tout = {.tv_sec=10, .tv_usec=0};
-
if (!rtspcld) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"not connected");
@@ -393,8 +515,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (content_type && content)
strncat(req, content, sizeof(req));
- rval = write(rtspcld->fd, req, strlen(req));
- if (rval < 0) {
+ if (!tcp_socket_send(rtspcld->tcp_socket, req, strlen(req))) {
g_set_error(error_r, rtsp_client_quark(), errno,
"write error: %s",
g_strerror(errno));
@@ -403,17 +524,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
if (!get_response) return true;
- while (true) {
- FD_ZERO(&rdfds);
- FD_SET(rtspcld->fd, &rdfds);
- fdmax = rtspcld->fd;
- select(fdmax + 1, &rdfds, NULL, NULL, &tout);
- if (FD_ISSET(rtspcld->fd, &rdfds)) {
- break;
- }
- }
-
- if (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) <= 0) {
+ if (read_line(rtspcld, line, sizeof(line), timeout) <= 0) {
g_set_error_literal(error_r, rtsp_client_quark(), 0,
"request failed");
return false;
@@ -443,7 +554,7 @@ exec_request(struct rtspcl_data *rtspcld, const char *cmd,
struct key_data *cur_kd = *kd;
struct key_data *new_kd = NULL;
- while (read_line(rtspcld->fd, line, sizeof(line), timeout, 0) > 0) {
+ while (read_line(rtspcld, line, sizeof(line), timeout) > 0) {
timeout = 1000; // once it started, it shouldn't take a long time
if (new_kd != NULL && line[0] == ' ') {
const char *j = line;