Factor RequestManager threading into Libcurl impl

Because that's the only impl that needs it and is likely to ever need it. I hope I don't have to factor it back out for Android.
This commit is contained in:
Tamás Bálint Misius 2023-05-28 10:29:18 +02:00
parent 7cd88a094c
commit 0cc179ae4e
No account linked to committer's email address
5 changed files with 224 additions and 213 deletions

View File

@ -1,6 +1,7 @@
#include "Request.h"
#include "requestmanager/RequestManager.h"
#include <memory>
#include <iostream>
namespace http
{
@ -97,6 +98,20 @@ namespace http
return { handle->statusCode, std::move(handle->responseData) };
}
void RequestHandle::MarkDone()
{
{
std::lock_guard lk(stateMx);
assert(state == RequestHandle::running);
state = RequestHandle::done;
}
stateCv.notify_one();
if (error.size())
{
std::cerr << error << std::endl;
}
}
std::pair<int, ByteString> Request::Simple(ByteString uri, FormData postData)
{
return SimpleAuth(uri, "", "", postData);

View File

@ -1,7 +1,6 @@
#include "RequestManager.h"
#include "client/http/Request.h"
#include "Config.h"
#include <iostream>
namespace http
{
@ -19,85 +18,22 @@ namespace http
"; ", IDENT,
") TPTPP/", SAVE_VERSION, ".", MINOR_VERSION, ".", BUILD_NUM, IDENT_RELTYPE, ".", SNAPSHOT_ID
);
worker = std::thread([this]() {
Worker();
});
}
RequestManager::~RequestManager()
{
{
std::lock_guard lk(sharedStateMx);
running = false;
}
worker.join();
}
void RequestManager::Worker()
{
InitWorker();
while (true)
{
{
std::lock_guard lk(sharedStateMx);
for (auto &requestHandle : requestHandles)
{
if (requestHandle->statusCode)
{
requestHandlesToUnregister.push_back(requestHandle);
}
}
for (auto &requestHandle : requestHandlesToRegister)
{
requestHandles.push_back(requestHandle);
RegisterRequestHandle(requestHandle);
}
requestHandlesToRegister.clear();
for (auto &requestHandle : requestHandlesToUnregister)
{
auto eraseFrom = std::remove(requestHandles.begin(), requestHandles.end(), requestHandle);
if (eraseFrom != requestHandles.end())
{
assert(eraseFrom + 1 == requestHandles.end());
UnregisterRequestHandle(requestHandle);
requestHandles.erase(eraseFrom, requestHandles.end());
if (requestHandle->error.size())
{
std::cerr << requestHandle->error << std::endl;
}
{
std::lock_guard lk(requestHandle->stateMx);
requestHandle->state = RequestHandle::done;
}
requestHandle->stateCv.notify_one();
}
}
requestHandlesToUnregister.clear();
if (!running)
{
break;
}
}
Tick();
}
assert(!requestHandles.size());
ExitWorker();
}
bool RequestManager::DisableNetwork() const
{
return disableNetwork;
}
void RequestManager::RegisterRequest(Request &request)
{
std::lock_guard lk(sharedStateMx);
requestHandlesToRegister.push_back(request.handle);
if (disableNetwork)
{
request.handle->statusCode = 604;
request.handle->error = "network disabled upon request";
request.handle->MarkDone();
return;
}
RegisterRequestImpl(request);
}
void RequestManager::UnregisterRequest(Request &request)
{
std::lock_guard lk(sharedStateMx);
requestHandlesToUnregister.push_back(request.handle);
UnregisterRequestImpl(request);
}
}

View File

@ -96,36 +96,209 @@ namespace http
{
using RequestManager::RequestManager;
RequestManagerImpl(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork);
~RequestManagerImpl();
std::thread worker;
void Worker();
void WorkerInit();
void WorkerPerform();
void WorkerExit();
// State shared between Request threads and the worker thread.
std::vector<std::shared_ptr<RequestHandle>> requestHandlesToRegister;
std::vector<std::shared_ptr<RequestHandle>> requestHandlesToUnregister;
bool running = true;
std::mutex sharedStateMx;
std::vector<std::shared_ptr<RequestHandle>> requestHandles;
void RegisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle);
void UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle);
bool curlGlobalInit = false;
CURLM *curlMulti = NULL;
};
void RequestManager::InitWorker()
RequestManagerImpl::RequestManagerImpl(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork) :
RequestManager(newProxy, newCafile, newCapath, newDisableNetwork)
{
worker = std::thread([this]() {
Worker();
});
}
RequestManagerImpl::~RequestManagerImpl()
{
{
std::lock_guard lk(sharedStateMx);
running = false;
}
worker.join();
}
void RequestManagerImpl::WorkerInit()
{
auto manager = static_cast<RequestManagerImpl *>(this);
if (!curl_global_init(CURL_GLOBAL_DEFAULT))
{
manager->curlGlobalInit = true;
manager->curlMulti = curl_multi_init();
if (manager->curlMulti)
curlGlobalInit = true;
curlMulti = curl_multi_init();
if (curlMulti)
{
HandleCURLMcode(curl_multi_setopt(manager->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, curlMaxHostConnections));
HandleCURLMcode(curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, curlMaxHostConnections));
#if defined(CURL_AT_LEAST_VERSION) && CURL_AT_LEAST_VERSION(7, 67, 0)
HandleCURLMcode(curl_multi_setopt(manager->curlMulti, CURLMOPT_MAX_CONCURRENT_STREAMS, curlMaxConcurrentStreams));
HandleCURLMcode(curl_multi_setopt(curlMulti, CURLMOPT_MAX_CONCURRENT_STREAMS, curlMaxConcurrentStreams));
#endif
}
}
}
void RequestManager::ExitWorker()
void RequestManagerImpl::WorkerPerform()
{
auto manager = static_cast<RequestManagerImpl *>(this);
curl_multi_cleanup(manager->curlMulti);
manager->curlMulti = NULL;
int dontcare;
HandleCURLMcode(curl_multi_poll(manager->curlMulti, NULL, 0, 1000, &dontcare));
HandleCURLMcode(curl_multi_perform(manager->curlMulti, &dontcare));
while (auto msg = curl_multi_info_read(manager->curlMulti, &dontcare))
{
if (msg->msg == CURLMSG_DONE)
{
RequestHandleHttp *handle;
HandleCURLcode(curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &handle));
handle->statusCode = 600;
switch (msg->data.result)
{
case CURLE_OK:
{
long code;
HandleCURLcode(curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &code));
assert(code);
handle->statusCode = int(code);
}
break;
case CURLE_UNSUPPORTED_PROTOCOL: handle->statusCode = 601; break;
case CURLE_COULDNT_RESOLVE_HOST: handle->statusCode = 602; break;
case CURLE_OPERATION_TIMEDOUT: handle->statusCode = 605; break;
case CURLE_URL_MALFORMAT: handle->statusCode = 606; break;
case CURLE_COULDNT_CONNECT: handle->statusCode = 607; break;
case CURLE_COULDNT_RESOLVE_PROXY: handle->statusCode = 608; break;
case CURLE_TOO_MANY_REDIRECTS: handle->statusCode = 611; break;
case CURLE_SSL_CONNECT_ERROR: handle->statusCode = 612; break;
case CURLE_SSL_ENGINE_NOTFOUND: handle->statusCode = 613; break;
case CURLE_SSL_ENGINE_SETFAILED: handle->statusCode = 614; break;
case CURLE_SSL_CERTPROBLEM: handle->statusCode = 615; break;
case CURLE_SSL_CIPHER: handle->statusCode = 616; break;
case CURLE_SSL_ENGINE_INITFAILED: handle->statusCode = 617; break;
case CURLE_SSL_CACERT_BADFILE: handle->statusCode = 618; break;
case CURLE_SSL_CRL_BADFILE: handle->statusCode = 619; break;
case CURLE_SSL_ISSUER_ERROR: handle->statusCode = 620; break;
case CURLE_SSL_PINNEDPUBKEYNOTMATCH: handle->statusCode = 621; break;
case CURLE_SSL_INVALIDCERTSTATUS: handle->statusCode = 609; break;
case CURLE_HTTP2:
case CURLE_HTTP2_STREAM:
case CURLE_FAILED_INIT:
case CURLE_NOT_BUILT_IN:
default:
break;
}
if (handle->statusCode >= 600)
{
handle->error = handle->curlErrorBuffer;
}
}
}
for (auto &requestHandle : requestHandles)
{
auto handle = static_cast<RequestHandleHttp *>(requestHandle.get());
if (handle->curlEasy)
{
#ifdef REQUEST_USE_CURL_OFFSET_T
curl_off_t total, done;
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &total)); // stores -1 if unknown
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_SIZE_DOWNLOAD_T, &done));
#else
double total, done;
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &total)); // stores -1 if unknown
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_SIZE_DOWNLOAD, &done));
#endif
handle->bytesTotal = int(total);
handle->bytesDone = int(done);
}
else
{
handle->bytesTotal = -1;
handle->bytesDone = 0;
}
}
}
void RequestManagerImpl::WorkerExit()
{
curl_multi_cleanup(curlMulti);
curlMulti = NULL;
curl_global_cleanup();
}
void RequestManager::RegisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
void RequestManagerImpl::Worker()
{
WorkerInit();
while (true)
{
{
std::lock_guard lk(sharedStateMx);
for (auto &requestHandle : requestHandles)
{
if (requestHandle->statusCode)
{
requestHandlesToUnregister.push_back(requestHandle);
}
}
for (auto &requestHandle : requestHandlesToRegister)
{
requestHandles.push_back(requestHandle);
RegisterRequestHandle(requestHandle);
}
requestHandlesToRegister.clear();
for (auto &requestHandle : requestHandlesToUnregister)
{
auto eraseFrom = std::remove(requestHandles.begin(), requestHandles.end(), requestHandle);
if (eraseFrom != requestHandles.end())
{
assert(eraseFrom + 1 == requestHandles.end());
UnregisterRequestHandle(requestHandle);
requestHandles.erase(eraseFrom, requestHandles.end());
requestHandle->MarkDone();
}
}
requestHandlesToUnregister.clear();
if (!running)
{
break;
}
}
WorkerPerform();
}
assert(!requestHandles.size());
WorkerExit();
}
void RequestManager::RegisterRequestImpl(Request &request)
{
auto manager = static_cast<RequestManagerImpl *>(this);
std::lock_guard lk(manager->sharedStateMx);
manager->requestHandlesToRegister.push_back(request.handle);
curl_multi_wakeup(manager->curlMulti);
}
void RequestManager::UnregisterRequestImpl(Request &request)
{
auto manager = static_cast<RequestManagerImpl *>(this);
std::lock_guard lk(manager->sharedStateMx);
manager->requestHandlesToUnregister.push_back(request.handle);
curl_multi_wakeup(manager->curlMulti);
}
void RequestManagerImpl::RegisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
{
auto manager = static_cast<RequestManagerImpl *>(this);
auto handle = static_cast<RequestHandleHttp *>(requestHandle.get());
@ -133,10 +306,6 @@ namespace http
requestHandle->statusCode = statusCode;
requestHandle->error = error;
};
if (disableNetwork)
{
return failEarly(604, "network disabled upon request");
}
if (!manager->curlGlobalInit)
{
return failEarly(600, "no CURL");
@ -293,7 +462,7 @@ namespace http
handle->curlAddedToMulti = true;
}
void RequestManager::UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
void RequestManagerImpl::UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
{
auto manager = static_cast<RequestManagerImpl *>(this);
auto handle = static_cast<RequestHandleHttp *>(requestHandle.get());
@ -311,91 +480,6 @@ namespace http
curl_slist_free_all(handle->curlHeaders);
}
void RequestManager::Tick()
{
if (!requestHandles.size())
{
std::this_thread::sleep_for(std::chrono::milliseconds(TickMs));
return;
}
auto manager = static_cast<RequestManagerImpl *>(this);
int dontcare;
HandleCURLMcode(curl_multi_wait(manager->curlMulti, NULL, 0, TickMs, &dontcare));
HandleCURLMcode(curl_multi_perform(manager->curlMulti, &dontcare));
while (auto msg = curl_multi_info_read(manager->curlMulti, &dontcare))
{
if (msg->msg == CURLMSG_DONE)
{
RequestHandleHttp *handle;
HandleCURLcode(curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &handle));
handle->statusCode = 600;
switch (msg->data.result)
{
case CURLE_OK:
{
long code;
HandleCURLcode(curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &code));
assert(code);
handle->statusCode = int(code);
}
break;
case CURLE_UNSUPPORTED_PROTOCOL: handle->statusCode = 601; break;
case CURLE_COULDNT_RESOLVE_HOST: handle->statusCode = 602; break;
case CURLE_OPERATION_TIMEDOUT: handle->statusCode = 605; break;
case CURLE_URL_MALFORMAT: handle->statusCode = 606; break;
case CURLE_COULDNT_CONNECT: handle->statusCode = 607; break;
case CURLE_COULDNT_RESOLVE_PROXY: handle->statusCode = 608; break;
case CURLE_TOO_MANY_REDIRECTS: handle->statusCode = 611; break;
case CURLE_SSL_CONNECT_ERROR: handle->statusCode = 612; break;
case CURLE_SSL_ENGINE_NOTFOUND: handle->statusCode = 613; break;
case CURLE_SSL_ENGINE_SETFAILED: handle->statusCode = 614; break;
case CURLE_SSL_CERTPROBLEM: handle->statusCode = 615; break;
case CURLE_SSL_CIPHER: handle->statusCode = 616; break;
case CURLE_SSL_ENGINE_INITFAILED: handle->statusCode = 617; break;
case CURLE_SSL_CACERT_BADFILE: handle->statusCode = 618; break;
case CURLE_SSL_CRL_BADFILE: handle->statusCode = 619; break;
case CURLE_SSL_ISSUER_ERROR: handle->statusCode = 620; break;
case CURLE_SSL_PINNEDPUBKEYNOTMATCH: handle->statusCode = 621; break;
case CURLE_SSL_INVALIDCERTSTATUS: handle->statusCode = 609; break;
case CURLE_HTTP2:
case CURLE_HTTP2_STREAM:
case CURLE_FAILED_INIT:
case CURLE_NOT_BUILT_IN:
default:
break;
}
if (handle->statusCode >= 600)
{
handle->error = handle->curlErrorBuffer;
}
}
}
for (auto &requestHandle : requestHandles)
{
auto handle = static_cast<RequestHandleHttp *>(requestHandle.get());
if (handle->curlEasy)
{
#ifdef REQUEST_USE_CURL_OFFSET_T
curl_off_t total, done;
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &total)); // stores -1 if unknown
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_SIZE_DOWNLOAD_T, &done));
#else
double total, done;
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &total)); // stores -1 if unknown
HandleCURLcode(curl_easy_getinfo(handle->curlEasy, CURLINFO_SIZE_DOWNLOAD, &done));
#endif
handle->bytesTotal = int(total);
handle->bytesDone = int(done);
}
else
{
handle->bytesTotal = -1;
handle->bytesDone = 0;
}
}
}
RequestManagerPtr RequestManager::Create(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork)
{
return RequestManagerPtr(new RequestManagerImpl(newProxy, newCafile, newCapath, newDisableNetwork));

View File

@ -8,29 +8,17 @@ namespace http
return std::make_shared<RequestHandle>(CtorTag{});
}
void RequestManager::InitWorker()
void RequestManager::RegisterRequestImpl(Request &request)
{
request.handle->statusCode = 604;
request.handle->error = "network support not compiled in";
request.handle->MarkDone();
}
void RequestManager::ExitWorker()
void RequestManager::UnregisterRequestImpl(Request &request)
{
}
void RequestManager::RegisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
{
requestHandle->statusCode = 604;
requestHandle->error = "network support not compiled in";
}
void RequestManager::UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
{
}
void RequestManager::Tick()
{
std::this_thread::sleep_for(std::chrono::milliseconds(TickMs));
}
RequestManagerPtr RequestManager::Create(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork)
{
return RequestManagerPtr(new RequestManager(newProxy, newCafile, newCapath, newDisableNetwork));

View File

@ -51,6 +51,8 @@ namespace http
RequestHandle(const RequestHandle &) = delete;
RequestHandle &operator =(const RequestHandle &) = delete;
void MarkDone();
static std::shared_ptr<RequestHandle> Create();
};
@ -62,41 +64,27 @@ namespace http
using RequestManagerPtr = std::unique_ptr<RequestManager, RequestManagerDeleter>;
class RequestManager : public ExplicitSingleton<RequestManager>
{
protected:
ByteString proxy;
ByteString cafile;
ByteString capath;
ByteString userAgent;
bool disableNetwork;
std::thread worker;
void InitWorker();
void Worker();
void ExitWorker();
std::vector<std::shared_ptr<RequestHandle>> requestHandles;
void RegisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle);
void UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle);
void Tick();
// State shared between Request threads and the worker thread.
std::vector<std::shared_ptr<RequestHandle>> requestHandlesToRegister;
std::vector<std::shared_ptr<RequestHandle>> requestHandlesToUnregister;
bool running = true;
std::mutex sharedStateMx;
protected:
RequestManager(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork);
public:
~RequestManager();
void RegisterRequestImpl(Request &request);
void UnregisterRequestImpl(Request &request);
public:
void RegisterRequest(Request &request);
void UnregisterRequest(Request &request);
bool DisableNetwork() const;
bool DisableNetwork() const
{
return disableNetwork;
}
static RequestManagerPtr Create(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork);
};
constexpr int TickMs = 100;
}