Refactor for RequestManager
Reduces request latency and should make a WebAssembly port easier.
This commit is contained in:
parent
6d4f6218a4
commit
845836c4f9
@ -20,7 +20,7 @@ namespace http
|
||||
") TPTPP/", SAVE_VERSION, ".", MINOR_VERSION, ".", BUILD_NUM, IDENT_RELTYPE, ".", SNAPSHOT_ID
|
||||
);
|
||||
worker = std::thread([this]() {
|
||||
Worker();
|
||||
Run();
|
||||
});
|
||||
}
|
||||
|
||||
@ -30,58 +30,64 @@ namespace http
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
running = false;
|
||||
}
|
||||
sharedStateCv.notify_all();
|
||||
worker.join();
|
||||
}
|
||||
|
||||
void RequestManager::Worker()
|
||||
bool RequestManager::ProcessEvents(bool shouldWait)
|
||||
{
|
||||
InitWorker();
|
||||
while (true)
|
||||
std::unique_lock lk(sharedStateMx);
|
||||
if (shouldWait)
|
||||
{
|
||||
{
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
for (auto &requestHandle : requestHandles)
|
||||
{
|
||||
if (requestHandle->statusCode)
|
||||
{
|
||||
requestHandlesToUnregister.push_back(requestHandle);
|
||||
}
|
||||
}
|
||||
for (auto &requestHandle : requestHandlesToRegister)
|
||||
{
|
||||
requestHandles.push_back(requestHandle);
|
||||
RegisterRequestHandle(requestHandle);
|
||||
}
|
||||
requestHandlesToRegister.clear();
|
||||
for (auto &requestHandle : requestHandlesToUnregister)
|
||||
{
|
||||
auto eraseFrom = std::remove(requestHandles.begin(), requestHandles.end(), requestHandle);
|
||||
if (eraseFrom != requestHandles.end())
|
||||
{
|
||||
assert(eraseFrom + 1 == requestHandles.end());
|
||||
UnregisterRequestHandle(requestHandle);
|
||||
requestHandles.erase(eraseFrom, requestHandles.end());
|
||||
if (requestHandle->error.size())
|
||||
{
|
||||
std::cerr << requestHandle->error << std::endl;
|
||||
}
|
||||
{
|
||||
std::lock_guard lk(requestHandle->stateMx);
|
||||
requestHandle->state = RequestHandle::done;
|
||||
}
|
||||
requestHandle->stateCv.notify_one();
|
||||
}
|
||||
}
|
||||
requestHandlesToUnregister.clear();
|
||||
if (!running)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Tick();
|
||||
sharedStateCv.wait(lk);
|
||||
}
|
||||
assert(!requestHandles.size());
|
||||
ExitWorker();
|
||||
|
||||
for (auto &requestHandle : requestHandlesToRegister)
|
||||
{
|
||||
requestHandles.push_back(requestHandle);
|
||||
RegisterRequestHandle(requestHandle);
|
||||
}
|
||||
requestHandlesToRegister.clear();
|
||||
for (auto &requestHandle : requestHandlesToUnregister)
|
||||
{
|
||||
RequestDone(requestHandle);
|
||||
}
|
||||
requestHandlesToUnregister.clear();
|
||||
|
||||
return running;
|
||||
}
|
||||
|
||||
void RequestManager::RequestDone(std::shared_ptr<RequestHandle> &requestHandle)
|
||||
{
|
||||
auto toRemove = std::find(requestHandles.begin(), requestHandles.end(), requestHandle);
|
||||
if (toRemove != requestHandles.end()) RemoveRequest(toRemove);
|
||||
}
|
||||
|
||||
void RequestManager::RequestDone(RequestHandle *handle)
|
||||
{
|
||||
auto toRemove = std::find_if(requestHandles.begin(), requestHandles.end(), [handle] (const std::shared_ptr<RequestHandle> &sptr) {
|
||||
return handle == sptr.get();
|
||||
});
|
||||
RemoveRequest(toRemove);
|
||||
}
|
||||
|
||||
void RequestManager::RemoveRequest(std::vector<std::shared_ptr<RequestHandle>>::iterator toRemove) {
|
||||
assert(toRemove != requestHandles.end());
|
||||
// swap removed request to end before removing
|
||||
auto swapTo = requestHandles.end() - 1;
|
||||
std::swap(*toRemove, *swapTo);
|
||||
auto requestHandle = *swapTo;
|
||||
UnregisterRequestHandle(requestHandle);
|
||||
requestHandles.erase(swapTo, requestHandles.end());
|
||||
if (!requestHandle->error.empty())
|
||||
{
|
||||
std::cerr << requestHandle->error << std::endl;
|
||||
}
|
||||
{
|
||||
std::lock_guard handle_lock(requestHandle->stateMx);
|
||||
requestHandle->state = RequestHandle::done;
|
||||
}
|
||||
requestHandle->stateCv.notify_one();
|
||||
}
|
||||
|
||||
bool RequestManager::DisableNetwork() const
|
||||
@ -91,13 +97,19 @@ namespace http
|
||||
|
||||
void RequestManager::RegisterRequest(Request &request)
|
||||
{
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
requestHandlesToRegister.push_back(request.handle);
|
||||
{
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
requestHandlesToRegister.push_back(request.handle);
|
||||
}
|
||||
sharedStateCv.notify_one();
|
||||
}
|
||||
|
||||
void RequestManager::UnregisterRequest(Request &request)
|
||||
{
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
requestHandlesToUnregister.push_back(request.handle);
|
||||
{
|
||||
std::lock_guard lk(sharedStateMx);
|
||||
requestHandlesToUnregister.push_back(request.handle);
|
||||
}
|
||||
sharedStateCv.notify_one();
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ namespace http
|
||||
{
|
||||
if (bytes > 2) // Don't include header list terminator (but include the status line).
|
||||
{
|
||||
handle->responseHeaders.push_back(ByteString(ptr, ptr + bytes - 2));
|
||||
handle->responseHeaders.emplace_back(ptr, ptr + bytes - 2);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
@ -100,6 +100,19 @@ namespace http
|
||||
CURLM *curlMulti = NULL;
|
||||
};
|
||||
|
||||
void RequestManager::Run()
|
||||
{
|
||||
InitWorker();
|
||||
while (true)
|
||||
{
|
||||
bool shouldWait = requestHandles.empty();
|
||||
if (!ProcessEvents(shouldWait)) break;
|
||||
Tick();
|
||||
}
|
||||
assert(requestHandles.empty());
|
||||
ExitWorker();
|
||||
}
|
||||
|
||||
void RequestManager::InitWorker()
|
||||
{
|
||||
auto manager = static_cast<RequestManagerImpl *>(this);
|
||||
@ -306,11 +319,6 @@ namespace http
|
||||
|
||||
void RequestManager::Tick()
|
||||
{
|
||||
if (!requestHandles.size())
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(TickMs));
|
||||
return;
|
||||
}
|
||||
auto manager = static_cast<RequestManagerImpl *>(this);
|
||||
int dontcare;
|
||||
HandleCURLMcode(curl_multi_wait(manager->curlMulti, NULL, 0, TickMs, &dontcare));
|
||||
@ -362,6 +370,8 @@ namespace http
|
||||
{
|
||||
handle->error = handle->curlErrorBuffer;
|
||||
}
|
||||
|
||||
manager->RequestDone(handle);
|
||||
}
|
||||
}
|
||||
for (auto &requestHandle : requestHandles)
|
||||
|
@ -8,6 +8,15 @@ namespace http
|
||||
return std::make_shared<RequestHandle>(CtorTag{});
|
||||
}
|
||||
|
||||
void RequestManager::Run()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
bool shouldWait = requestHandles.empty();
|
||||
if (!ProcessEvents(shouldWait)) break;
|
||||
}
|
||||
}
|
||||
|
||||
void RequestManager::InitWorker()
|
||||
{
|
||||
}
|
||||
@ -20,6 +29,7 @@ namespace http
|
||||
{
|
||||
requestHandle->statusCode = 604;
|
||||
requestHandle->error = "network support not compiled in";
|
||||
RequestDone(requestHandle);
|
||||
}
|
||||
|
||||
void RequestManager::UnregisterRequestHandle(std::shared_ptr<RequestHandle> requestHandle)
|
||||
@ -28,7 +38,6 @@ namespace http
|
||||
|
||||
void RequestManager::Tick()
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(TickMs));
|
||||
}
|
||||
|
||||
RequestManagerPtr RequestManager::Create(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork)
|
||||
|
@ -69,8 +69,9 @@ namespace http
|
||||
bool disableNetwork;
|
||||
|
||||
std::thread worker;
|
||||
void Run();
|
||||
void InitWorker();
|
||||
void Worker();
|
||||
bool ProcessEvents(bool shouldWait);
|
||||
void ExitWorker();
|
||||
|
||||
std::vector<std::shared_ptr<RequestHandle>> requestHandles;
|
||||
@ -83,6 +84,13 @@ namespace http
|
||||
std::vector<std::shared_ptr<RequestHandle>> requestHandlesToUnregister;
|
||||
bool running = true;
|
||||
std::mutex sharedStateMx;
|
||||
std::condition_variable sharedStateCv;
|
||||
|
||||
void RequestDone(std::shared_ptr<RequestHandle> &requestHandle);
|
||||
void RequestDone(RequestHandle *handle);
|
||||
|
||||
// Removes one request
|
||||
void RemoveRequest(std::vector<std::shared_ptr<RequestHandle>>::iterator toRemove);
|
||||
|
||||
protected:
|
||||
RequestManager(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork);
|
||||
@ -96,7 +104,8 @@ namespace http
|
||||
bool DisableNetwork() const;
|
||||
|
||||
static RequestManagerPtr Create(ByteString newProxy, ByteString newCafile, ByteString newCapath, bool newDisableNetwork);
|
||||
|
||||
};
|
||||
|
||||
constexpr int TickMs = 100;
|
||||
constexpr int TickMs = 25;
|
||||
}
|
||||
|
Reference in New Issue
Block a user