Fix a bunch of threading-related issues

This commit is contained in:
Tamás Bálint Misius 2019-03-14 11:09:24 +01:00 committed by jacob1
parent 263f756a20
commit 5916c9db9c
3 changed files with 75 additions and 35 deletions

View File

@ -129,6 +129,7 @@ namespace http
pthread_mutex_lock(&rm_mutex); pthread_mutex_lock(&rm_mutex);
rm_started = true; rm_started = true;
pthread_mutex_unlock(&rm_mutex); pthread_mutex_unlock(&rm_mutex);
RequestManager::Ref().StartRequest(this);
} }
@ -146,14 +147,15 @@ namespace http
pthread_cond_wait(&done_cv, &rm_mutex); pthread_cond_wait(&done_cv, &rm_mutex);
} }
rm_started = false; rm_started = false;
rm_canceled = true; // signals to RequestManager that the Request can be deleted rm_canceled = true;
ByteString response_out = std::move(response_body);
if (status_out) if (status_out)
{ {
*status_out = status; *status_out = status;
} }
ByteString response_out = std::move(response_body);
pthread_mutex_unlock(&rm_mutex); pthread_mutex_unlock(&rm_mutex);
RequestManager::Ref().RemoveRequest(this);
return response_out; return response_out;
} }
@ -205,6 +207,7 @@ namespace http
pthread_mutex_lock(&rm_mutex); pthread_mutex_lock(&rm_mutex);
rm_canceled = true; rm_canceled = true;
pthread_mutex_unlock(&rm_mutex); pthread_mutex_unlock(&rm_mutex);
RequestManager::Ref().RemoveRequest(this);
} }
ByteString Request::Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data) ByteString Request::Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data)

View File

@ -13,6 +13,9 @@ namespace http
ByteString user_agent; ByteString user_agent;
RequestManager::RequestManager(): RequestManager::RequestManager():
requests_added_to_multi(0),
requests_to_start(false),
requests_to_remove(false),
rt_shutting_down(false), rt_shutting_down(false),
multi(NULL) multi(NULL)
{ {
@ -67,36 +70,26 @@ namespace http
bool shutting_down = false; bool shutting_down = false;
while (!shutting_down) while (!shutting_down)
{ {
for (Request *request : requests_to_remove)
{
requests.erase(request);
if (multi && request->easy && request->added_to_multi)
{
curl_multi_remove_handle(multi, request->easy);
request->added_to_multi = false;
}
delete request;
}
requests_to_remove.clear();
pthread_mutex_lock(&rt_mutex); pthread_mutex_lock(&rt_mutex);
if (!requests_added_to_multi)
{
while (!rt_shutting_down && requests_to_add.empty() && !requests_to_start && !requests_to_remove)
{
pthread_cond_wait(&rt_cv, &rt_mutex);
}
}
shutting_down = rt_shutting_down; shutting_down = rt_shutting_down;
requests_to_remove = false;
requests_to_start = false;
for (Request *request : requests_to_add) for (Request *request : requests_to_add)
{ {
request->status = 0; request->status = 0;
requests.insert(request); requests.insert(request);
} }
requests_to_add.clear(); requests_to_add.clear();
if (requests.empty())
{
while (!rt_shutting_down && requests_to_add.empty())
{
pthread_cond_wait(&rt_cv, &rt_mutex);
}
}
pthread_mutex_unlock(&rt_mutex); pthread_mutex_unlock(&rt_mutex);
if (multi && !requests.empty()) if (multi && requests_added_to_multi)
{ {
int dontcare; int dontcare;
struct CURLMsg *msg; struct CURLMsg *msg;
@ -153,10 +146,10 @@ namespace http
}; };
} }
std::set<Request *> requests_to_remove;
for (Request *request : requests) for (Request *request : requests)
{ {
pthread_mutex_lock(&request->rm_mutex); pthread_mutex_lock(&request->rm_mutex);
if (shutting_down) if (shutting_down)
{ {
// In the weird case that a http::Request::Simple* call is // In the weird case that a http::Request::Simple* call is
@ -164,25 +157,17 @@ namespace http
// instead of cancelling it ourselves. // instead of cancelling it ourselves.
request->status = 610; request->status = 610;
} }
if (!request->rm_canceled && request->rm_started && !request->added_to_multi && !request->status)
if (request->rm_canceled)
{
requests_to_remove.insert(request);
}
if (!request->rm_canceled && request->rm_started && !request->added_to_multi)
{ {
if (multi && request->easy) if (multi && request->easy)
{ {
curl_multi_add_handle(multi, request->easy); MultiAdd(request);
request->added_to_multi = true;
} }
else else
{ {
request->status = 604; request->status = 604;
} }
} }
if (!request->rm_canceled && request->rm_started && !request->rm_finished) if (!request->rm_canceled && request->rm_started && !request->rm_finished)
{ {
if (multi && request->easy) if (multi && request->easy)
@ -193,12 +178,42 @@ namespace http
if (request->status) if (request->status)
{ {
request->rm_finished = true; request->rm_finished = true;
MultiRemove(request);
pthread_cond_signal(&request->done_cv); pthread_cond_signal(&request->done_cv);
} }
} }
if (request->rm_canceled)
{
requests_to_remove.insert(request);
}
pthread_mutex_unlock(&request->rm_mutex); pthread_mutex_unlock(&request->rm_mutex);
} }
for (Request *request : requests_to_remove)
{
requests.erase(request);
MultiRemove(request);
delete request;
}
}
}
void RequestManager::MultiAdd(Request *request)
{
if (multi && request->easy && !request->added_to_multi)
{
curl_multi_add_handle(multi, request->easy);
request->added_to_multi = true;
++requests_added_to_multi;
}
}
void RequestManager::MultiRemove(Request *request)
{
if (request->added_to_multi)
{
curl_multi_remove_handle(multi, request->easy);
request->added_to_multi = false;
--requests_added_to_multi;
} }
} }
@ -209,4 +224,20 @@ namespace http
pthread_cond_signal(&rt_cv); pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex); pthread_mutex_unlock(&rt_mutex);
} }
void RequestManager::StartRequest(Request *request)
{
pthread_mutex_lock(&rt_mutex);
requests_to_start = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
}
void RequestManager::RemoveRequest(Request *request)
{
pthread_mutex_lock(&rt_mutex);
requests_to_remove = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
}
} }

View File

@ -15,9 +15,11 @@ namespace http
{ {
pthread_t worker_thread; pthread_t worker_thread;
std::set<Request *> requests; std::set<Request *> requests;
int requests_added_to_multi;
std::set<Request *> requests_to_add; std::set<Request *> requests_to_add;
std::set<Request *> requests_to_remove; bool requests_to_start;
bool requests_to_remove;
bool rt_shutting_down; bool rt_shutting_down;
pthread_mutex_t rt_mutex; pthread_mutex_t rt_mutex;
pthread_cond_t rt_cv; pthread_cond_t rt_cv;
@ -26,7 +28,11 @@ namespace http
void Start(); void Start();
void Worker(); void Worker();
void MultiAdd(Request *request);
void MultiRemove(Request *request);
void AddRequest(Request *request); void AddRequest(Request *request);
void StartRequest(Request *request);
void RemoveRequest(Request *request);
static TH_ENTRY_POINT void *RequestManagerHelper(void *obj); static TH_ENTRY_POINT void *RequestManagerHelper(void *obj);