diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/CurlInputPlugin.cxx | 109 |
1 files changed, 66 insertions, 43 deletions
diff --git a/src/input/CurlInputPlugin.cxx b/src/input/CurlInputPlugin.cxx index 7fe4dab1f..800f7b1a3 100644 --- a/src/input/CurlInputPlugin.cxx +++ b/src/input/CurlInputPlugin.cxx @@ -179,18 +179,40 @@ struct input_curl { }; /** - * This class monitors all CURL file descriptors. + * Manager for the global CURLM object. */ -class CurlSockets final : private MultiSocketMonitor { +class CurlMulti final : private MultiSocketMonitor { + CURLM *const multi; + public: - CurlSockets(EventLoop &_loop) - :MultiSocketMonitor(_loop) {} + CurlMulti(EventLoop &_loop, CURLM *_multi); + + ~CurlMulti() { + curl_multi_cleanup(multi); + } + + bool Add(input_curl *c, Error &error); + void Remove(input_curl *c); + + /** + * Check for finished HTTP responses. + * + * Runs in the I/O thread. The caller must not hold locks. + */ + void ReadInfo(); + + /** + * Give control to CURL. + * + * Runs in the I/O thread. The caller must not hold locks. + */ + void Perform(); using MultiSocketMonitor::InvalidateSockets; -private: void UpdateSockets(); +private: virtual int PrepareSockets() override; virtual void DispatchSockets() override; }; @@ -202,16 +224,17 @@ static struct curl_slist *http_200_aliases; static const char *proxy, *proxy_user, *proxy_password; static unsigned proxy_port; -static struct { - CURLM *multi; - - CurlSockets *sockets; -} curl; +static CurlMulti *curl_multi; static constexpr Domain http_domain("http"); static constexpr Domain curl_domain("curl"); static constexpr Domain curlm_domain("curlm"); +CurlMulti::CurlMulti(EventLoop &_loop, CURLM *_multi) + :MultiSocketMonitor(_loop), multi(_multi) +{ +} + /** * Find a request by its CURL "easy" handle. * @@ -239,7 +262,7 @@ input_curl_resume(struct input_curl *c) if (c->paused) { c->paused = false; curl_easy_pause(c->easy, CURLPAUSE_CONT); - curl.sockets->InvalidateSockets(); + curl_multi->InvalidateSockets(); } } @@ -279,8 +302,8 @@ input_curl_fd_events(int fd, fd_set *rfds, fd_set *wfds, fd_set *efds) * * Runs in the I/O thread. No lock needed. */ -void -CurlSockets::UpdateSockets() +inline void +CurlMulti::UpdateSockets() { assert(io_thread_inside()); @@ -291,7 +314,7 @@ CurlSockets::UpdateSockets() FD_ZERO(&efds); int max_fd; - CURLMcode mcode = curl_multi_fdset(curl.multi, &rfds, &wfds, + CURLMcode mcode = curl_multi_fdset(multi, &rfds, &wfds, &efds, &max_fd); if (mcode != CURLM_OK) { FormatError(curlm_domain, @@ -315,14 +338,14 @@ CurlSockets::UpdateSockets() /** * Runs in the I/O thread. No lock needed. */ -static bool -input_curl_easy_add(struct input_curl *c, Error &error) +inline bool +CurlMulti::Add(struct input_curl *c, Error &error) { assert(io_thread_inside()); assert(c != nullptr); assert(c->easy != nullptr); - CURLMcode mcode = curl_multi_add_handle(curl.multi, c->easy); + CURLMcode mcode = curl_multi_add_handle(multi, c->easy); if (mcode != CURLM_OK) { error.Format(curlm_domain, mcode, "curl_multi_add_handle() failed: %s", @@ -330,7 +353,7 @@ input_curl_easy_add(struct input_curl *c, Error &error) return false; } - curl.sockets->InvalidateSockets(); + InvalidateSockets(); return true; } @@ -347,11 +370,17 @@ input_curl_easy_add_indirect(struct input_curl *c, Error &error) bool result; BlockingCall(io_thread_get(), [c, &error, &result](){ - result = input_curl_easy_add(c, error); + result = curl_multi->Add(c, error); }); return result; } +inline void +CurlMulti::Remove(input_curl *c) +{ + curl_multi_remove_handle(multi, c->easy); +} + /** * Frees the current "libcurl easy" handle, and everything associated * with it. @@ -367,7 +396,8 @@ input_curl_easy_free(struct input_curl *c) if (c->easy == nullptr) return; - curl_multi_remove_handle(curl.multi, c->easy); + curl_multi->Remove(c); + curl_easy_cleanup(c->easy); c->easy = nullptr; @@ -386,7 +416,7 @@ input_curl_easy_free_indirect(struct input_curl *c) { BlockingCall(io_thread_get(), [c](){ input_curl_easy_free(c); - curl.sockets->InvalidateSockets(); + curl_multi->InvalidateSockets(); }); assert(c->easy == nullptr); @@ -439,28 +469,23 @@ input_curl_handle_done(CURL *easy_handle, CURLcode result) * * Runs in the I/O thread. The caller must not hold locks. */ -static void -input_curl_info_read(void) +inline void +CurlMulti::ReadInfo() { assert(io_thread_inside()); CURLMsg *msg; int msgs_in_queue; - while ((msg = curl_multi_info_read(curl.multi, + while ((msg = curl_multi_info_read(multi, &msgs_in_queue)) != nullptr) { if (msg->msg == CURLMSG_DONE) input_curl_handle_done(msg->easy_handle, msg->data.result); } } -/** - * Give control to CURL. - * - * Runs in the I/O thread. The caller must not hold locks. - */ -static void -input_curl_perform(void) +inline void +CurlMulti::Perform() { assert(io_thread_inside()); @@ -468,7 +493,7 @@ input_curl_perform(void) do { int running_handles; - mcode = curl_multi_perform(curl.multi, &running_handles); + mcode = curl_multi_perform(multi, &running_handles); } while (mcode == CURLM_CALL_MULTI_PERFORM); if (mcode != CURLM_OK && mcode != CURLM_CALL_MULTI_PERFORM) @@ -478,12 +503,12 @@ input_curl_perform(void) } int -CurlSockets::PrepareSockets() +CurlMulti::PrepareSockets() { UpdateSockets(); long timeout2; - CURLMcode mcode = curl_multi_timeout(curl.multi, &timeout2); + CURLMcode mcode = curl_multi_timeout(multi, &timeout2); if (mcode == CURLM_OK) { if (timeout2 >= 0 && timeout2 < 10) /* CURL 7.21.1 likes to report "timeout=0", @@ -502,10 +527,10 @@ CurlSockets::PrepareSockets() } void -CurlSockets::DispatchSockets() +CurlMulti::DispatchSockets() { - input_curl_perform(); - input_curl_info_read(); + Perform(); + ReadInfo(); } /* @@ -540,13 +565,13 @@ input_curl_init(const config_param ¶m, Error &error) ""); } - curl.multi = curl_multi_init(); - if (curl.multi == nullptr) { + CURLM *multi = curl_multi_init(); + if (multi == nullptr) { error.Set(curl_domain, 0, "curl_multi_init() failed"); return false; } - curl.sockets = new CurlSockets(io_thread_get()); + curl_multi = new CurlMulti(io_thread_get(), multi); return true; } @@ -555,11 +580,9 @@ static void input_curl_finish(void) { BlockingCall(io_thread_get(), [](){ - delete curl.sockets; + delete curl_multi; }); - curl_multi_cleanup(curl.multi); - curl_slist_free_all(http_200_aliases); curl_global_cleanup(); |