From 5e8f51a9632e63656b7e68e90412374b8508fa30 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Sun, 27 Jan 2013 22:58:07 +0100 Subject: output/httpd: use the BufferedSocket class for HttpdClient --- src/output/HttpdClient.cxx | 437 +++++++++++++-------------------------- src/output/HttpdClient.hxx | 61 ++---- src/output/HttpdOutputPlugin.cxx | 2 +- 3 files changed, 163 insertions(+), 337 deletions(-) (limited to 'src/output') diff --git a/src/output/HttpdClient.cxx b/src/output/HttpdClient.cxx index dd2f44fa0..0a00ee2f9 100644 --- a/src/output/HttpdClient.cxx +++ b/src/output/HttpdClient.cxx @@ -23,7 +23,7 @@ #include "util/fifo_buffer.h" #include "Page.hxx" #include "IcyMetaDataServer.hxx" -#include "glib_socket.h" +#include "SocketError.hxx" #include #include @@ -34,22 +34,15 @@ HttpdClient::~HttpdClient() { if (state == RESPONSE) { - if (write_source_id != 0) - g_source_remove(write_source_id); - if (current_page != nullptr) current_page->Unref(); for (auto page : pages) page->Unref(); - } else - fifo_buffer_free(input); + } if (metadata) metadata->Unref(); - - g_source_remove(read_source_id); - g_io_channel_unref(channel); } void @@ -71,7 +64,6 @@ HttpdClient::BeginResponse() assert(state != RESPONSE); state = RESPONSE; - write_source_id = 0; current_page = nullptr; httpd->SendHeader(*this); @@ -129,31 +121,6 @@ HttpdClient::HandleLine(const char *line) } } -char * -HttpdClient::ReadLine() -{ - assert(state != RESPONSE); - - const ScopeLock protect(httpd->mutex); - - size_t length; - const char *p = (const char *)fifo_buffer_read(input, &length); - if (p == nullptr) - /* empty input buffer */ - return nullptr; - - const char *newline = (const char *)memchr(p, '\n', length); - if (newline == nullptr) - /* incomplete line */ - return nullptr; - - char *line = g_strndup(p, newline - p); - fifo_buffer_consume(input, newline - p + 1); - - /* remove trailing whitespace (e.g. '\r') */ - return g_strchomp(line); -} - /** * Sends the status line and response headers to the client. */ @@ -161,10 +128,6 @@ bool HttpdClient::SendResponse() { char buffer[1024]; - GError *error = nullptr; - GIOStatus status; - gsize bytes_written; - assert(state == RESPONSE); if (dlna_streaming_requested) { @@ -205,141 +168,21 @@ HttpdClient::SendResponse() httpd->content_type); } - status = g_io_channel_write_chars(channel, - buffer, strlen(buffer), - &bytes_written, &error); - - switch (status) { - case G_IO_STATUS_NORMAL: - case G_IO_STATUS_AGAIN: - return true; - - case G_IO_STATUS_EOF: - /* client has disconnected */ - + ssize_t nbytes = SocketMonitor::Write(buffer, strlen(buffer)); + if (gcc_unlikely(nbytes < 0)) { + const SocketErrorMessage msg; + g_warning("failed to write to client: %s", (const char *)msg); Close(); return false; - - case G_IO_STATUS_ERROR: - /* I/O error */ - - g_warning("failed to write to client: %s", error->message); - g_error_free(error); - - Close(); - return false; - } - - /* unreachable */ - Close(); - return false; -} - -bool -HttpdClient::Received() -{ - assert(state != RESPONSE); - - char *line; - bool success; - - while ((line = ReadLine()) != nullptr) { - success = HandleLine(line); - g_free(line); - if (!success) { - assert(state != RESPONSE); - return false; - } - - if (state == RESPONSE) { - if (!fifo_buffer_is_empty(input)) { - g_warning("unexpected input from client"); - return false; - } - - fifo_buffer_free(input); - - return SendResponse(); - } } return true; } -bool -HttpdClient::Read() -{ - size_t max_length; - GError *error = nullptr; - GIOStatus status; - gsize bytes_read; - - if (state == RESPONSE) { - /* the client has already sent the request, and he - must not send more */ - char buffer[1]; - - status = g_io_channel_read_chars(channel, buffer, - sizeof(buffer), &bytes_read, - nullptr); - if (status == G_IO_STATUS_NORMAL) - g_warning("unexpected input from client"); - - return false; - } - - char *p = (char *)fifo_buffer_write(input, &max_length); - if (p == nullptr) { - g_warning("buffer overflow"); - return false; - } - - status = g_io_channel_read_chars(channel, p, max_length, - &bytes_read, &error); - switch (status) { - case G_IO_STATUS_NORMAL: - fifo_buffer_append(input, bytes_read); - return Received(); - - case G_IO_STATUS_AGAIN: - /* try again later, after select() */ - return true; - - case G_IO_STATUS_EOF: - /* peer disconnected */ - return false; - - case G_IO_STATUS_ERROR: - /* I/O error */ - g_warning("failed to read from client: %s", - error->message); - g_error_free(error); - return false; - } - - /* unreachable */ - return false; -} - -static gboolean -httpd_client_in_event(G_GNUC_UNUSED GIOChannel *source, GIOCondition condition, - gpointer data) -{ - HttpdClient *client = (HttpdClient *)data; - - if (condition == G_IO_IN && client->Read()) { - return true; - } else { - client->LockClose(); - return false; - } -} - -HttpdClient::HttpdClient(HttpdOutput *_httpd, int _fd, +HttpdClient::HttpdClient(HttpdOutput *_httpd, int _fd, EventLoop &_loop, bool _metadata_supported) - :httpd(_httpd), - channel(g_io_channel_new_socket(_fd)), - input(fifo_buffer_new(4096)), + :BufferedSocket(_fd, _loop), + httpd(_httpd), state(REQUEST), dlna_streaming_requested(false), metadata_supported(_metadata_supported), @@ -348,16 +191,6 @@ HttpdClient::HttpdClient(HttpdOutput *_httpd, int _fd, metadata(nullptr), metadata_current_position(0), metadata_fill(0) { - /* GLib is responsible for closing the file descriptor */ - g_io_channel_set_close_on_unref(channel, true); - /* NULL encoding means the stream is binary safe */ - g_io_channel_set_encoding(channel, nullptr, nullptr); - /* we prefer to do buffering */ - g_io_channel_set_buffered(channel, false); - - read_source_id = g_io_add_watch(channel, - GIOCondition(G_IO_IN|G_IO_ERR|G_IO_HUP), - httpd_client_in_event, this); } size_t @@ -382,49 +215,27 @@ HttpdClient::CancelQueue() page->Unref(); pages.clear(); - if (write_source_id != 0 && current_page == nullptr) { - g_source_remove(write_source_id); - write_source_id = 0; - } + if (current_page == nullptr) + CancelWrite(); } -static GIOStatus -write_page_to_channel(GIOChannel *channel, - const Page &page, size_t position, - gsize *bytes_written_r, GError **error) +ssize_t +HttpdClient::TryWritePage(const Page &page, size_t position) { - assert(channel != nullptr); assert(position < page.size); - return g_io_channel_write_chars(channel, - (const gchar*)page.data + position, - page.size - position, - bytes_written_r, error); + return Write(page.data + position, page.size - position); } -static GIOStatus -write_n_bytes_to_channel(GIOChannel *channel, const Page &page, - size_t position, gint n, - gsize *bytes_written_r, GError **error) +ssize_t +HttpdClient::TryWritePageN(const Page &page, size_t position, ssize_t n) { - GIOStatus status; - - assert(channel != nullptr); - assert(position < page.size); - - if (n == -1) { - status = write_page_to_channel (channel, page, position, - bytes_written_r, error); - } else { - status = g_io_channel_write_chars(channel, - (const gchar*)page.data + position, - n, bytes_written_r, error); - } - - return status; + return n >= 0 + ? Write(page.data + position, n) + : TryWritePage(page, position); } -int +ssize_t HttpdClient::GetBytesTillMetaData() const { if (metadata_requested && @@ -435,40 +246,47 @@ HttpdClient::GetBytesTillMetaData() const } inline bool -HttpdClient::Write() +HttpdClient::TryWrite() { - GError *error = nullptr; - GIOStatus status; - gsize bytes_written; - const ScopeLock protect(httpd->mutex); assert(state == RESPONSE); - if (write_source_id == 0) - /* another thread has removed the event source while - this thread was waiting for httpd->mutex */ - return false; - if (current_page == nullptr) { + if (pages.empty()) { + /* another thread has removed the event source + while this thread was waiting for + httpd->mutex */ + CancelWrite(); + return true; + } + current_page = pages.front(); pages.pop_front(); current_position = 0; } - const gint bytes_to_write = GetBytesTillMetaData(); + const ssize_t bytes_to_write = GetBytesTillMetaData(); if (bytes_to_write == 0) { - gint metadata_to_write; - - metadata_to_write = metadata_current_position; - if (!metadata_sent) { - status = write_page_to_channel(channel, - *metadata, - metadata_to_write, - &bytes_written, &error); + ssize_t nbytes = TryWritePage(*metadata, + metadata_current_position); + if (nbytes < 0) { + auto e = GetSocketError(); + if (IsSocketErrorAgain(e)) + return true; + + if (!IsSocketErrorClosed(e)) { + SocketErrorMessage msg(e); + g_warning("failed to write to client: %s", + (const char *)msg); + } + + Close(); + return false; + } - metadata_current_position += bytes_written; + metadata_current_position += nbytes; if (metadata->size - metadata_current_position == 0) { metadata_fill = 0; @@ -478,85 +296,62 @@ HttpdClient::Write() } else { guchar empty_data = 0; - Page *empty_meta = Page::Copy(&empty_data, 1); + ssize_t nbytes = Write(&empty_data, 1); + if (nbytes < 0) { + auto e = GetSocketError(); + if (IsSocketErrorAgain(e)) + return true; - status = write_page_to_channel(channel, - *empty_meta, - metadata_to_write, - &bytes_written, &error); + if (!IsSocketErrorClosed(e)) { + SocketErrorMessage msg(e); + g_warning("failed to write to client: %s", + (const char *)msg); + } - metadata_current_position += bytes_written; - - if (empty_meta->size - metadata_current_position == 0) { - metadata_fill = 0; - metadata_current_position = 0; + Close(); + return false; } - empty_meta->Unref(); + metadata_fill = 0; + metadata_current_position = 0; } - - bytes_written = 0; } else { - status = write_n_bytes_to_channel(channel, *current_page, - current_position, bytes_to_write, - &bytes_written, &error); - } + ssize_t nbytes = + TryWritePageN(*current_page, current_position, + bytes_to_write); + if (nbytes < 0) { + auto e = GetSocketError(); + if (IsSocketErrorAgain(e)) + return true; + + if (!IsSocketErrorClosed(e)) { + SocketErrorMessage msg(e); + g_warning("failed to write to client: %s", + (const char *)msg); + } - switch (status) { - case G_IO_STATUS_NORMAL: - current_position += bytes_written; + Close(); + return false; + } + + current_position += nbytes; assert(current_position <= current_page->size); if (metadata_requested) - metadata_fill += bytes_written; + metadata_fill += nbytes; if (current_position >= current_page->size) { current_page->Unref(); current_page = nullptr; - if (pages.empty()) { + if (pages.empty()) /* all pages are sent: remove the event source */ - write_source_id = 0; - - return false; - } + CancelWrite(); } - - return true; - - case G_IO_STATUS_AGAIN: - return true; - - case G_IO_STATUS_EOF: - /* client has disconnected */ - - Close(); - return false; - - case G_IO_STATUS_ERROR: - /* I/O error */ - - g_warning("failed to write to client: %s", error->message); - g_error_free(error); - - Close(); - return false; } - /* unreachable */ - Close(); - return false; -} - -static gboolean -httpd_client_out_event(gcc_unused GIOChannel *source, - gcc_unused GIOCondition condition, gpointer data) -{ - assert(condition == G_IO_OUT); - - HttpdClient *client = (HttpdClient *)data; - return client->Write(); + return true; } void @@ -569,10 +364,7 @@ HttpdClient::PushPage(Page *page) page->Ref(); pages.push_back(page); - if (write_source_id == 0) - write_source_id = g_io_add_watch(channel, G_IO_OUT, - httpd_client_out_event, - this); + ScheduleWrite(); } void @@ -589,3 +381,64 @@ HttpdClient::PushMetaData(Page *page) metadata = page; metadata_sent = false; } + +bool +HttpdClient::OnSocketReady(unsigned flags) +{ + if (!BufferedSocket::OnSocketReady(flags)) + return false; + + if (flags & WRITE) + if (!TryWrite()) + return false; + + return true; +} + +BufferedSocket::InputResult +HttpdClient::OnSocketInput(const void *data, size_t length) +{ + if (state == RESPONSE) { + g_warning("unexpected input from client"); + LockClose(); + return InputResult::CLOSED; + } + + const char *line = (const char *)data; + const char *newline = (const char *)memchr(line, '\n', length); + if (newline == nullptr) + return InputResult::MORE; + + ConsumeInput(newline + 1 - line); + + if (newline > line && newline[-1] == '\r') + --newline; + + /* terminate the string at the end of the line; the const_cast + is a dirty hack */ + *const_cast(newline) = 0; + + if (!HandleLine(line)) { + assert(state == RESPONSE); + LockClose(); + return InputResult::CLOSED; + } + + if (state == RESPONSE && !SendResponse()) + return InputResult::CLOSED; + + return InputResult::AGAIN; +} + +void +HttpdClient::OnSocketError(GError *error) +{ + g_warning("error on HTTP client: %s", error->message); + g_error_free(error); +} + +void +HttpdClient::OnSocketClosed() +{ + LockClose(); +} diff --git a/src/output/HttpdClient.hxx b/src/output/HttpdClient.hxx index 1dd4eead1..1f61d988d 100644 --- a/src/output/HttpdClient.hxx +++ b/src/output/HttpdClient.hxx @@ -20,6 +20,7 @@ #ifndef MPD_OUTPUT_HTTPD_CLIENT_HXX #define MPD_OUTPUT_HTTPD_CLIENT_HXX +#include "event/BufferedSocket.hxx" #include "gcc.h" #include @@ -31,36 +32,12 @@ struct HttpdOutput; class Page; -class HttpdClient final { +class HttpdClient final : public BufferedSocket { /** * The httpd output object this client is connected to. */ HttpdOutput *const httpd; - /** - * The TCP socket. - */ - GIOChannel *channel; - - /** - * The GLib main loop source id for reading from the socket, - * and to detect errors. - */ - guint read_source_id; - - /** - * The GLib main loop source id for writing to the socket. If - * 0, then there is no event source currently (because there - * are no queued pages). - */ - guint write_source_id; - - /** - * For buffered reading. This pointer is only valid while the - * HTTP request is read. - */ - struct fifo_buffer *input; - /** * The current state of the client. */ @@ -140,7 +117,8 @@ public: * @param httpd the HTTP output device * @param fd the socket file descriptor */ - HttpdClient(HttpdOutput *httpd, int _fd, bool _metadata_supported); + HttpdClient(HttpdOutput *httpd, int _fd, EventLoop &_loop, + bool _metadata_supported); /** * Note: this does not remove the client from the @@ -166,21 +144,6 @@ public: */ void CancelQueue(); - bool Read(); - - /** - * Data has been received from the client and it is appended - * to the input buffer. - */ - bool Received(); - - /** - * Check if a complete line of input is present in the input - * buffer, and duplicates it. It is removed from the input - * buffer. The return value has to be freed with g_free(). - */ - char *ReadLine(); - /** * Handle a line of the HTTP request. */ @@ -197,9 +160,12 @@ public: bool SendResponse(); gcc_pure - int GetBytesTillMetaData() const; + ssize_t GetBytesTillMetaData() const; - bool Write(); + ssize_t TryWritePage(const Page &page, size_t position); + ssize_t TryWritePageN(const Page &page, size_t position, ssize_t n); + + bool TryWrite(); /** * Appends a page to the client's queue. @@ -209,7 +175,14 @@ public: /** * Sends the passed metadata. */ -void PushMetaData(Page *page); + void PushMetaData(Page *page); + +protected: + virtual bool OnSocketReady(unsigned flags) override; + virtual InputResult OnSocketInput(const void *data, + size_t length) override; + virtual void OnSocketError(GError *error) override; + virtual void OnSocketClosed() override; }; #endif diff --git a/src/output/HttpdOutputPlugin.cxx b/src/output/HttpdOutputPlugin.cxx index 6c67030cb..cb515e657 100644 --- a/src/output/HttpdOutputPlugin.cxx +++ b/src/output/HttpdOutputPlugin.cxx @@ -189,7 +189,7 @@ httpd_output_finish(struct audio_output *ao) inline void HttpdOutput::AddClient(int fd) { - clients.emplace_front(this, fd, + clients.emplace_front(this, fd, GetEventLoop(), encoder->plugin->tag == NULL); ++clients_cnt; -- cgit v1.2.3