diff options
Diffstat (limited to '')
-rw-r--r-- | src/input/CurlInputPlugin.cxx | 248 |
1 files changed, 138 insertions, 110 deletions
diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index 800f7b1a3..d17937976 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -25,7 +25,8 @@ #include "ConfigData.hxx" #include "tag/Tag.hxx" #include "IcyMetaDataParser.hxx" -#include "event/MultiSocketMonitor.hxx" +#include "event/SocketMonitor.hxx" +#include "event/TimeoutMonitor.hxx" #include "event/Call.hxx" #include "IOThread.hxx" #include "util/ASCII.hxx" @@ -178,10 +179,63 @@ struct input_curl { input_curl &operator=(const input_curl &) = delete; }; +class CurlMulti; + +/** + * Monitor for one socket created by CURL. + */ +class CurlSocket final : SocketMonitor { + CurlMulti &multi; + +public: + CurlSocket(CurlMulti &_multi, EventLoop &_loop, int _fd) + :SocketMonitor(_fd, _loop), multi(_multi) {} + + ~CurlSocket() { + Abandon(); + } + + /** + * Callback function for CURLMOPT_SOCKETFUNCTION. + */ + static int SocketFunction(CURL *easy, + curl_socket_t s, int action, + void *userp, void *socketp); + + virtual bool OnSocketReady(unsigned flags) override; + +private: + static constexpr int FlagsToCurlCSelect(unsigned flags) { + return (flags & (READ | HANGUP) ? CURL_CSELECT_IN : 0) | + (flags & WRITE ? CURL_CSELECT_OUT : 0) | + (flags & ERROR ? CURL_CSELECT_ERR : 0); + } + + gcc_const + static unsigned CurlPollToFlags(int action) { + switch (action) { + case CURL_POLL_NONE: + return 0; + + case CURL_POLL_IN: + return READ; + + case CURL_POLL_OUT: + return WRITE; + + case CURL_POLL_INOUT: + return READ|WRITE; + } + + assert(false); + gcc_unreachable(); + } +}; + /** * Manager for the global CURLM object. */ -class CurlMulti final : private MultiSocketMonitor { +class CurlMulti final : private TimeoutMonitor { CURLM *const multi; public: @@ -201,20 +255,20 @@ public: */ void ReadInfo(); - /** - * Give control to CURL. - * - * Runs in the I/O thread. The caller must not hold locks. - */ - void Perform(); + void Assign(curl_socket_t fd, CurlSocket &cs) { + curl_multi_assign(multi, fd, &cs); + } - using MultiSocketMonitor::InvalidateSockets; + void SocketAction(curl_socket_t fd, int ev_bitmask); - void UpdateSockets(); + void InvalidateSockets() { + SocketAction(CURL_SOCKET_TIMEOUT, 0); + } private: - virtual int PrepareSockets() override; - virtual void DispatchSockets() override; + static int TimerFunction(CURLM *multi, long timeout_ms, void *userp); + + virtual void OnTimeout() override; }; /** libcurl should accept "ICY 200 OK" */ @@ -231,8 +285,14 @@ static constexpr Domain curl_domain("curl"); static constexpr Domain curlm_domain("curlm"); CurlMulti::CurlMulti(EventLoop &_loop, CURLM *_multi) - :MultiSocketMonitor(_loop), multi(_multi) + :TimeoutMonitor(_loop), multi(_multi) { + curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, + CurlSocket::SocketFunction); + curl_multi_setopt(multi, CURLMOPT_SOCKETDATA, this); + + curl_multi_setopt(multi, CURLMOPT_TIMERFUNCTION, TimerFunction); + curl_multi_setopt(multi, CURLMOPT_TIMERDATA, this); } /** @@ -266,73 +326,50 @@ input_curl_resume(struct input_curl *c) } } -/** - * Calculates the GLib event bit mask for one file descriptor, - * obtained from three #fd_set objects filled by curl_multi_fdset(). - */ -static unsigned -input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) -{ - unsigned events = 0; +int +CurlSocket::SocketFunction(gcc_unused CURL *easy, + curl_socket_t s, int action, + void *userp, void *socketp) { + CurlMulti &multi = *(CurlMulti *)userp; + CurlSocket *cs = (CurlSocket *)socketp; - if (FD_ISSET(fd, rfds)) { - events |= MultiSocketMonitor::READ | MultiSocketMonitor::HANGUP - | MultiSocketMonitor::ERROR; - FD_CLR(fd, rfds); - } + assert(io_thread_inside()); - if (FD_ISSET(fd, wfds)) { - events |= MultiSocketMonitor::WRITE | - MultiSocketMonitor::ERROR; - FD_CLR(fd, wfds); + if (action == CURL_POLL_REMOVE) { + delete cs; + return 0; } - if (FD_ISSET(fd, efds)) { - events |= MultiSocketMonitor::HANGUP | - MultiSocketMonitor::ERROR; - FD_CLR(fd, efds); + if (cs == nullptr) { + cs = new CurlSocket(multi, io_thread_get(), s); + multi.Assign(s, *cs); + } else { +#ifdef USE_EPOLL + /* when using epoll, we need to unregister the socket + each time this callback is invoked, because older + CURL versions may omit the CURL_POLL_REMOVE call + when the socket has been closed and recreated with + the same file number (bug found in CURL 7.26, CURL + 7.33 not affected); in that case, epoll refuses the + EPOLL_CTL_MOD because it does not know the new + socket yet */ + cs->Cancel(); +#endif } - return events; + unsigned flags = CurlPollToFlags(action); + if (flags != 0) + cs->Schedule(flags); + return 0; } -/** - * Updates all registered GPollFD objects, unregisters old ones, - * registers new ones. - * - * Runs in the I/O thread. No lock needed. - */ -inline void -CurlMulti::UpdateSockets() +bool +CurlSocket::OnSocketReady(unsigned flags) { assert(io_thread_inside()); - fd_set rfds, wfds, efds; - - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - int max_fd; - CURLMcode mcode = curl_multi_fdset(multi, &rfds, &wfds, - &efds, &max_fd); - if (mcode != CURLM_OK) { - FormatError(curlm_domain, - "curl_multi_fdset() failed: %s", - curl_multi_strerror(mcode)); - return; - } - - UpdateSocketList([&rfds, &wfds, &efds](int fd){ - return input_curl_fd_events(fd, &rfds, - &wfds, &efds); - }); - - for (int fd = 0; fd <= max_fd; ++fd) { - unsigned events = input_curl_fd_events(fd, &rfds, &wfds, &efds); - if (events != 0) - AddSocket(fd, events); - } + multi.SocketAction(Get(), FlagsToCurlCSelect(flags)); + return true; } /** @@ -354,7 +391,6 @@ CurlMulti::Add(struct input_curl *c, Error &error) } InvalidateSockets(); - return true; } @@ -464,6 +500,20 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) input_curl_request_done(c, result, status); } +void +CurlMulti::SocketAction(curl_socket_t fd, int ev_bitmask) +{ + int running_handles; + CURLMcode mcode = curl_multi_socket_action(multi, fd, ev_bitmask, + &running_handles); + if (mcode != CURLM_OK) + FormatError(curlm_domain, + "curl_multi_socket_action() failed: %s", + curl_multi_strerror(mcode)); + + ReadInfo(); +} + /** * Check for finished HTTP responses. * @@ -484,53 +534,32 @@ CurlMulti::ReadInfo() } } -inline void -CurlMulti::Perform() +int +CurlMulti::TimerFunction(gcc_unused CURLM *_multi, long timeout_ms, void *userp) { - assert(io_thread_inside()); - - CURLMcode mcode; + CurlMulti &multi = *(CurlMulti *)userp; + assert(_multi == multi.multi); - do { - int running_handles; - mcode = curl_multi_perform(multi, &running_handles); - } while (mcode == CURLM_CALL_MULTI_PERFORM); + if (timeout_ms < 0) { + multi.Cancel(); + return 0; + } - if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) - FormatError(curlm_domain, - "curl_multi_perform() failed: %s", - curl_multi_strerror(mcode)); -} + if (timeout_ms >= 0 && timeout_ms < 10) + /* CURL 7.21.1 likes to report "timeout=0", which + means we're running in a busy loop. Quite a bad + idea to waste so much CPU. Let's use a lower limit + of 10ms. */ + timeout_ms = 10; -int -CurlMulti::PrepareSockets() -{ - UpdateSockets(); - - long timeout2; - CURLMcode mcode = curl_multi_timeout(multi, &timeout2); - if (mcode == CURLM_OK) { - if (timeout2 >= 0 && timeout2 < 10) - /* CURL 7.21.1 likes to report "timeout=0", - which means we're running in a busy loop. - Quite a bad idea to waste so much CPU. - Let's use a lower limit of 10ms. */ - timeout2 = 10; - - return timeout2; - } else { - FormatWarning(curlm_domain, - "curl_multi_timeout() failed: %s", - curl_multi_strerror(mcode)); - return -1; - } + multi.Schedule(timeout_ms); + return 0; } void -CurlMulti::DispatchSockets() +CurlMulti::OnTimeout() { - Perform(); - ReadInfo(); + SocketAction(CURL_SOCKET_TIMEOUT, 0); } /* @@ -572,7 +601,6 @@ input_curl_init(const config_param ¶m, Error &error) } curl_multi = new CurlMulti(io_thread_get(), multi); - return true; } |