diff options
Diffstat (limited to '')
27 files changed, 2382 insertions, 349 deletions
diff --git a/src/input/archive_input_plugin.c b/src/input/archive_input_plugin.c index 97e4836ff..4a038b9e2 100644 --- a/src/input/archive_input_plugin.c +++ b/src/input/archive_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -34,7 +34,9 @@ * plugin and gzip fetches file from disk */ static struct input_stream * -input_archive_open(const char *pathname, GError **error_r) +input_archive_open(const char *pathname, + GMutex *mutex, GCond *cond, + GError **error_r) { const struct archive_plugin *arplug; struct archive_file *file; @@ -65,7 +67,8 @@ input_archive_open(const char *pathname, GError **error_r) return NULL; //setup fileops - is = archive_file_open_stream(file, filename, error_r); + is = archive_file_open_stream(file, filename, mutex, cond, + error_r); archive_file_close(file); g_free(pname); diff --git a/src/input/archive_input_plugin.h b/src/input/archive_input_plugin.h index 20568cfbe..51095f37f 100644 --- a/src/input/archive_input_plugin.h +++ b/src/input/archive_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input/cdio_paranoia_input_plugin.c b/src/input/cdio_paranoia_input_plugin.c new file mode 100644 index 000000000..1a1c8d2c5 --- /dev/null +++ b/src/input/cdio_paranoia_input_plugin.c @@ -0,0 +1,392 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +/** + * CD-Audio handling (requires libcdio_paranoia) + */ + +#include "config.h" +#include "input/cdio_paranoia_input_plugin.h" +#include "input_internal.h" +#include "input_plugin.h" +#include "refcount.h" +#include "pcm_buffer.h" + +#include <stdio.h> +#include <stdint.h> +#include <stddef.h> +#include <string.h> +#include <stdlib.h> +#include <glib.h> +#include <assert.h> + +#include <cdio/paranoia.h> +#include <cdio/cd_types.h> + +struct input_cdio_paranoia { + struct input_stream base; + + cdrom_drive_t *drv; + CdIo_t *cdio; + cdrom_paranoia_t *para; + + int endian; + + lsn_t lsn_from, lsn_to; + int lsn_relofs; + + int trackno; + + char buffer[CDIO_CD_FRAMESIZE_RAW]; + int buffer_lsn; + + struct pcm_buffer conv_buffer; +}; + +static inline GQuark +cdio_quark(void) +{ + return g_quark_from_static_string("cdio"); +} + +static void +input_cdio_close(struct input_stream *is) +{ + struct input_cdio_paranoia *i = (struct input_cdio_paranoia *)is; + + pcm_buffer_deinit(&i->conv_buffer); + + if (i->para) + cdio_paranoia_free(i->para); + if (i->drv) + cdio_cddap_close_no_free_cdio( i->drv); + if (i->cdio) + cdio_destroy( i->cdio ); + + input_stream_deinit(&i->base); + g_free(i); +} + +struct cdio_uri { + char device[64]; + int track; +}; + +static bool +parse_cdio_uri(struct cdio_uri *dest, const char *src, GError **error_r) +{ + if (!g_str_has_prefix(src, "cdda://")) + return false; + + src += 7; + + if (*src == 0) { + /* play the whole CD in the default drive */ + dest->device[0] = 0; + dest->track = -1; + return true; + } + + const char *slash = strrchr(src, '/'); + if (slash == NULL) { + /* play the whole CD in the specified drive */ + g_strlcpy(dest->device, src, sizeof(dest->device)); + dest->track = -1; + return true; + } + + size_t device_length = slash - src; + if (device_length >= sizeof(dest->device)) + device_length = sizeof(dest->device) - 1; + + memcpy(dest->device, src, device_length); + dest->device[device_length] = 0; + + const char *track = slash + 1; + + char *endptr; + dest->track = strtoul(track, &endptr, 10); + if (*endptr != 0) { + g_set_error(error_r, cdio_quark(), 0, + "Malformed track number"); + return false; + } + + if (endptr == track) + /* play the whole CD */ + dest->track = -1; + + return true; +} + +static char * +cdio_detect_device(void) +{ + char **devices = cdio_get_devices_with_cap(NULL, CDIO_FS_AUDIO, false); + if (devices == NULL) + return NULL; + + char *device = g_strdup(devices[0]); + cdio_free_device_list(devices); + + return device; +} + +static struct input_stream * +input_cdio_open(const char *uri, + GMutex *mutex, GCond *cond, + GError **error_r) +{ + struct input_cdio_paranoia *i; + + struct cdio_uri parsed_uri; + if (!parse_cdio_uri(&parsed_uri, uri, error_r)) + return NULL; + + i = g_new(struct input_cdio_paranoia, 1); + input_stream_init(&i->base, &input_plugin_cdio_paranoia, uri, + mutex, cond); + + /* initialize everything (should be already) */ + i->drv = NULL; + i->cdio = NULL; + i->para = NULL; + i->trackno = parsed_uri.track; + pcm_buffer_init(&i->conv_buffer); + + /* get list of CD's supporting CD-DA */ + char *device = parsed_uri.device[0] != 0 + ? g_strdup(parsed_uri.device) + : cdio_detect_device(); + if (device == NULL) { + g_set_error(error_r, cdio_quark(), 0, + "Unable find or access a CD-ROM drive with an audio CD in it."); + input_cdio_close(&i->base); + return NULL; + } + + /* Found such a CD-ROM with a CD-DA loaded. Use the first drive in the list. */ + i->cdio = cdio_open(device, DRIVER_UNKNOWN); + g_free(device); + + i->drv = cdio_cddap_identify_cdio(i->cdio, 1, NULL); + + if ( !i->drv ) { + g_set_error(error_r, cdio_quark(), 0, + "Unable to identify audio CD disc."); + input_cdio_close(&i->base); + return NULL; + } + + cdda_verbose_set(i->drv, CDDA_MESSAGE_FORGETIT, CDDA_MESSAGE_FORGETIT); + + if ( 0 != cdio_cddap_open(i->drv) ) { + g_set_error(error_r, cdio_quark(), 0, "Unable to open disc."); + input_cdio_close(&i->base); + return NULL; + } + + i->endian = data_bigendianp(i->drv); + switch (i->endian) { + case -1: + g_debug("cdda: drive returns unknown audio data, assuming Little Endian"); + i->endian = 0; + break; + case 0: + g_debug("cdda: drive returns audio data Little Endian."); + break; + case 1: + g_debug("cdda: drive returns audio data Big Endian."); + break; + default: + g_set_error(error_r, cdio_quark(), 0, + "Drive returns unknown data type %d", i->endian); + input_cdio_close(&i->base); + return NULL; + } + + i->lsn_relofs = 0; + + if (i->trackno >= 0) { + i->lsn_from = cdio_get_track_lsn(i->cdio, i->trackno); + i->lsn_to = cdio_get_track_last_lsn(i->cdio, i->trackno); + } else { + i->lsn_from = 0; + i->lsn_to = cdio_get_disc_last_lsn(i->cdio); + } + + i->para = cdio_paranoia_init(i->drv); + + /* Set reading mode for full paranoia, but allow skipping sectors. */ + paranoia_modeset(i->para, PARANOIA_MODE_FULL^PARANOIA_MODE_NEVERSKIP); + + /* seek to beginning of the track */ + cdio_paranoia_seek(i->para, i->lsn_from, SEEK_SET); + + i->base.ready = true; + i->base.seekable = true; + i->base.size = (i->lsn_to - i->lsn_from + 1) * CDIO_CD_FRAMESIZE_RAW; + + /* hack to make MPD select the "pcm" decoder plugin */ + i->base.mime = g_strdup("audio/x-mpd-cdda-pcm"); + + return &i->base; +} + +static bool +input_cdio_seek(struct input_stream *is, + goffset offset, int whence, GError **error_r) +{ + struct input_cdio_paranoia *cis = (struct input_cdio_paranoia *)is; + + /* calculate absolute offset */ + switch (whence) { + case SEEK_SET: + break; + case SEEK_CUR: + offset += cis->base.offset; + break; + case SEEK_END: + offset += cis->base.size; + break; + } + + if (offset < 0 || offset > cis->base.size) { + g_set_error(error_r, cdio_quark(), 0, + "Invalid offset to seek %ld (%ld)", + (long int)offset, (long int)cis->base.size); + return false; + } + + /* simple case */ + if (offset == cis->base.offset) + return true; + + /* calculate current LSN */ + cis->lsn_relofs = offset / CDIO_CD_FRAMESIZE_RAW; + cis->base.offset = offset; + + cdio_paranoia_seek(cis->para, cis->lsn_from + cis->lsn_relofs, SEEK_SET); + + return true; +} + +static inline size_t +pcm16_to_wave(uint16_t *dst16, const uint16_t *src16, size_t length) +{ + size_t cnt = length >> 1; + while (cnt > 0) { + *dst16++ = GUINT16_TO_LE(*src16++); + cnt--; + } + return length; +} + +static size_t +input_cdio_read(struct input_stream *is, void *ptr, size_t length, + GError **error_r) +{ + struct input_cdio_paranoia *cis = (struct input_cdio_paranoia *)is; + size_t nbytes = 0; + int diff; + size_t len, maxwrite; + int16_t *rbuf; + char *s_err, *s_mess; + char *wptr = (char *) ptr; + + while (length > 0) { + + + /* end of track ? */ + if (cis->lsn_from + cis->lsn_relofs > cis->lsn_to) + break; + + //current sector was changed ? + if (cis->lsn_relofs != cis->buffer_lsn) { + rbuf = cdio_paranoia_read(cis->para, NULL); + + s_err = cdda_errors(cis->drv); + if (s_err) { + g_warning("paranoia_read: %s", s_err ); + free(s_err); + } + s_mess = cdda_messages(cis->drv); + if (s_mess) { + free(s_mess); + } + if (!rbuf) { + g_set_error(error_r, cdio_quark(), 0, + "paranoia read error. Stopping."); + return 0; + } + //do the swapping if nessesary + if (cis->endian != 0) { + uint16_t *conv_buffer = pcm_buffer_get(&cis->conv_buffer, CDIO_CD_FRAMESIZE_RAW ); + /* do endian conversion ! */ + pcm16_to_wave( conv_buffer, (uint16_t*) rbuf, CDIO_CD_FRAMESIZE_RAW); + rbuf = (int16_t *)conv_buffer; + } + //store current buffer + memcpy(cis->buffer, rbuf, CDIO_CD_FRAMESIZE_RAW); + cis->buffer_lsn = cis->lsn_relofs; + } else { + //use cached sector + rbuf = (int16_t*) cis->buffer; + } + + //correct offset + diff = cis->base.offset - cis->lsn_relofs * CDIO_CD_FRAMESIZE_RAW; + + assert(diff >= 0 && diff < CDIO_CD_FRAMESIZE_RAW); + + maxwrite = CDIO_CD_FRAMESIZE_RAW - diff; //# of bytes pending in current buffer + len = (length < maxwrite? length : maxwrite); + + //skip diff bytes from this lsn + memcpy(wptr, ((char*)rbuf) + diff, len); + //update pointer + wptr += len; + nbytes += len; + + //update offset + cis->base.offset += len; + cis->lsn_relofs = cis->base.offset / CDIO_CD_FRAMESIZE_RAW; + //update length + length -= len; + } + + return nbytes; +} + +static bool +input_cdio_eof(struct input_stream *is) +{ + struct input_cdio_paranoia *cis = (struct input_cdio_paranoia *)is; + + return (cis->lsn_from + cis->lsn_relofs > cis->lsn_to); +} + +const struct input_plugin input_plugin_cdio_paranoia = { + .name = "cdio_paranoia", + .open = input_cdio_open, + .close = input_cdio_close, + .seek = input_cdio_seek, + .read = input_cdio_read, + .eof = input_cdio_eof +}; diff --git a/src/input/cdio_paranoia_input_plugin.h b/src/input/cdio_paranoia_input_plugin.h new file mode 100644 index 000000000..71c5cbe8d --- /dev/null +++ b/src/input/cdio_paranoia_input_plugin.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_CDIO_PARANOIA_INPUT_PLUGIN_H +#define MPD_CDIO_PARANOIA_INPUT_PLUGIN_H + +/** + * An input plugin based on libcdio_paranoia library. + */ +extern const struct input_plugin input_plugin_cdio_paranoia; + +#endif diff --git a/src/input/curl_input_plugin.c b/src/input/curl_input_plugin.c index 604965dd1..88a5556d2 100644 --- a/src/input/curl_input_plugin.c +++ b/src/input/curl_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,10 +19,12 @@ #include "config.h" #include "input/curl_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" #include "conf.h" #include "tag.h" #include "icy_metadata.h" +#include "io_thread.h" #include "glib_compat.h" #include <assert.h> @@ -50,6 +52,11 @@ static const size_t CURL_MAX_BUFFERED = 512 * 1024; /** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t CURL_RESUME_AT = 384 * 1024; + +/** * Buffers created by input_curl_writefunction(). */ struct buffer { @@ -73,17 +80,29 @@ struct input_curl { /** the curl handles */ CURL *easy; - CURLM *multi; + + /** the GMainLoop source used to poll all CURL file + descriptors */ + GSource *source; + + /** the source id of #source */ + guint source_id; + + /** a linked list of all registered GPollFD objects */ + GSList *fds; /** list of buffers, where input_curl_writefunction() appends to, and input_curl_read() reads from them */ GQueue *buffers; - /** has something been added to the buffers list? */ - bool buffered; - - /** did libcurl tell us the we're at the end of the response body? */ - bool eof; +#if LIBCURL_VERSION_NUM >= 0x071200 + /** + * Is the connection currently paused? That happens when the + * buffer was getting too large. It will be unpaused when the + * buffer is below the threshold again. + */ + bool paused; +#endif /** error message provided by libcurl */ char error[CURL_ERROR_SIZE]; @@ -97,6 +116,8 @@ struct input_curl { /** the tag object ready to be requested via input_stream_tag() */ struct tag *tag; + + GError *postponed_error; }; /** libcurl should accept "ICY 200 OK" */ @@ -106,20 +127,527 @@ static struct curl_slist *http_200_aliases; static const char *proxy, *proxy_user, *proxy_password; static unsigned proxy_port; +static struct { + CURLM *multi; + + /** + * A linked list of all active HTTP requests. An active + * request is one that doesn't have the "eof" flag set. + */ + GSList *requests; + + /** + * The GMainLoop source used to poll all CURL file + * descriptors. + */ + GSource *source; + + /** + * The source id of #source. + */ + guint source_id; + + GSList *fds; + +#if LIBCURL_VERSION_NUM >= 0x070f04 + /** + * Did CURL give us a timeout? If yes, then we need to call + * curl_multi_perform(), even if there was no event on any + * file descriptor. + */ + bool timeout; + + /** + * The absolute time stamp when the timeout expires. This is + * used in the GSource method check(). + */ + GTimeVal absolute_timeout; +#endif +} curl; + static inline GQuark curl_quark(void) { return g_quark_from_static_string("curl"); } +/** + * Find a request by its CURL "easy" handle. + * + * Runs in the I/O thread. No lock needed. + */ +static struct input_curl * +input_curl_find_request(CURL *easy) +{ + assert(io_thread_inside()); + + for (GSList *i = curl.requests; i != NULL; i = g_slist_next(i)) { + struct input_curl *c = i->data; + if (c->easy == easy) + return c; + } + + return NULL; +} + +#if LIBCURL_VERSION_NUM >= 0x071200 + +static gpointer +input_curl_resume(gpointer data) +{ + assert(io_thread_inside()); + + struct input_curl *c = data; + + if (c->paused) { + c->paused = false; + curl_easy_pause(c->easy, CURLPAUSE_CONT); + } + + return NULL; +} + +#endif + +/** + * Calculates the GLib event bit mask for one file descriptor, + * obtained from three #fd_set objects filled by curl_multi_fdset(). + */ +static gushort +input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) +{ + gushort events = 0; + + if (FD_ISSET(fd, rfds)) { + events |= G_IO_IN | G_IO_HUP | G_IO_ERR; + FD_CLR(fd, rfds); + } + + if (FD_ISSET(fd, wfds)) { + events |= G_IO_OUT | G_IO_ERR; + FD_CLR(fd, wfds); + } + + if (FD_ISSET(fd, efds)) { + events |= G_IO_HUP | G_IO_ERR; + FD_CLR(fd, efds); + } + + return events; +} + +/** + * Updates all registered GPollFD objects, unregisters old ones, + * registers new ones. + * + * Runs in the I/O thread. No lock needed. + */ +static void +curl_update_fds(void) +{ + assert(io_thread_inside()); + + fd_set rfds, wfds, efds; + + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_ZERO(&efds); + + int max_fd; + CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, + &efds, &max_fd); + if (mcode != CURLM_OK) { + g_warning("curl_multi_fdset() failed: %s\n", + curl_multi_strerror(mcode)); + return; + } + + GSList *fds = curl.fds; + curl.fds = NULL; + + while (fds != NULL) { + GPollFD *poll_fd = fds->data; + gushort events = input_curl_fd_events(poll_fd->fd, &rfds, + &wfds, &efds); + + assert(poll_fd->events != 0); + + fds = g_slist_remove(fds, poll_fd); + + if (events != poll_fd->events) + g_source_remove_poll(curl.source, poll_fd); + + if (events != 0) { + if (events != poll_fd->events) { + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + } + + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } else { + g_free(poll_fd); + } + } + + for (int fd = 0; fd <= max_fd; ++fd) { + gushort events = input_curl_fd_events(fd, &rfds, &wfds, &efds); + if (events != 0) { + GPollFD *poll_fd = g_new(GPollFD, 1); + poll_fd->fd = fd; + poll_fd->events = events; + g_source_add_poll(curl.source, poll_fd); + curl.fds = g_slist_prepend(curl.fds, poll_fd); + } + } +} + +/** + * Runs in the I/O thread. No lock needed. + */ +static bool +input_curl_easy_add(struct input_curl *c, GError **error_r) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy != NULL); + assert(input_curl_find_request(c->easy) == NULL); + + curl.requests = g_slist_prepend(curl.requests, c); + + CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); + if (mcode != CURLM_OK) { + g_set_error(error_r, curl_quark(), mcode, + "curl_multi_add_handle() failed: %s", + curl_multi_strerror(mcode)); + return false; + } + + curl_update_fds(); + + return true; +} + +struct easy_add_params { + struct input_curl *c; + GError **error_r; +}; + +static gpointer +input_curl_easy_add_callback(gpointer data) +{ + const struct easy_add_params *params = data; + + bool success = input_curl_easy_add(params->c, params->error_r); + return GUINT_TO_POINTER(success); +} + +/** + * Call input_curl_easy_add() in the I/O thread. May be called from + * any thread. Caller must not hold a mutex. + */ +static bool +input_curl_easy_add_indirect(struct input_curl *c, GError **error_r) +{ + assert(c != NULL); + assert(c->easy != NULL); + + struct easy_add_params params = { + .c = c, + .error_r = error_r, + }; + + gpointer result = + io_thread_call(input_curl_easy_add_callback, ¶ms); + return GPOINTER_TO_UINT(result); +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * Runs in the I/O thread. + */ +static void +input_curl_easy_free(struct input_curl *c) +{ + assert(io_thread_inside()); + assert(c != NULL); + + if (c->easy == NULL) + return; + + curl.requests = g_slist_remove(curl.requests, c); + + curl_multi_remove_handle(curl.multi, c->easy); + curl_easy_cleanup(c->easy); + c->easy = NULL; + + curl_slist_free_all(c->request_headers); + c->request_headers = NULL; + + g_free(c->range); + c->range = NULL; +} + +static gpointer +input_curl_easy_free_callback(gpointer data) +{ + struct input_curl *c = data; + + input_curl_easy_free(c); + curl_update_fds(); + + return NULL; +} + +/** + * Frees the current "libcurl easy" handle, and everything associated + * with it. + * + * The mutex must not be locked. + */ +static void +input_curl_easy_free_indirect(struct input_curl *c) +{ + io_thread_call(input_curl_easy_free_callback, c); + assert(c->easy == NULL); +} + +/** + * Abort and free all HTTP requests. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_abort_all_requests(GError *error) +{ + assert(io_thread_inside()); + assert(error != NULL); + + while (curl.requests != NULL) { + struct input_curl *c = curl.requests->data; + assert(c->postponed_error == NULL); + + input_curl_easy_free(c); + + g_mutex_lock(c->base.mutex); + c->postponed_error = g_error_copy(error); + c->base.ready = true; + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); + } + + g_error_free(error); + +} + +/** + * A HTTP request is finished. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_request_done(struct input_curl *c, CURLcode result, long status) +{ + assert(io_thread_inside()); + assert(c != NULL); + assert(c->easy == NULL); + assert(c->postponed_error == NULL); + + g_mutex_lock(c->base.mutex); + + if (result != CURLE_OK) { + c->postponed_error = g_error_new(curl_quark(), result, + "curl failed: %s", + c->error); + } else if (status < 200 || status >= 300) { + c->postponed_error = g_error_new(curl_quark(), 0, + "got HTTP status %ld", + status); + } + + c->base.ready = true; + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); +} + +static void +input_curl_handle_done(CURL *easy_handle, CURLcode result) +{ + struct input_curl *c = input_curl_find_request(easy_handle); + assert(c != NULL); + + long status = 0; + curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status); + + input_curl_easy_free(c); + input_curl_request_done(c, result, status); +} + +/** + * Check for finished HTTP responses. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static void +input_curl_info_read(void) +{ + assert(io_thread_inside()); + + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read(curl.multi, + &msgs_in_queue)) != NULL) { + if (msg->msg == CURLMSG_DONE) + input_curl_handle_done(msg->easy_handle, msg->data.result); + } +} + +/** + * Give control to CURL. + * + * Runs in the I/O thread. The caller must not hold locks. + */ +static bool +input_curl_perform(void) +{ + assert(io_thread_inside()); + + CURLMcode mcode; + + do { + int running_handles; + mcode = curl_multi_perform(curl.multi, &running_handles); + } while (mcode == CURLM_CALL_MULTI_PERFORM); + + if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { + GError *error = g_error_new(curl_quark(), mcode, + "curl_multi_perform() failed: %s", + curl_multi_strerror(mcode)); + input_curl_abort_all_requests(error); + return false; + } + + return true; +} + +/* + * GSource methods + * + */ + +/** + * The GSource prepare() method implementation. + */ +static gboolean +input_curl_source_prepare(G_GNUC_UNUSED GSource *source, gint *timeout_r) +{ + curl_update_fds(); + +#if LIBCURL_VERSION_NUM >= 0x070f04 + curl.timeout = false; + + long timeout2; + CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); + if (mcode == CURLM_OK) { + if (timeout2 >= 0) { + g_source_get_current_time(source, + &curl.absolute_timeout); + g_time_val_add(&curl.absolute_timeout, + timeout2 * 1000); + } + + if (timeout2 >= 0 && timeout2 < 10) + /* CURL 7.21.1 likes to report "timeout=0", + which means we're running in a busy loop. + Quite a bad idea to waste so much CPU. + Let's use a lower limit of 10ms. */ + timeout2 = 10; + + *timeout_r = timeout2; + + curl.timeout = timeout2 >= 0; + } else + g_warning("curl_multi_timeout() failed: %s\n", + curl_multi_strerror(mcode)); +#else + (void)timeout_r; +#endif + + return false; +} + +/** + * The GSource check() method implementation. + */ +static gboolean +input_curl_source_check(G_GNUC_UNUSED GSource *source) +{ +#if LIBCURL_VERSION_NUM >= 0x070f04 + if (curl.timeout) { + /* when a timeout has expired, we need to call + curl_multi_perform(), even if there was no file + descriptor event */ + + GTimeVal now; + g_source_get_current_time(source, &now); + if (now.tv_sec > curl.absolute_timeout.tv_sec || + (now.tv_sec == curl.absolute_timeout.tv_sec && + now.tv_usec >= curl.absolute_timeout.tv_usec)) + return true; + } +#endif + + for (GSList *i = curl.fds; i != NULL; i = i->next) { + GPollFD *poll_fd = i->data; + if (poll_fd->revents != 0) + return true; + } + + return false; +} + +/** + * The GSource dispatch() method implementation. The callback isn't + * used, because we're handling all events directly. + */ +static gboolean +input_curl_source_dispatch(G_GNUC_UNUSED GSource *source, + G_GNUC_UNUSED GSourceFunc callback, + G_GNUC_UNUSED gpointer user_data) +{ + if (input_curl_perform()) + input_curl_info_read(); + + return true; +} + +/** + * The vtable for our GSource implementation. Unfortunately, we + * cannot declare it "const", because g_source_new() takes a non-const + * pointer, for whatever reason. + */ +static GSourceFuncs curl_source_funcs = { + .prepare = input_curl_source_prepare, + .check = input_curl_source_check, + .dispatch = input_curl_source_dispatch, +}; + +/* + * input_plugin methods + * + */ + static bool input_curl_init(const struct config_param *param, G_GNUC_UNUSED GError **error_r) { CURLcode code = curl_global_init(CURL_GLOBAL_ALL); if (code != CURLE_OK) { - g_warning("curl_global_init() failed: %s\n", - curl_easy_strerror(code)); + g_set_error(error_r, curl_quark(), code, + "curl_global_init() failed: %s\n", + curl_easy_strerror(code)); return false; } @@ -140,20 +668,48 @@ input_curl_init(const struct config_param *param, ""); } + curl.multi = curl_multi_init(); + if (curl.multi == NULL) { + g_set_error(error_r, curl_quark(), 0, + "curl_multi_init() failed"); + return false; + } + + curl.source = g_source_new(&curl_source_funcs, sizeof(*curl.source)); + curl.source_id = g_source_attach(curl.source, io_thread_context()); + return true; } +static gpointer +curl_destroy_sources(G_GNUC_UNUSED gpointer data) +{ + g_source_destroy(curl.source); + + return NULL; +} + static void input_curl_finish(void) { + assert(curl.requests == NULL); + + io_thread_call(curl_destroy_sources, NULL); + + curl_multi_cleanup(curl.multi); + curl_slist_free_all(http_200_aliases); curl_global_cleanup(); } +#if LIBCURL_VERSION_NUM >= 0x071200 + /** * Determine the total sizes of all buffers, including portions that * have already been consumed. + * + * The caller must lock the mutex. */ G_GNUC_PURE static size_t @@ -170,6 +726,8 @@ curl_total_buffer_size(const struct input_curl *c) return total; } +#endif + static void buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) { @@ -180,31 +738,15 @@ buffer_free_callback(gpointer data, G_GNUC_UNUSED gpointer user_data) g_free(buffer); } -/** - * Frees the current "libcurl easy" handle, and everything associated - * with it. - */ static void -input_curl_easy_free(struct input_curl *c) +input_curl_flush_buffers(struct input_curl *c) { - if (c->easy != NULL) { - curl_multi_remove_handle(c->multi, c->easy); - curl_easy_cleanup(c->easy); - c->easy = NULL; - } - - curl_slist_free_all(c->request_headers); - c->request_headers = NULL; - - g_free(c->range); - c->range = NULL; - g_queue_foreach(c->buffers, buffer_free_callback, NULL); g_queue_clear(c->buffers); } /** - * Frees this stream (but not the input_stream struct itself). + * Frees this stream, including the input_stream struct. */ static void input_curl_free(struct input_curl *c) @@ -213,142 +755,53 @@ input_curl_free(struct input_curl *c) tag_free(c->tag); g_free(c->meta_name); - input_curl_easy_free(c); - - if (c->multi != NULL) - curl_multi_cleanup(c->multi); + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); g_queue_free(c->buffers); + if (c->postponed_error != NULL) + g_error_free(c->postponed_error); + g_free(c->url); input_stream_deinit(&c->base); g_free(c); } -static struct tag * -input_curl_tag(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - struct tag *tag = c->tag; - - c->tag = NULL; - return tag; -} - static bool -input_curl_multi_info_read(struct input_curl *c, GError **error_r) +input_curl_check(struct input_stream *is, GError **error_r) { - CURLMsg *msg; - int msgs_in_queue; + struct input_curl *c = (struct input_curl *)is; - while ((msg = curl_multi_info_read(c->multi, - &msgs_in_queue)) != NULL) { - if (msg->msg == CURLMSG_DONE) { - c->eof = true; - c->base.ready = true; - - if (msg->data.result != CURLE_OK) { - g_set_error(error_r, curl_quark(), - msg->data.result, - "curl failed: %s", c->error); - return false; - } - } + bool success = c->postponed_error == NULL; + if (!success) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; } - return true; + return success; } -/** - * Wait for the libcurl socket. - * - * @return -1 on error, 0 if no data is available yet, 1 if data is - * available - */ -static int -input_curl_select(struct input_curl *c, GError **error_r) +static struct tag * +input_curl_tag(struct input_stream *is) { - fd_set rfds, wfds, efds; - int max_fd, ret; - CURLMcode mcode; - struct timeval timeout = { - .tv_sec = 1, - .tv_usec = 0, - }; - - assert(!c->eof); - - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - mcode = curl_multi_fdset(c->multi, &rfds, &wfds, &efds, &max_fd); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_fdset() failed: %s", - curl_multi_strerror(mcode)); - return -1; - } - -#if LIBCURL_VERSION_NUM >= 0x070f04 - long timeout2; - mcode = curl_multi_timeout(c->multi, &timeout2); - if (mcode != CURLM_OK) { - g_warning("curl_multi_timeout() failed: %s\n", - curl_multi_strerror(mcode)); - return -1; - } - - if (timeout2 >= 0) { - if (timeout2 > 10000) - timeout2 = 10000; - - timeout.tv_sec = timeout2 / 1000; - timeout.tv_usec = (timeout2 % 1000) * 1000; - } -#endif - - ret = select(max_fd + 1, &rfds, &wfds, &efds, &timeout); - if (ret < 0) - g_set_error(error_r, g_quark_from_static_string("errno"), - errno, - "select() failed: %s\n", g_strerror(errno)); + struct input_curl *c = (struct input_curl *)is; + struct tag *tag = c->tag; - return ret; + c->tag = NULL; + return tag; } static bool -fill_buffer(struct input_stream *is, GError **error_r) +fill_buffer(struct input_curl *c, GError **error_r) { - struct input_curl *c = (struct input_curl *)is; - CURLMcode mcode = CURLM_CALL_MULTI_PERFORM; + while (c->easy != NULL && g_queue_is_empty(c->buffers)) + g_cond_wait(c->base.cond, c->base.mutex); - while (!c->eof && g_queue_is_empty(c->buffers)) { - int running_handles; - bool bret; - - if (mcode != CURLM_CALL_MULTI_PERFORM) { - /* if we're still here, there is no input yet - - wait for input */ - int ret = input_curl_select(c, error_r); - if (ret <= 0) - /* no data yet or error */ - return false; - } - - mcode = curl_multi_perform(c->multi, &running_handles); - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - c->eof = true; - is->ready = true; - return false; - } - - bret = input_curl_multi_info_read(c, error_r); - if (!bret) - return false; + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; } return !g_queue_is_empty(c->buffers); @@ -444,6 +897,15 @@ copy_icy_tag(struct input_curl *c) c->tag = tag; } +static bool +input_curl_available(struct input_stream *is) +{ + struct input_curl *c = (struct input_curl *)is; + + return c->postponed_error != NULL || c->easy == NULL || + !g_queue_is_empty(c->buffers); +} + static size_t input_curl_read(struct input_stream *is, void *ptr, size_t size, GError **error_r) @@ -456,7 +918,7 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, do { /* fill the buffer */ - success = fill_buffer(is, error_r); + success = fill_buffer(c, error_r); if (!success) return 0; @@ -476,6 +938,14 @@ input_curl_read(struct input_stream *is, void *ptr, size_t size, is->offset += (goffset)nbytes; +#if LIBCURL_VERSION_NUM >= 0x071200 + if (c->paused && curl_total_buffer_size(c) < CURL_RESUME_AT) { + g_mutex_unlock(c->base.mutex); + io_thread_call(input_curl_resume, c); + g_mutex_lock(c->base.mutex); + } +#endif + return nbytes; } @@ -492,49 +962,7 @@ input_curl_eof(G_GNUC_UNUSED struct input_stream *is) { struct input_curl *c = (struct input_curl *)is; - return c->eof && g_queue_is_empty(c->buffers); -} - -static int -input_curl_buffer(struct input_stream *is, GError **error_r) -{ - struct input_curl *c = (struct input_curl *)is; - - if (curl_total_buffer_size(c) >= CURL_MAX_BUFFERED) - return 0; - - CURLMcode mcode; - int running_handles; - bool ret; - - c->buffered = false; - - if (!is->ready && !c->eof) - /* not ready yet means the caller is waiting in a busy - loop; relax that by calling select() on the - socket */ - if (input_curl_select(c, error_r) < 0) - return -1; - - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM && - g_queue_is_empty(c->buffers)); - - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - c->eof = true; - is->ready = true; - return -1; - } - - ret = input_curl_multi_info_read(c, error_r); - if (!ret) - return -1; - - return c->buffered; + return c->easy == NULL && g_queue_is_empty(c->buffers); } /** called by curl when new data is available */ @@ -632,15 +1060,27 @@ input_curl_writefunction(void *ptr, size_t size, size_t nmemb, void *stream) if (size == 0) return 0; + g_mutex_lock(c->base.mutex); + +#if LIBCURL_VERSION_NUM >= 0x071200 + if (curl_total_buffer_size(c) + size >= CURL_MAX_BUFFERED) { + c->paused = true; + g_mutex_unlock(c->base.mutex); + return CURL_WRITEFUNC_PAUSE; + } +#endif + buffer = g_malloc(sizeof(*buffer) - sizeof(buffer->data) + size); buffer->size = size; buffer->consumed = 0; memcpy(buffer->data, ptr, size); - g_queue_push_tail(c->buffers, buffer); - c->buffered = true; + g_queue_push_tail(c->buffers, buffer); c->base.ready = true; + g_cond_broadcast(c->base.cond); + g_mutex_unlock(c->base.mutex); + return size; } @@ -648,9 +1088,6 @@ static bool input_curl_easy_init(struct input_curl *c, GError **error_r) { CURLcode code; - CURLMcode mcode; - - c->eof = false; c->easy = curl_easy_init(); if (c->easy == NULL) { @@ -659,14 +1096,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) return false; } - mcode = curl_multi_add_handle(c->multi, c->easy); - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_add_handle() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - curl_easy_setopt(c->easy, CURLOPT_USERAGENT, "Music Player Daemon " VERSION); curl_easy_setopt(c->easy, CURLOPT_HEADERFUNCTION, @@ -677,6 +1106,7 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, c); curl_easy_setopt(c->easy, CURLOPT_HTTP200ALIASES, http_200_aliases); curl_easy_setopt(c->easy, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(c->easy, CURLOPT_NETRC, 1); curl_easy_setopt(c->easy, CURLOPT_MAXREDIRS, 5); curl_easy_setopt(c->easy, CURLOPT_FAILONERROR, true); curl_easy_setopt(c->easy, CURLOPT_ERRORBUFFER, c->error); @@ -713,38 +1143,6 @@ input_curl_easy_init(struct input_curl *c, GError **error_r) return true; } -void -input_curl_reinit(struct input_stream *is) -{ - struct input_curl *c = (struct input_curl *)is; - - assert(c->base.plugin == &input_plugin_curl); - assert(c->easy != NULL); - - curl_easy_setopt(c->easy, CURLOPT_WRITEHEADER, is); - curl_easy_setopt(c->easy, CURLOPT_WRITEDATA, is); -} - -static bool -input_curl_send_request(struct input_curl *c, GError **error_r) -{ - CURLMcode mcode; - int running_handles; - - do { - mcode = curl_multi_perform(c->multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM); - - if (mcode != CURLM_OK) { - g_set_error(error_r, curl_quark(), mcode, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); - return false; - } - - return true; -} - static bool input_curl_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) @@ -810,14 +1208,16 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, /* close the old connection and open a new one */ - input_curl_easy_free(c); + g_mutex_unlock(c->base.mutex); + + input_curl_easy_free_indirect(c); + input_curl_flush_buffers(c); is->offset = offset; if (is->offset == is->size) { /* seek to EOF: simulate empty result; avoid triggering a "416 Requested Range Not Satisfiable" response */ - c->eof = true; return true; } @@ -832,53 +1232,59 @@ input_curl_seek(struct input_stream *is, goffset offset, int whence, curl_easy_setopt(c->easy, CURLOPT_RANGE, c->range); } - ret = input_curl_send_request(c, error_r); - if (!ret) + c->base.ready = false; + + if (!input_curl_easy_add_indirect(c, error_r)) return false; - return input_curl_multi_info_read(c, error_r); + g_mutex_lock(c->base.mutex); + + while (!c->base.ready) + g_cond_wait(c->base.cond, c->base.mutex); + + if (c->postponed_error != NULL) { + g_propagate_error(error_r, c->postponed_error); + c->postponed_error = NULL; + return false; + } + + return true; } static struct input_stream * -input_curl_open(const char *url, GError **error_r) +input_curl_open(const char *url, GMutex *mutex, GCond *cond, + GError **error_r) { + assert(mutex != NULL); + assert(cond != NULL); + struct input_curl *c; - bool ret; if (strncmp(url, "http://", 7) != 0) return NULL; c = g_new0(struct input_curl, 1); - input_stream_init(&c->base, &input_plugin_curl, url); + input_stream_init(&c->base, &input_plugin_curl, url, + mutex, cond); c->url = g_strdup(url); c->buffers = g_queue_new(); - c->multi = curl_multi_init(); - if (c->multi == NULL) { - g_set_error(error_r, curl_quark(), 0, - "curl_multi_init() failed"); - input_curl_free(c); - return NULL; - } - icy_clear(&c->icy_metadata); c->tag = NULL; - ret = input_curl_easy_init(c, error_r); - if (!ret) { - input_curl_free(c); - return NULL; - } + c->postponed_error = NULL; + +#if LIBCURL_VERSION_NUM >= 0x071200 + c->paused = false; +#endif - ret = input_curl_send_request(c, error_r); - if (!ret) { + if (!input_curl_easy_init(c, error_r)) { input_curl_free(c); return NULL; } - ret = input_curl_multi_info_read(c, error_r); - if (!ret) { + if (!input_curl_easy_add_indirect(c, error_r)) { input_curl_free(c); return NULL; } @@ -893,8 +1299,9 @@ const struct input_plugin input_plugin_curl = { .open = input_curl_open, .close = input_curl_close, + .check = input_curl_check, .tag = input_curl_tag, - .buffer = input_curl_buffer, + .available = input_curl_available, .read = input_curl_read, .eof = input_curl_eof, .seek = input_curl_seek, diff --git a/src/input/curl_input_plugin.h b/src/input/curl_input_plugin.h index be7db4e26..c6e71bf40 100644 --- a/src/input/curl_input_plugin.h +++ b/src/input/curl_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -24,12 +24,4 @@ struct input_stream; extern const struct input_plugin input_plugin_curl; -/** - * This is a workaround for an input_stream API deficiency; after - * exchanging the input_stream pointer in input_rewind_open(), this - * function is called to reinitialize CURL's data pointers. - */ -void -input_curl_reinit(struct input_stream *is); - #endif diff --git a/src/input/despotify_input_plugin.c b/src/input/despotify_input_plugin.c new file mode 100644 index 000000000..200a0afd6 --- /dev/null +++ b/src/input/despotify_input_plugin.c @@ -0,0 +1,230 @@ +/* + * Copyright (C) 2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "input/despotify_input_plugin.h" +#include "input_internal.h" +#include "input_plugin.h" +#include "tag.h" +#include "despotify_utils.h" + +#include <glib.h> + +#include <unistd.h> +#include <string.h> +#include <errno.h> +#include <despotify.h> + +#include <stdio.h> + +struct input_despotify { + struct input_stream base; + + struct despotify_session *session; + struct ds_track *track; + struct tag *tag; + struct ds_pcm_data pcm; + size_t len_available; + bool eof; +}; + + +static void +refill_buffer(struct input_despotify *ctx) +{ + /* Wait until there is data */ + while (1) { + int rc = despotify_get_pcm(ctx->session, &ctx->pcm); + + if (rc == 0 && ctx->pcm.len) { + ctx->len_available = ctx->pcm.len; + break; + } + if (ctx->eof == true) + break; + + if (rc < 0) { + g_debug("despotify_get_pcm error\n"); + ctx->eof = true; + break; + } + + /* Wait a while until next iteration */ + usleep(50 * 1000); + } +} + +static void callback(G_GNUC_UNUSED struct despotify_session* ds, + int sig, G_GNUC_UNUSED void* data, void* callback_data) +{ + struct input_despotify *ctx = (struct input_despotify *)callback_data; + + switch (sig) { + case DESPOTIFY_NEW_TRACK: + break; + + case DESPOTIFY_TIME_TELL: + break; + + case DESPOTIFY_TRACK_PLAY_ERROR: + g_debug("Track play error\n"); + ctx->eof = true; + ctx->len_available = 0; + break; + + case DESPOTIFY_END_OF_PLAYLIST: + ctx->eof = true; + g_debug("End of playlist: %d\n", ctx->eof); + break; + } +} + + +static struct input_stream * +input_despotify_open(const char *url, + GMutex *mutex, GCond *cond, + G_GNUC_UNUSED GError **error_r) +{ + struct input_despotify *ctx; + struct despotify_session *session; + struct ds_link *ds_link; + struct ds_track *track; + + if (!g_str_has_prefix(url, "spt://")) + return NULL; + + session = mpd_despotify_get_session(); + if (!session) + return NULL; + + ds_link = despotify_link_from_uri(url + 6); + if (!ds_link) { + g_debug("Can't find %s\n", url); + return NULL; + } + if (ds_link->type != LINK_TYPE_TRACK) { + despotify_free_link(ds_link); + return NULL; + } + + ctx = g_new(struct input_despotify, 1); + memset(ctx, 0, sizeof(*ctx)); + + track = despotify_link_get_track(session, ds_link); + despotify_free_link(ds_link); + if (!track) { + g_free(ctx); + return NULL; + } + + input_stream_init(&ctx->base, &input_plugin_despotify, url, + mutex, cond); + ctx->session = session; + ctx->track = track; + ctx->tag = mpd_despotify_tag_from_track(track); + ctx->eof = false; + /* Despotify outputs pcm data */ + ctx->base.mime = g_strdup("audio/x-mpd-cdda-pcm"); + ctx->base.ready = true; + + if (!mpd_despotify_register_callback(callback, ctx)) { + despotify_free_link(ds_link); + + return NULL; + } + + if (despotify_play(ctx->session, ctx->track, false) == false) { + despotify_free_track(ctx->track); + g_free(ctx); + return NULL; + } + + return &ctx->base; +} + +static size_t +input_despotify_read(struct input_stream *is, void *ptr, size_t size, + G_GNUC_UNUSED GError **error_r) +{ + struct input_despotify *ctx = (struct input_despotify *)is; + size_t to_cpy = size; + + if (ctx->len_available == 0) + refill_buffer(ctx); + + if (ctx->len_available < size) + to_cpy = ctx->len_available; + memcpy(ptr, ctx->pcm.buf, to_cpy); + ctx->len_available -= to_cpy; + + is->offset += to_cpy; + + return to_cpy; +} + +static void +input_despotify_close(struct input_stream *is) +{ + struct input_despotify *ctx = (struct input_despotify *)is; + + if (ctx->tag != NULL) + tag_free(ctx->tag); + + mpd_despotify_unregister_callback(callback); + despotify_free_track(ctx->track); + input_stream_deinit(&ctx->base); + g_free(ctx); +} + +static bool +input_despotify_eof(struct input_stream *is) +{ + struct input_despotify *ctx = (struct input_despotify *)is; + + return ctx->eof; +} + +static bool +input_despotify_seek(G_GNUC_UNUSED struct input_stream *is, + G_GNUC_UNUSED goffset offset, G_GNUC_UNUSED int whence, + G_GNUC_UNUSED GError **error_r) +{ + return false; +} + +static struct tag * +input_despotify_tag(struct input_stream *is) +{ + struct input_despotify *ctx = (struct input_despotify *)is; + struct tag *tag = ctx->tag; + + ctx->tag = NULL; + + return tag; +} + +const struct input_plugin input_plugin_despotify = { + .name = "spt", + .open = input_despotify_open, + .close = input_despotify_close, + .read = input_despotify_read, + .eof = input_despotify_eof, + .seek = input_despotify_seek, + .tag = input_despotify_tag, +}; diff --git a/src/input/despotify_input_plugin.h b/src/input/despotify_input_plugin.h new file mode 100644 index 000000000..4c070d882 --- /dev/null +++ b/src/input/despotify_input_plugin.h @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef INPUT_DESPOTIFY_H +#define INPUT_DESPOTIFY_H + +extern const struct input_plugin input_plugin_despotify; + +#endif diff --git a/src/input/ffmpeg_input_plugin.c b/src/input/ffmpeg_input_plugin.c index 0a6be29bc..1c64b52c1 100644 --- a/src/input/ffmpeg_input_plugin.c +++ b/src/input/ffmpeg_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,15 +19,11 @@ #include "config.h" #include "input/ffmpeg_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" -#ifdef OLD_FFMPEG_INCLUDES -#include <avio.h> -#include <avformat.h> -#else #include <libavformat/avio.h> #include <libavformat/avformat.h> -#endif #undef G_LOG_DOMAIN #define G_LOG_DOMAIN "input_ffmpeg" @@ -35,7 +31,11 @@ struct input_ffmpeg { struct input_stream base; +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + AVIOContext *h; +#else URLContext *h; +#endif bool eof; }; @@ -46,26 +46,37 @@ ffmpeg_quark(void) return g_quark_from_static_string("ffmpeg"); } +static inline bool +input_ffmpeg_supported(void) +{ +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + void *opaque = NULL; + return avio_enum_protocols(&opaque, 0) != NULL; +#else + return av_protocol_next(NULL) != NULL; +#endif +} + static bool input_ffmpeg_init(G_GNUC_UNUSED const struct config_param *param, G_GNUC_UNUSED GError **error_r) { av_register_all(); -#if LIBAVFORMAT_VERSION_MAJOR >= 52 /* disable this plugin if there's no registered protocol */ - if (av_protocol_next(NULL) == NULL) { + if (!input_ffmpeg_supported()) { g_set_error(error_r, ffmpeg_quark(), 0, "No protocol"); return false; } -#endif return true; } static struct input_stream * -input_ffmpeg_open(const char *uri, GError **error_r) +input_ffmpeg_open(const char *uri, + GMutex *mutex, GCond *cond, + GError **error_r) { struct input_ffmpeg *i; @@ -78,9 +89,16 @@ input_ffmpeg_open(const char *uri, GError **error_r) return NULL; i = g_new(struct input_ffmpeg, 1); - input_stream_init(&i->base, &input_plugin_ffmpeg, uri); + input_stream_init(&i->base, &input_plugin_ffmpeg, uri, + mutex, cond); +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,1,0) + int ret = avio_open(&i->h, uri, AVIO_FLAG_READ); +#elif LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + int ret = avio_open(&i->h, uri, AVIO_RDONLY); +#else int ret = url_open(&i->h, uri, URL_RDONLY); +#endif if (ret != 0) { g_free(i); g_set_error(error_r, ffmpeg_quark(), ret, @@ -91,8 +109,13 @@ input_ffmpeg_open(const char *uri, GError **error_r) i->eof = false; i->base.ready = true; +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + i->base.seekable = (i->h->seekable & AVIO_SEEKABLE_NORMAL) != 0; + i->base.size = avio_size(i->h); +#else i->base.seekable = !i->h->is_streamed; i->base.size = url_filesize(i->h); +#endif /* hack to make MPD select the "ffmpeg" decoder plugin - since avio.h doesn't tell us the MIME type of the resource, we @@ -109,7 +132,11 @@ input_ffmpeg_read(struct input_stream *is, void *ptr, size_t size, { struct input_ffmpeg *i = (struct input_ffmpeg *)is; +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + int ret = avio_read(i->h, ptr, size); +#else int ret = url_read(i->h, ptr, size); +#endif if (ret <= 0) { if (ret < 0) g_set_error(error_r, ffmpeg_quark(), 0, @@ -128,7 +155,11 @@ input_ffmpeg_close(struct input_stream *is) { struct input_ffmpeg *i = (struct input_ffmpeg *)is; +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + avio_close(i->h); +#else url_close(i->h); +#endif input_stream_deinit(&i->base); g_free(i); } @@ -146,7 +177,11 @@ input_ffmpeg_seek(struct input_stream *is, goffset offset, int whence, G_GNUC_UNUSED GError **error_r) { struct input_ffmpeg *i = (struct input_ffmpeg *)is; +#if LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(53,0,0) + int64_t ret = avio_seek(i->h, offset, whence); +#else int64_t ret = url_seek(i->h, offset, whence); +#endif if (ret >= 0) { i->eof = false; diff --git a/src/input/ffmpeg_input_plugin.h b/src/input/ffmpeg_input_plugin.h index ff87064be..393836ca5 100644 --- a/src/input/ffmpeg_input_plugin.h +++ b/src/input/ffmpeg_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input/file_input_plugin.c b/src/input/file_input_plugin.c index 3646c656e..5ee3f200b 100644 --- a/src/input/file_input_plugin.c +++ b/src/input/file_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,6 +19,7 @@ #include "config.h" /* must be first for large file support */ #include "input/file_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" #include "fd_util.h" #include "open.h" @@ -45,14 +46,16 @@ file_quark(void) } static struct input_stream * -input_file_open(const char *filename, GError **error_r) +input_file_open(const char *filename, + GMutex *mutex, GCond *cond, + GError **error_r) { int fd, ret; struct stat st; struct file_input_stream *fis; if (!g_path_is_absolute(filename)) - return false; + return NULL; fd = open_cloexec(filename, O_RDONLY|O_BINARY, 0); if (fd < 0) { @@ -60,7 +63,7 @@ input_file_open(const char *filename, GError **error_r) g_set_error(error_r, file_quark(), errno, "Failed to open \"%s\": %s", filename, g_strerror(errno)); - return false; + return NULL; } ret = fstat(fd, &st); @@ -69,14 +72,14 @@ input_file_open(const char *filename, GError **error_r) "Failed to stat \"%s\": %s", filename, g_strerror(errno)); close(fd); - return false; + return NULL; } if (!S_ISREG(st.st_mode)) { g_set_error(error_r, file_quark(), 0, "Not a regular file: %s", filename); close(fd); - return false; + return NULL; } #ifdef POSIX_FADV_SEQUENTIAL @@ -84,7 +87,8 @@ input_file_open(const char *filename, GError **error_r) #endif fis = g_new(struct file_input_stream, 1); - input_stream_init(&fis->base, &input_plugin_file, filename); + input_stream_init(&fis->base, &input_plugin_file, filename, + mutex, cond); fis->base.size = st.st_size; fis->base.seekable = true; diff --git a/src/input/file_input_plugin.h b/src/input/file_input_plugin.h index 40340e8bd..f24769d57 100644 --- a/src/input/file_input_plugin.h +++ b/src/input/file_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input/mms_input_plugin.c b/src/input/mms_input_plugin.c index 834d111b8..cff15125b 100644 --- a/src/input/mms_input_plugin.c +++ b/src/input/mms_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,6 +19,7 @@ #include "config.h" #include "input/mms_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" #include <glib.h> @@ -45,7 +46,9 @@ mms_quark(void) } static struct input_stream * -input_mms_open(const char *url, GError **error_r) +input_mms_open(const char *url, + GMutex *mutex, GCond *cond, + GError **error_r) { struct input_mms *m; @@ -56,7 +59,8 @@ input_mms_open(const char *url, GError **error_r) return NULL; m = g_new(struct input_mms, 1); - input_stream_init(&m->base, &input_plugin_mms, url); + input_stream_init(&m->base, &input_plugin_mms, url, + mutex, cond); m->mms = mmsx_connect(NULL, NULL, url, 128 * 1024); if (m->mms == NULL) { diff --git a/src/input/mms_input_plugin.h b/src/input/mms_input_plugin.h index 2e10cfbb9..d6aa593f2 100644 --- a/src/input/mms_input_plugin.h +++ b/src/input/mms_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input/rewind_input_plugin.c b/src/input/rewind_input_plugin.c index f0d533bc8..cf06fc57b 100644 --- a/src/input/rewind_input_plugin.c +++ b/src/input/rewind_input_plugin.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -19,7 +19,7 @@ #include "config.h" #include "input/rewind_input_plugin.h" -#include "input/curl_input_plugin.h" +#include "input_internal.h" #include "input_plugin.h" #include "tag.h" @@ -107,6 +107,23 @@ input_rewind_close(struct input_stream *is) g_free(r); } +static bool +input_rewind_check(struct input_stream *is, GError **error_r) +{ + struct input_rewind *r = (struct input_rewind *)is; + + return input_stream_check(r->input, error_r); +} + +static void +input_rewind_update(struct input_stream *is) +{ + struct input_rewind *r = (struct input_rewind *)is; + + if (!reading_from_buffer(r)) + copy_attributes(r); +} + static struct tag * input_rewind_tag(struct input_stream *is) { @@ -115,16 +132,12 @@ input_rewind_tag(struct input_stream *is) return input_stream_tag(r->input); } -static int -input_rewind_buffer(struct input_stream *is, GError **error_r) +static bool +input_rewind_available(struct input_stream *is) { struct input_rewind *r = (struct input_rewind *)is; - int ret = input_stream_buffer(r->input, error_r); - if (ret < 0 || !reading_from_buffer(r)) - copy_attributes(r); - - return ret; + return input_stream_available(r->input); } static size_t @@ -212,8 +225,10 @@ input_rewind_seek(struct input_stream *is, goffset offset, int whence, static const struct input_plugin rewind_input_plugin = { .close = input_rewind_close, + .check = input_rewind_check, + .update = input_rewind_update, .tag = input_rewind_tag, - .buffer = input_rewind_buffer, + .available = input_rewind_available, .read = input_rewind_read, .eof = input_rewind_eof, .seek = input_rewind_seek, @@ -232,7 +247,8 @@ input_rewind_open(struct input_stream *is) return is; c = g_new(struct input_rewind, 1); - input_stream_init(&c->base, &rewind_input_plugin, is->uri); + input_stream_init(&c->base, &rewind_input_plugin, is->uri, + is->mutex, is->cond); c->tail = 0; c->input = is; diff --git a/src/input/rewind_input_plugin.h b/src/input/rewind_input_plugin.h index 23d25d94d..83abe257a 100644 --- a/src/input/rewind_input_plugin.h +++ b/src/input/rewind_input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input/soup_input_plugin.c b/src/input/soup_input_plugin.c new file mode 100644 index 000000000..52dcec427 --- /dev/null +++ b/src/input/soup_input_plugin.c @@ -0,0 +1,460 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "input/soup_input_plugin.h" +#include "input_internal.h" +#include "input_plugin.h" +#include "io_thread.h" +#include "conf.h" + +#include <libsoup/soup-uri.h> +#include <libsoup/soup-session-async.h> + +#include <assert.h> +#include <string.h> + +#undef G_LOG_DOMAIN +#define G_LOG_DOMAIN "input_soup" + +/** + * Do not buffer more than this number of bytes. It should be a + * reasonable limit that doesn't make low-end machines suffer too + * much, but doesn't cause stuttering on high-latency lines. + */ +static const size_t SOUP_MAX_BUFFERED = 512 * 1024; + +/** + * Resume the stream at this number of bytes after it has been paused. + */ +static const size_t SOUP_RESUME_AT = 384 * 1024; + +static SoupURI *soup_proxy; +static SoupSession *soup_session; + +struct input_soup { + struct input_stream base; + + SoupMessage *msg; + + GQueue *buffers; + + size_t current_consumed; + + size_t total_buffered; + + bool alive, pause, eof; + + /** + * Set when the session callback has been invoked, when it is + * safe to free this object. + */ + bool completed; + + GError *postponed_error; +}; + +static inline GQuark +soup_quark(void) +{ + return g_quark_from_static_string("soup"); +} + +static bool +input_soup_init(const struct config_param *param, GError **error_r) +{ + assert(soup_proxy == NULL); + assert(soup_session == NULL); + + g_type_init(); + + const char *proxy = config_get_block_string(param, "proxy", NULL); + + if (proxy != NULL) { + soup_proxy = soup_uri_new(proxy); + if (soup_proxy == NULL) { + g_set_error(error_r, soup_quark(), 0, + "failed to parse proxy setting"); + return false; + } + } + + soup_session = + soup_session_async_new_with_options(SOUP_SESSION_PROXY_URI, + soup_proxy, + SOUP_SESSION_ASYNC_CONTEXT, + io_thread_context(), + NULL); + + return true; +} + +static void +input_soup_finish(void) +{ + assert(soup_session != NULL); + + soup_session_abort(soup_session); + g_object_unref(G_OBJECT(soup_session)); + + if (soup_proxy != NULL) + soup_uri_free(soup_proxy); +} + +/** + * Copy the error from the SoupMessage object to + * input_soup::postponed_error. + * + * @return true if there was no error + */ +static bool +input_soup_copy_error(struct input_soup *s, const SoupMessage *msg) +{ + if (SOUP_STATUS_IS_SUCCESSFUL(msg->status_code)) + return true; + + if (msg->status_code == SOUP_STATUS_CANCELLED) + /* failure, but don't generate a GError, because this + status was caused by _close() */ + return false; + + if (s->postponed_error != NULL) + /* there's already a GError, don't overwrite it */ + return false; + + if (SOUP_STATUS_IS_TRANSPORT_ERROR(msg->status_code)) + s->postponed_error = + g_error_new(soup_quark(), msg->status_code, + "HTTP client error: %s", + msg->reason_phrase); + else + s->postponed_error = + g_error_new(soup_quark(), msg->status_code, + "got HTTP status: %d %s", + msg->status_code, msg->reason_phrase); + + return false; +} + +static void +input_soup_session_callback(G_GNUC_UNUSED SoupSession *session, + SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + assert(!s->completed); + + g_mutex_lock(s->base.mutex); + + if (!s->base.ready) + input_soup_copy_error(s, msg); + + s->base.ready = true; + s->alive = false; + s->completed = true; + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static void +input_soup_got_headers(SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + g_mutex_lock(s->base.mutex); + + if (!input_soup_copy_error(s, msg)) { + g_mutex_unlock(s->base.mutex); + + soup_session_cancel_message(soup_session, msg, + SOUP_STATUS_CANCELLED); + return; + } + + s->base.ready = true; + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); + + soup_message_body_set_accumulate(msg->response_body, false); +} + +static void +input_soup_got_chunk(SoupMessage *msg, SoupBuffer *chunk, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->base.mutex); + + g_queue_push_tail(s->buffers, soup_buffer_copy(chunk)); + s->total_buffered += chunk->length; + + if (s->total_buffered >= SOUP_MAX_BUFFERED && !s->pause) { + s->pause = true; + soup_session_pause_message(soup_session, msg); + } + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static void +input_soup_got_body(G_GNUC_UNUSED SoupMessage *msg, gpointer user_data) +{ + struct input_soup *s = user_data; + + assert(msg == s->msg); + + g_mutex_lock(s->base.mutex); + + s->base.ready = true; + s->eof = true; + s->alive = false; + + g_cond_broadcast(s->base.cond); + g_mutex_unlock(s->base.mutex); +} + +static bool +input_soup_wait_data(struct input_soup *s) +{ + while (true) { + if (s->eof) + return true; + + if (!s->alive) + return false; + + if (!g_queue_is_empty(s->buffers)) + return true; + + assert(s->current_consumed == 0); + + g_cond_wait(s->base.cond, s->base.mutex); + } +} + +static gpointer +input_soup_queue(gpointer data) +{ + struct input_soup *s = data; + + soup_session_queue_message(soup_session, s->msg, + input_soup_session_callback, s); + + return NULL; +} + +static struct input_stream * +input_soup_open(const char *uri, + GMutex *mutex, GCond *cond, + G_GNUC_UNUSED GError **error_r) +{ + if (strncmp(uri, "http://", 7) != 0) + return NULL; + + struct input_soup *s = g_new(struct input_soup, 1); + input_stream_init(&s->base, &input_plugin_soup, uri, + mutex, cond); + + s->buffers = g_queue_new(); + s->current_consumed = 0; + s->total_buffered = 0; + + s->msg = soup_message_new(SOUP_METHOD_GET, uri); + soup_message_set_flags(s->msg, SOUP_MESSAGE_NO_REDIRECT); + + soup_message_headers_append(s->msg->request_headers, "User-Agent", + "Music Player Daemon " VERSION); + + g_signal_connect(s->msg, "got-headers", + G_CALLBACK(input_soup_got_headers), s); + g_signal_connect(s->msg, "got-chunk", + G_CALLBACK(input_soup_got_chunk), s); + g_signal_connect(s->msg, "got-body", + G_CALLBACK(input_soup_got_body), s); + + s->alive = true; + s->pause = false; + s->eof = false; + s->completed = false; + s->postponed_error = NULL; + + io_thread_call(input_soup_queue, s); + + return &s->base; +} + +static gpointer +input_soup_cancel(gpointer data) +{ + struct input_soup *s = data; + + if (!s->completed) + soup_session_cancel_message(soup_session, s->msg, + SOUP_STATUS_CANCELLED); + + return NULL; +} + +static void +input_soup_close(struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + g_mutex_lock(s->base.mutex); + + if (!s->completed) { + /* the messages's session callback hasn't been invoked + yet; cancel it and wait for completion */ + + g_mutex_unlock(s->base.mutex); + + io_thread_call(input_soup_cancel, s); + + g_mutex_lock(s->base.mutex); + while (!s->completed) + g_cond_wait(s->base.cond, s->base.mutex); + } + + g_mutex_unlock(s->base.mutex); + + SoupBuffer *buffer; + while ((buffer = g_queue_pop_head(s->buffers)) != NULL) + soup_buffer_free(buffer); + g_queue_free(s->buffers); + + if (s->postponed_error != NULL) + g_error_free(s->postponed_error); + + input_stream_deinit(&s->base); + g_free(s); +} + +static bool +input_soup_check(struct input_stream *is, GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + bool success = s->postponed_error == NULL; + if (!success) { + g_propagate_error(error_r, s->postponed_error); + s->postponed_error = NULL; + } + + return success; +} + +static bool +input_soup_available(struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + return s->eof || !s->alive || !g_queue_is_empty(s->buffers); +} + +static size_t +input_soup_read(struct input_stream *is, void *ptr, size_t size, + G_GNUC_UNUSED GError **error_r) +{ + struct input_soup *s = (struct input_soup *)is; + + if (!input_soup_wait_data(s)) { + assert(!s->alive); + + if (s->postponed_error != NULL) { + g_propagate_error(error_r, s->postponed_error); + s->postponed_error = NULL; + } else + g_set_error_literal(error_r, soup_quark(), 0, + "HTTP failure"); + return 0; + } + + char *p0 = ptr, *p = p0, *p_end = p0 + size; + + while (p < p_end) { + SoupBuffer *buffer = g_queue_pop_head(s->buffers); + if (buffer == NULL) { + assert(s->current_consumed == 0); + break; + } + + assert(s->current_consumed < buffer->length); + assert(s->total_buffered >= buffer->length); + + const char *q = buffer->data; + q += s->current_consumed; + + size_t remaining = buffer->length - s->current_consumed; + size_t nbytes = p_end - p; + if (nbytes > remaining) + nbytes = remaining; + + memcpy(p, q, nbytes); + p += nbytes; + + s->current_consumed += remaining; + if (s->current_consumed >= buffer->length) { + /* done with this buffer */ + s->total_buffered -= buffer->length; + soup_buffer_free(buffer); + s->current_consumed = 0; + } else { + /* partial read */ + assert(p == p_end); + + g_queue_push_head(s->buffers, buffer); + } + } + + if (s->pause && s->total_buffered < SOUP_RESUME_AT) { + s->pause = false; + soup_session_unpause_message(soup_session, s->msg); + } + + size_t nbytes = p - p0; + s->base.offset += nbytes; + + return nbytes; +} + +static bool +input_soup_eof(G_GNUC_UNUSED struct input_stream *is) +{ + struct input_soup *s = (struct input_soup *)is; + + return !s->alive && g_queue_is_empty(s->buffers); +} + +const struct input_plugin input_plugin_soup = { + .name = "soup", + .init = input_soup_init, + .finish = input_soup_finish, + + .open = input_soup_open, + .close = input_soup_close, + .check = input_soup_check, + .available = input_soup_available, + .read = input_soup_read, + .eof = input_soup_eof, +}; diff --git a/src/input/soup_input_plugin.h b/src/input/soup_input_plugin.h new file mode 100644 index 000000000..689b2d971 --- /dev/null +++ b/src/input/soup_input_plugin.h @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_SOUP_H +#define MPD_INPUT_SOUP_H + +extern const struct input_plugin input_plugin_soup; + +#endif diff --git a/src/input_init.c b/src/input_init.c index 1438c3e52..cf5affb4e 100644 --- a/src/input_init.c +++ b/src/input_init.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -24,6 +24,7 @@ #include "conf.h" #include "glib_compat.h" +#include <assert.h> #include <string.h> static inline GQuark @@ -67,6 +68,11 @@ input_stream_global_init(GError **error_r) for (unsigned i = 0; input_plugins[i] != NULL; ++i) { const struct input_plugin *plugin = input_plugins[i]; + + assert(plugin->name != NULL); + assert(*plugin->name != 0); + assert(plugin->open != NULL); + const struct config_param *param = input_plugin_config(plugin->name, &error); if (param == NULL && error != NULL) { diff --git a/src/input_init.h b/src/input_init.h index eded15fa9..ad92cda08 100644 --- a/src/input_init.h +++ b/src/input_init.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -28,7 +28,7 @@ /** * Initializes this library and all input_stream implementations. * - * @param error_r location to store the error occuring, or NULL to + * @param error_r location to store the error occurring, or NULL to * ignore errors */ bool diff --git a/src/input_internal.c b/src/input_internal.c new file mode 100644 index 000000000..92a71856e --- /dev/null +++ b/src/input_internal.c @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "config.h" +#include "input_internal.h" +#include "input_stream.h" + +#include <assert.h> + +void +input_stream_init(struct input_stream *is, const struct input_plugin *plugin, + const char *uri, GMutex *mutex, GCond *cond) +{ + assert(is != NULL); + assert(plugin != NULL); + assert(uri != NULL); + + is->plugin = plugin; + is->uri = g_strdup(uri); + is->mutex = mutex; + is->cond = cond; + is->ready = false; + is->seekable = false; + is->size = -1; + is->offset = 0; + is->mime = NULL; +} + +void +input_stream_deinit(struct input_stream *is) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + g_free(is->uri); + g_free(is->mime); +} + +void +input_stream_signal_client(struct input_stream *is) +{ + if (is->cond != NULL) + g_cond_broadcast(is->cond); +} + +void +input_stream_set_ready(struct input_stream *is) +{ + g_mutex_lock(is->mutex); + + if (!is->ready) { + is->ready = true; + input_stream_signal_client(is); + } + + g_mutex_unlock(is->mutex); +} diff --git a/src/input_internal.h b/src/input_internal.h new file mode 100644 index 000000000..d95142e46 --- /dev/null +++ b/src/input_internal.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2003-2011 The Music Player Daemon Project + * http://www.musicpd.org + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef MPD_INPUT_INTERNAL_H +#define MPD_INPUT_INTERNAL_H + +#include "check.h" + +#include <glib.h> + +struct input_stream; +struct input_plugin; + +void +input_stream_init(struct input_stream *is, const struct input_plugin *plugin, + const char *uri, GMutex *mutex, GCond *cond); + +void +input_stream_deinit(struct input_stream *is); + +void +input_stream_signal_client(struct input_stream *is); + +void +input_stream_set_ready(struct input_stream *is); + +#endif diff --git a/src/input_plugin.h b/src/input_plugin.h index 10be48dbb..6b0c77c85 100644 --- a/src/input_plugin.h +++ b/src/input_plugin.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -35,7 +35,7 @@ struct input_plugin { /** * Global initialization. This method is called when MPD starts. * - * @param error_r location to store the error occuring, or + * @param error_r location to store the error occurring, or * NULL to ignore errors * @return true on success, false if the plugin should be * disabled @@ -48,11 +48,37 @@ struct input_plugin { */ void (*finish)(void); - struct input_stream *(*open)(const char *uri, GError **error_r); + struct input_stream *(*open)(const char *uri, + GMutex *mutex, GCond *cond, + GError **error_r); void (*close)(struct input_stream *is); + /** + * Check for errors that may have occurred in the I/O thread. + * May be unimplemented for synchronous plugins. + * + * @return false on error + */ + bool (*check)(struct input_stream *is, GError **error_r); + + /** + * Update the public attributes. Call before access. Can be + * NULL if the plugin always keeps its attributes up to date. + */ + void (*update)(struct input_stream *is); + struct tag *(*tag)(struct input_stream *is); - int (*buffer)(struct input_stream *is, GError **error_r); + + /** + * Returns true if the next read operation will not block: + * either data is available, or end-of-stream has been + * reached, or an error has occurred. + * + * If this method is unimplemented, then it is assumed that + * reading will never block. + */ + bool (*available)(struct input_stream *is); + size_t (*read)(struct input_stream *is, void *ptr, size_t size, GError **error_r); bool (*eof)(struct input_stream *is); diff --git a/src/input_registry.c b/src/input_registry.c index 0b9b47d10..5987d5da2 100644 --- a/src/input_registry.c +++ b/src/input_registry.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -29,6 +29,10 @@ #include "input/curl_input_plugin.h" #endif +#ifdef ENABLE_SOUP +#include "input/soup_input_plugin.h" +#endif + #ifdef HAVE_FFMPEG #include "input/ffmpeg_input_plugin.h" #endif @@ -37,6 +41,14 @@ #include "input/mms_input_plugin.h" #endif +#ifdef ENABLE_CDIO_PARANOIA +#include "input/cdio_paranoia_input_plugin.h" +#endif + +#ifdef ENABLE_DESPOTIFY +#include "input/despotify_input_plugin.h" +#endif + #include <glib.h> const struct input_plugin *const input_plugins[] = { @@ -47,12 +59,21 @@ const struct input_plugin *const input_plugins[] = { #ifdef ENABLE_CURL &input_plugin_curl, #endif +#ifdef ENABLE_SOUP + &input_plugin_soup, +#endif #ifdef HAVE_FFMPEG &input_plugin_ffmpeg, #endif #ifdef ENABLE_MMS &input_plugin_mms, #endif +#ifdef ENABLE_CDIO_PARANOIA + &input_plugin_cdio_paranoia, +#endif +#ifdef ENABLE_DESPOTIFY + &input_plugin_despotify, +#endif NULL }; diff --git a/src/input_registry.h b/src/input_registry.h index e85d6be8e..a1b057469 100644 --- a/src/input_registry.h +++ b/src/input_registry.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify diff --git a/src/input_stream.c b/src/input_stream.c index e769adb92..60a1559ba 100644 --- a/src/input_stream.c +++ b/src/input_stream.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -33,10 +33,13 @@ input_quark(void) } struct input_stream * -input_stream_open(const char *url, GError **error_r) +input_stream_open(const char *url, + GMutex *mutex, GCond *cond, + GError **error_r) { GError *error = NULL; + assert(mutex != NULL); assert(error_r == NULL || *error_r == NULL); for (unsigned i = 0; input_plugins[i] != NULL; ++i) { @@ -46,7 +49,7 @@ input_stream_open(const char *url, GError **error_r) if (!input_plugins_enabled[i]) continue; - is = plugin->open(url, &error); + is = plugin->open(url, mutex, cond, &error); if (is != NULL) { assert(is->plugin != NULL); assert(is->plugin->close != NULL); @@ -64,29 +67,131 @@ input_stream_open(const char *url, GError **error_r) } g_set_error(error_r, input_quark(), 0, "Unrecognized URI"); - return false; + return NULL; +} + +bool +input_stream_check(struct input_stream *is, GError **error_r) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + return is->plugin->check == NULL || + is->plugin->check(is, error_r); +} + +void +input_stream_update(struct input_stream *is) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + if (is->plugin->update != NULL) + is->plugin->update(is); +} + +void +input_stream_wait_ready(struct input_stream *is) +{ + assert(is != NULL); + assert(is->mutex != NULL); + assert(is->cond != NULL); + + while (true) { + input_stream_update(is); + if (is->ready) + break; + + g_cond_wait(is->cond, is->mutex); + } +} + +void +input_stream_lock_wait_ready(struct input_stream *is) +{ + assert(is != NULL); + assert(is->mutex != NULL); + assert(is->cond != NULL); + + g_mutex_lock(is->mutex); + input_stream_wait_ready(is); + g_mutex_unlock(is->mutex); } bool input_stream_seek(struct input_stream *is, goffset offset, int whence, GError **error_r) { + assert(is != NULL); + assert(is->plugin != NULL); + if (is->plugin->seek == NULL) return false; return is->plugin->seek(is, offset, whence, error_r); } +bool +input_stream_lock_seek(struct input_stream *is, goffset offset, int whence, + GError **error_r) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + if (is->plugin->seek == NULL) + return false; + + if (is->mutex == NULL) + /* no locking */ + return input_stream_seek(is, offset, whence, error_r); + + g_mutex_lock(is->mutex); + bool success = input_stream_seek(is, offset, whence, error_r); + g_mutex_unlock(is->mutex); + return success; +} + struct tag * input_stream_tag(struct input_stream *is) { assert(is != NULL); + assert(is->plugin != NULL); return is->plugin->tag != NULL ? is->plugin->tag(is) : NULL; } +struct tag * +input_stream_lock_tag(struct input_stream *is) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + if (is->plugin->tag == NULL) + return false; + + if (is->mutex == NULL) + /* no locking */ + return input_stream_tag(is); + + g_mutex_lock(is->mutex); + struct tag *tag = input_stream_tag(is); + g_mutex_unlock(is->mutex); + return tag; +} + +bool +input_stream_available(struct input_stream *is) +{ + assert(is != NULL); + assert(is->plugin != NULL); + + return is->plugin->available != NULL + ? is->plugin->available(is) + : true; +} + size_t input_stream_read(struct input_stream *is, void *ptr, size_t size, GError **error_r) @@ -97,6 +202,23 @@ input_stream_read(struct input_stream *is, void *ptr, size_t size, return is->plugin->read(is, ptr, size, error_r); } +size_t +input_stream_lock_read(struct input_stream *is, void *ptr, size_t size, + GError **error_r) +{ + assert(ptr != NULL); + assert(size > 0); + + if (is->mutex == NULL) + /* no locking */ + return input_stream_read(is, ptr, size, error_r); + + g_mutex_lock(is->mutex); + size_t nbytes = input_stream_read(is, ptr, size, error_r); + g_mutex_unlock(is->mutex); + return nbytes; +} + void input_stream_close(struct input_stream *is) { is->plugin->close(is); @@ -107,11 +229,19 @@ bool input_stream_eof(struct input_stream *is) return is->plugin->eof(is); } -int -input_stream_buffer(struct input_stream *is, GError **error_r) +bool +input_stream_lock_eof(struct input_stream *is) { - if (is->plugin->buffer == NULL) - return 0; + assert(is != NULL); + assert(is->plugin != NULL); + + if (is->mutex == NULL) + /* no locking */ + return input_stream_eof(is); - return is->plugin->buffer(is, error_r); + g_mutex_lock(is->mutex); + bool eof = input_stream_eof(is); + g_mutex_unlock(is->mutex); + return eof; } + diff --git a/src/input_stream.h b/src/input_stream.h index 056d008a7..6a10831d2 100644 --- a/src/input_stream.h +++ b/src/input_stream.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2003-2010 The Music Player Daemon Project + * Copyright (C) 2003-2011 The Music Player Daemon Project * http://www.musicpd.org * * This program is free software; you can redistribute it and/or modify @@ -21,6 +21,7 @@ #define MPD_INPUT_STREAM_H #include "check.h" +#include "gcc.h" #include <glib.h> @@ -45,6 +46,26 @@ struct input_stream { char *uri; /** + * A mutex that protects the mutable attributes of this object + * and its implementation. It must be locked before calling + * any of the public methods. + * + * This object is allocated by the client, and the client is + * responsible for freeing it. + */ + GMutex *mutex; + + /** + * A cond that gets signalled when the state of this object + * changes from the I/O thread. The client of this object may + * wait on it. Optional, may be NULL. + * + * This object is allocated by the client, and the client is + * responsible for freeing it. + */ + GCond *cond; + + /** * indicates whether the stream is ready for reading and * whether the other attributes in this struct are valid */ @@ -71,88 +92,180 @@ struct input_stream { char *mime; }; +/** + * Opens a new input stream. You may not access it until the "ready" + * flag is set. + * + * @param mutex a mutex that is used to protect this object; must be + * locked before calling any of the public methods + * @param cond a cond that gets signalled when the state of + * this object changes; may be NULL if the caller doesn't want to get + * notifications + * @return an #input_stream object on success, NULL on error + */ +gcc_nonnull(1, 2) +G_GNUC_MALLOC +struct input_stream * +input_stream_open(const char *uri, + GMutex *mutex, GCond *cond, + GError **error_r); + +/** + * Close the input stream and free resources. + * + * The caller must not lock the mutex. + */ +gcc_nonnull(1) +void +input_stream_close(struct input_stream *is); + +gcc_nonnull(1) static inline void -input_stream_init(struct input_stream *is, const struct input_plugin *plugin, - const char *uri) +input_stream_lock(struct input_stream *is) { - is->plugin = plugin; - is->uri = g_strdup(uri); - is->ready = false; - is->seekable = false; - is->size = -1; - is->offset = 0; - is->mime = NULL; + g_mutex_lock(is->mutex); } +gcc_nonnull(1) static inline void -input_stream_deinit(struct input_stream *is) +input_stream_unlock(struct input_stream *is) { - g_free(is->uri); - g_free(is->mime); + g_mutex_unlock(is->mutex); } /** - * Opens a new input stream. You may not access it until the "ready" - * flag is set. + * Check for errors that may have occurred in the I/O thread. * - * @return an #input_stream object on success, NULL on error + * @return false on error */ -struct input_stream * -input_stream_open(const char *uri, GError **error_r); +gcc_nonnull(1) +bool +input_stream_check(struct input_stream *is, GError **error_r); /** - * Close the input stream and free resources. + * Update the public attributes. Call before accessing attributes + * such as "ready" or "offset". */ +gcc_nonnull(1) void -input_stream_close(struct input_stream *is); +input_stream_update(struct input_stream *is); + +/** + * Wait until the stream becomes ready. + * + * The caller must lock the mutex. + */ +gcc_nonnull(1) +void +input_stream_wait_ready(struct input_stream *is); + +/** + * Wrapper for input_stream_wait_locked() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +void +input_stream_lock_wait_ready(struct input_stream *is); /** * Seeks to the specified position in the stream. This will most * likely fail if the "seekable" flag is false. * + * The caller must lock the mutex. + * * @param is the input_stream object * @param offset the relative offset * @param whence the base of the seek, one of SEEK_SET, SEEK_CUR, SEEK_END */ +gcc_nonnull(1) bool input_stream_seek(struct input_stream *is, goffset offset, int whence, GError **error_r); /** + * Wrapper for input_stream_seek() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +bool +input_stream_lock_seek(struct input_stream *is, goffset offset, int whence, + GError **error_r); + +/** * Returns true if the stream has reached end-of-file. + * + * The caller must lock the mutex. */ +gcc_nonnull(1) +G_GNUC_PURE bool input_stream_eof(struct input_stream *is); /** + * Wrapper for input_stream_eof() which locks and unlocks the mutex; + * the caller must not be holding it already. + */ +gcc_nonnull(1) +G_GNUC_PURE +bool +input_stream_lock_eof(struct input_stream *is); + +/** * Reads the tag from the stream. * + * The caller must lock the mutex. + * * @return a tag object which must be freed with tag_free(), or NULL * if the tag has not changed since the last call */ +gcc_nonnull(1) +G_GNUC_MALLOC struct tag * input_stream_tag(struct input_stream *is); /** - * Reads some of the stream into its buffer. The following return - * codes are defined: -1 = error, 1 = something was buffered, 0 = - * nothing was buffered. + * Wrapper for input_stream_tag() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1) +G_GNUC_MALLOC +struct tag * +input_stream_lock_tag(struct input_stream *is); + +/** + * Returns true if the next read operation will not block: either data + * is available, or end-of-stream has been reached, or an error has + * occurred. * - * The semantics of this function are not well-defined, and it will - * eventually be removed. + * The caller must lock the mutex. */ -int input_stream_buffer(struct input_stream *is, GError **error_r); +gcc_nonnull(1) +G_GNUC_PURE +bool +input_stream_available(struct input_stream *is); /** * Reads data from the stream into the caller-supplied buffer. * Returns 0 on error or eof (check with input_stream_eof()). * + * The caller must lock the mutex. + * * @param is the input_stream object * @param ptr the buffer to read into * @param size the maximum number of bytes to read * @return the number of bytes read */ +gcc_nonnull(1, 2) size_t input_stream_read(struct input_stream *is, void *ptr, size_t size, GError **error_r); +/** + * Wrapper for input_stream_tag() which locks and unlocks the + * mutex; the caller must not be holding it already. + */ +gcc_nonnull(1, 2) +size_t +input_stream_lock_read(struct input_stream *is, void *ptr, size_t size, + GError **error_r); + #endif |