Add curl, rework Request and RequestManager a bit

This commit is contained in:
Tamás Bálint Misius 2019-03-13 20:57:31 +01:00 committed by jacob1
parent 3d22c1ed4e
commit fc8740f7d5
8 changed files with 547 additions and 1805 deletions

View File

@ -332,6 +332,10 @@ def findLibs(env, conf):
if not conf.CheckLib(['z', 'zlib']): if not conf.CheckLib(['z', 'zlib']):
FatalError("libz not found or not installed") FatalError("libz not found or not installed")
#Look for libcurl
if not conf.CheckLib(['curl', 'libcurl']):
FatalError("libcurl not found or not installed")
#Look for pthreads #Look for pthreads
if not conf.CheckLib(['pthread', 'pthreadVC2']): if not conf.CheckLib(['pthread', 'pthreadVC2']):
FatalError("pthreads development library not found or not installed") FatalError("pthreads development library not found or not installed")

File diff suppressed because it is too large Load Diff

View File

@ -1,54 +0,0 @@
/**
* Powder Toy - HTTP Library (Header)
*
* Copyright (c) 2008 - 2010 Stanislaw Skowronek.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/
#ifndef HTTP_H
#define HTTP_H
#include <map>
#include "common/String.h"
static const char hexChars[] = "0123456789abcdef";
static const long http_timeout = 15;
void http_init(char *proxy);
void http_done(void);
char *http_simple_get(const char *uri, int *ret, int *len);
char *http_auth_get(const char *uri, const char *user, const char *pass, const char *session_id, int *ret, int *len);
char *http_simple_post(const char *uri, const char *data, int dlen, int *ret, int *len);
void http_auth_headers(void *ctx, const char *user, const char *pass, const char *session_id);
void *http_async_req_start(void *ctx, const char *uri, const char *data, int dlen, int keep);
void http_async_add_header(void *ctx, const char *name, const char *data);
int http_async_req_status(void *ctx);
void http_async_get_length(void *ctx, int *total, int *done);
char *http_async_req_stop(void *ctx, int *ret, int *len);
void http_async_req_close(void *ctx);
void http_force_close(void *ctx);
ByteString FindBoundary(std::map<ByteString, ByteString>, ByteString boundary);
ByteString GetMultipartMessage(std::map<ByteString, ByteString>, ByteString boundary);
void http_add_multipart_header(void *ctx, ByteString boundary);
char *http_multipart_post(const char *uri, const char *const *names, const char *const *parts, size_t *plens, const char *user, const char *pass, const char * session_id, int *ret, int *len);
void *http_multipart_post_async(const char *uri, const char *const *names, const char *const *parts, int *plens, const char *user, const char *pass, const char *session_id);
const char *http_ret_text(int ret);
#endif

View File

@ -1,174 +1,300 @@
#include <cstdlib>
#include "Request.h" #include "Request.h"
#include "RequestManager.h" #include "RequestManager.h"
#include "HTTP.h"
#include "Platform.h" #include "Platform.h"
namespace http namespace http
{ {
Request::Request(ByteString uri_, bool keepAlive): Request::Request(ByteString uri_):
http(NULL), uri(uri_),
keepAlive(keepAlive), rm_total(0),
requestData(NULL), rm_done(0),
requestSize(0), rm_finished(false),
requestStatus(0), rm_canceled(false),
postData(""), rm_started(false),
postDataBoundary(""), added_to_multi(false),
userID(""), status(0),
userSession(""), headers(NULL),
requestFinished(false), post_fields(NULL)
requestCanceled(false),
requestStarted(false)
{
uri = ByteString(uri_);
RequestManager::Ref().AddRequest(this);
}
// called by request thread itself if request was canceled
Request::~Request()
{
if (http && (keepAlive || requestCanceled))
http_async_req_close(http);
if (requestData)
free(requestData);
}
// add post data to a request
void Request::AddPostData(std::map<ByteString, ByteString> data)
{
postDataBoundary = FindBoundary(data, "");
postData = GetMultipartMessage(data, postDataBoundary);
}
void Request::AddPostData(std::pair<ByteString, ByteString> data)
{
std::map<ByteString, ByteString> postData;
postData.insert(data);
AddPostData(postData);
}
// add userID and sessionID headers to the request. Must be done after request starts for some reason
void Request::AuthHeaders(ByteString ID, ByteString session)
{
if (ID != "0")
userID = ID;
userSession = session;
}
// start the request thread
void Request::Start()
{
if (CheckStarted() || CheckDone())
return;
http = http_async_req_start(http, uri.c_str(), postData.c_str(), postData.length(), keepAlive ? 1 : 0);
// add the necessary headers
if (userID.length() || userSession.length())
http_auth_headers(http, userID.c_str(), NULL, userSession.c_str());
if (postDataBoundary.length())
http_add_multipart_header(http, postDataBoundary);
RequestManager::Ref().Lock();
requestStarted = true;
RequestManager::Ref().Unlock();
}
// finish the request (if called before the request is done, this will block)
ByteString Request::Finish(int *status)
{
if (CheckCanceled())
return ""; // shouldn't happen but just in case
while (!CheckDone()); // block
RequestManager::Ref().Lock();
requestStarted = false;
if (status)
*status = requestStatus;
ByteString ret;
if (requestData)
{ {
ret = ByteString(requestData, requestData + requestSize); pthread_cond_init(&done_cv, NULL);
free(requestData); pthread_mutex_init(&rm_mutex, NULL);
easy = curl_easy_init();
RequestManager::Ref().AddRequest(this);
} }
requestData = NULL;
if (!keepAlive)
requestCanceled = true;
RequestManager::Ref().Unlock();
return ret;
}
// returns the request size and progress (if the request has the correct length headers) Request::~Request()
void Request::CheckProgress(int *total, int *done)
{
RequestManager::Ref().Lock();
if (!requestFinished && http)
http_async_get_length(http, total, done);
else
*total = *done = 0;
RequestManager::Ref().Unlock();
}
// returns true if the request has finished
bool Request::CheckDone()
{
RequestManager::Ref().Lock();
bool ret = requestFinished;
RequestManager::Ref().Unlock();
return ret;
}
// returns true if the request was canceled
bool Request::CheckCanceled()
{
RequestManager::Ref().Lock();
bool ret = requestCanceled;
RequestManager::Ref().Unlock();
return ret;
}
// returns true if the request is running
bool Request::CheckStarted()
{
RequestManager::Ref().Lock();
bool ret = requestStarted;
RequestManager::Ref().Unlock();
return ret;
}
// cancels the request, the request thread will delete the Request* when it finishes (do not use Request in any way after canceling)
void Request::Cancel()
{
RequestManager::Ref().Lock();
requestCanceled = true;
RequestManager::Ref().Unlock();
}
ByteString Request::Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data)
{
Request *request = new Request(uri);
request->AddPostData(post_data);
request->Start();
while(!request->CheckDone())
{ {
Platform::Millisleep(1); curl_easy_cleanup(easy);
curl_mime_free(post_fields);
curl_slist_free_all(headers);
pthread_mutex_destroy(&rm_mutex);
pthread_cond_destroy(&done_cv);
} }
return request->Finish(status);
}
ByteString Request::SimpleAuth(ByteString uri, int *status, ByteString ID, ByteString session, std::map<ByteString, ByteString> post_data) void Request::AddHeader(ByteString name, ByteString value)
{
Request *request = new Request(uri);
request->AddPostData(post_data);
request->AuthHeaders(ID, session);
request->Start();
while(!request->CheckDone())
{ {
Platform::Millisleep(1); headers = curl_slist_append(headers, (name + ": " + value).c_str());
}
// add post data to a request
void Request::AddPostData(std::map<ByteString, ByteString> data)
{
if (!data.size())
{
return;
}
if (easy)
{
if (!post_fields)
{
post_fields = curl_mime_init(easy);
}
for (auto &field : data)
{
curl_mimepart *part = curl_mime_addpart(post_fields);
curl_mime_data(part, &field.second[0], field.second.size());
if (auto split = field.first.SplitBy(':'))
{
curl_mime_name(part, split.Before().c_str());
curl_mime_filename(part, split.After().c_str());
}
else
{
curl_mime_name(part, field.first.c_str());
}
}
}
}
// add userID and sessionID headers to the request
void Request::AuthHeaders(ByteString ID, ByteString session)
{
if (ID.size())
{
if (session.size())
{
AddHeader("X-Auth-User-Id", ID);
AddHeader("X-Auth-Session-Key", session);
}
else
{
AddHeader("X-Auth-User", ID);
}
}
}
// start the request thread
void Request::Start()
{
if (CheckStarted() || CheckDone())
{
return;
}
if (easy)
{
if (post_fields)
{
curl_easy_setopt(easy, CURLOPT_MIMEPOST, post_fields);
}
else
{
curl_easy_setopt(easy, CURLOPT_HTTPGET, 1);
}
curl_easy_setopt(easy, CURLOPT_TIMEOUT, timeout);
curl_easy_setopt(easy, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(easy, CURLOPT_URL, uri.c_str());
if (proxy.size())
{
curl_easy_setopt(easy, CURLOPT_PROXY, proxy.c_str());
}
curl_easy_setopt(easy, CURLOPT_PRIVATE, this);
curl_easy_setopt(easy, CURLOPT_USERAGENT, user_agent.c_str());
curl_easy_setopt(easy, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(easy, CURLOPT_WRITEDATA, this);
curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, (size_t (*)(char *ptr, size_t size, size_t count, void *userdata))([](char *ptr, size_t size, size_t count, void *userdata) -> size_t {
Request *req = (Request *)userdata;
auto actual_size = size * count;
req->response_body.append(ptr, actual_size);
return actual_size;
})); // curl_easy_setopt does something really ugly with parameters; I have to cast the lambda explicitly to the right kind of function pointer for some reason
}
pthread_mutex_lock(&rm_mutex);
rm_started = true;
pthread_mutex_unlock(&rm_mutex);
}
// finish the request (if called before the request is done, this will block)
ByteString Request::Finish(int *status_out)
{
if (CheckCanceled())
{
return ""; // shouldn't happen but just in case
}
pthread_mutex_lock(&rm_mutex);
while (!rm_finished)
{
pthread_cond_wait(&done_cv, &rm_mutex);
}
rm_started = false;
rm_canceled = true; // signals to RequestManager that the Request can be deleted
ByteString response_out = std::move(response_body);
if (status_out)
{
*status_out = status;
}
pthread_mutex_unlock(&rm_mutex);
return response_out;
}
void Request::CheckProgress(int *total, int *done)
{
pthread_mutex_lock(&rm_mutex);
if (total)
{
*total = rm_total;
}
if (done)
{
*done = rm_done;
}
pthread_mutex_unlock(&rm_mutex);
}
// returns true if the request has finished
bool Request::CheckDone()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_finished;
pthread_mutex_unlock(&rm_mutex);
return ret;
}
// returns true if the request was canceled
bool Request::CheckCanceled()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_canceled;
pthread_mutex_unlock(&rm_mutex);
return ret;
}
// returns true if the request is running
bool Request::CheckStarted()
{
pthread_mutex_lock(&rm_mutex);
bool ret = rm_started;
pthread_mutex_unlock(&rm_mutex);
return ret;
}
// cancels the request, the request thread will delete the Request* when it finishes (do not use Request in any way after canceling)
void Request::Cancel()
{
pthread_mutex_lock(&rm_mutex);
rm_canceled = true;
pthread_mutex_unlock(&rm_mutex);
}
ByteString Request::Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data)
{
return SimpleAuth(uri, status, "", "", post_data);
}
ByteString Request::SimpleAuth(ByteString uri, int *status, ByteString ID, ByteString session, std::map<ByteString, ByteString> post_data)
{
Request *request = new Request(uri);
request->AddPostData(post_data);
request->AuthHeaders(ID, session);
request->Start();
return request->Finish(status);
}
const char *StatusText(int ret)
{
switch (ret)
{
case 0: return "Status code 0 (bug?)";
case 100: return "Continue";
case 101: return "Switching Protocols";
case 102: return "Processing";
case 200: return "OK";
case 201: return "Created";
case 202: return "Accepted";
case 203: return "Non-Authoritative Information";
case 204: return "No Content";
case 205: return "Reset Content";
case 206: return "Partial Content";
case 207: return "Multi-Status";
case 300: return "Multiple Choices";
case 301: return "Moved Permanently";
case 302: return "Found";
case 303: return "See Other";
case 304: return "Not Modified";
case 305: return "Use Proxy";
case 306: return "Switch Proxy";
case 307: return "Temporary Redirect";
case 400: return "Bad Request";
case 401: return "Unauthorized";
case 402: return "Payment Required";
case 403: return "Forbidden";
case 404: return "Not Found";
case 405: return "Method Not Allowed";
case 406: return "Not Acceptable";
case 407: return "Proxy Authentication Required";
case 408: return "Request Timeout";
case 409: return "Conflict";
case 410: return "Gone";
case 411: return "Length Required";
case 412: return "Precondition Failed";
case 413: return "Request Entity Too Large";
case 414: return "Request URI Too Long";
case 415: return "Unsupported Media Type";
case 416: return "Requested Range Not Satisfiable";
case 417: return "Expectation Failed";
case 418: return "I'm a teapot";
case 422: return "Unprocessable Entity";
case 423: return "Locked";
case 424: return "Failed Dependency";
case 425: return "Unordered Collection";
case 426: return "Upgrade Required";
case 444: return "No Response";
case 450: return "Blocked by Windows Parental Controls";
case 499: return "Client Closed Request";
case 500: return "Internal Server Error";
case 501: return "Not Implemented";
case 502: return "Bad Gateway";
case 503: return "Service Unavailable";
case 504: return "Gateway Timeout";
case 505: return "HTTP Version Not Supported";
case 506: return "Variant Also Negotiates";
case 507: return "Insufficient Storage";
case 509: return "Bandwidth Limit Exceeded";
case 510: return "Not Extended";
case 600: return "Internal Client Error";
case 601: return "Unsupported Protocol";
case 602: return "Server Not Found";
case 603: return "Malformed Response";
case 604: return "Network Not Available";
case 605: return "Request Timed Out";
case 606: return "Malformed URL";
case 607: return "Connection Refused";
case 608: return "Proxy Server Not Found";
case 609: return "SSL Failure";
case 610: return "Cancelled by Shutdown";
default: return "Unknown Status Code";
}
} }
return request->Finish(status);
}
const char *StatusText(int code)
{
return http_ret_text(code);
}
} }

View File

@ -1,54 +1,63 @@
#ifndef REQUEST_H #ifndef REQUEST_H
#define REQUEST_H #define REQUEST_H
#include <map> #include <map>
#include <curl/curl.h>
#include "common/String.h" #include "common/String.h"
namespace http namespace http
{ {
class RequestManager; class RequestManager;
class Request class Request
{ {
ByteString uri; ByteString uri;
void *http; ByteString response_body;
bool keepAlive;
char *requestData; CURL *easy;
int requestSize;
int requestStatus;
ByteString postData; volatile curl_off_t rm_total;
ByteString postDataBoundary; volatile curl_off_t rm_done;
volatile bool rm_finished;
volatile bool rm_canceled;
volatile bool rm_started;
pthread_mutex_t rm_mutex;
ByteString userID; bool added_to_multi;
ByteString userSession; int status;
volatile bool requestFinished; struct curl_slist *headers;
volatile bool requestCanceled; curl_mime *post_fields;
volatile bool requestStarted;
public: pthread_cond_t done_cv;
Request(ByteString uri, bool keepAlive = false);
virtual ~Request();
void AddPostData(std::map<ByteString, ByteString> data); public:
void AddPostData(std::pair<ByteString, ByteString> data); Request(ByteString uri);
void AuthHeaders(ByteString ID, ByteString session); virtual ~Request();
void Start();
ByteString Finish(int *status);
void Cancel();
void CheckProgress(int *total, int *done); void AddHeader(ByteString name, ByteString value);
bool CheckDone(); void AddPostData(std::map<ByteString, ByteString> data);
bool CheckCanceled(); void AuthHeaders(ByteString ID, ByteString session);
bool CheckStarted();
friend class RequestManager; void Start();
ByteString Finish(int *status);
void Cancel();
static ByteString Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data = std::map<ByteString, ByteString>{}); void CheckProgress(int *total, int *done);
static ByteString SimpleAuth(ByteString uri, int *status, ByteString ID, ByteString session, std::map<ByteString, ByteString> post_data = std::map<ByteString, ByteString>{}); bool CheckDone();
}; bool CheckCanceled();
bool CheckStarted();
const char *StatusText(int code); friend class RequestManager;
static ByteString Simple(ByteString uri, int *status, std::map<ByteString, ByteString> post_data = std::map<ByteString, ByteString>{});
static ByteString SimpleAuth(ByteString uri, int *status, ByteString ID, ByteString session, std::map<ByteString, ByteString> post_data = std::map<ByteString, ByteString>{});
};
const char *StatusText(int code);
extern const long timeout;
extern ByteString proxy;
extern ByteString user_agent;
} }
#endif #endif // REQUEST_H

View File

@ -1,164 +1,212 @@
#include "RequestManager.h" #include "RequestManager.h"
#include "Request.h" #include "Request.h"
#include "HTTP.h"
#include "Config.h" #include "Config.h"
#include "Platform.h" #include "Platform.h"
const int curl_multi_wait_timeout_ms = 100;
const long curl_max_host_connections = 6;
namespace http namespace http
{ {
RequestManager::RequestManager(): const long timeout = 15;
threadStarted(false), ByteString proxy;
lastUsed(time(NULL)), ByteString user_agent;
managerRunning(false),
managerShutdown(false),
requests(std::vector<Request*>()),
requestsAddQueue(std::vector<Request*>())
{
pthread_mutex_init(&requestLock, NULL);
pthread_mutex_init(&requestAddLock, NULL);
}
RequestManager::~RequestManager() RequestManager::RequestManager():
{ rt_shutting_down(false),
multi(NULL)
}
void RequestManager::Shutdown()
{
pthread_mutex_lock(&requestLock);
pthread_mutex_lock(&requestAddLock);
for (std::vector<Request*>::iterator iter = requests.begin(); iter != requests.end(); ++iter)
{ {
Request *request = (*iter); pthread_cond_init(&rt_cv, NULL);
if (request->http) pthread_mutex_init(&rt_mutex, NULL);
http_force_close(request->http);
request->requestCanceled = true;
delete request;
} }
requests.clear();
requestsAddQueue.clear();
managerShutdown = true;
pthread_mutex_unlock(&requestAddLock);
pthread_mutex_unlock(&requestLock);
if (threadStarted)
pthread_join(requestThread, NULL);
http_done(); RequestManager::~RequestManager()
}
//helper function for request
TH_ENTRY_POINT void* RequestManagerHelper(void* obj)
{
RequestManager *temp = (RequestManager*)obj;
temp->Update();
return NULL;
}
void RequestManager::Initialise(ByteString Proxy)
{
proxy = Proxy;
if (proxy.length())
{ {
http_init((char *)proxy.c_str()); pthread_mutex_destroy(&rt_mutex);
pthread_cond_destroy(&rt_cv);
} }
else
void RequestManager::Shutdown()
{ {
http_init(NULL); pthread_mutex_lock(&rt_mutex);
rt_shutting_down = true;
pthread_cond_signal(&rt_cv);
pthread_mutex_unlock(&rt_mutex);
pthread_join(worker_thread, NULL);
curl_multi_cleanup(multi);
multi = NULL;
curl_global_cleanup();
} }
}
void RequestManager::Start() TH_ENTRY_POINT void *RequestManager::RequestManagerHelper(void *obj)
{
managerRunning = true;
lastUsed = time(NULL);
pthread_create(&requestThread, NULL, &RequestManagerHelper, this);
}
void RequestManager::Update()
{
unsigned int numActiveRequests = 0;
while (!managerShutdown)
{ {
pthread_mutex_lock(&requestAddLock); ((RequestManager *)obj)->Worker();
if (requestsAddQueue.size()) return NULL;
}
void RequestManager::Initialise(ByteString Proxy)
{
curl_global_init(CURL_GLOBAL_DEFAULT);
multi = curl_multi_init();
if (multi)
{ {
for (size_t i = 0; i < requestsAddQueue.size(); i++) curl_multi_setopt(multi, CURLMOPT_MAX_HOST_CONNECTIONS, curl_max_host_connections);
{
requests.push_back(requestsAddQueue[i]);
}
requestsAddQueue.clear();
} }
pthread_mutex_unlock(&requestAddLock);
if (requests.size()) proxy = Proxy;
user_agent = ByteString::Build("PowderToy/", SAVE_VERSION, ".", MINOR_VERSION, " (", IDENT_PLATFORM, "; ", IDENT_BUILD, "; M", MOD_ID, ") TPTPP/", SAVE_VERSION, ".", MINOR_VERSION, ".", BUILD_NUM, IDENT_RELTYPE, ".", SNAPSHOT_ID);
pthread_create(&worker_thread, NULL, &RequestManager::RequestManagerHelper, this);
}
void RequestManager::Worker()
{
bool shutting_down = false;
while (!shutting_down)
{ {
numActiveRequests = 0; for (Request *request : requests_to_remove)
pthread_mutex_lock(&requestLock);
for (size_t i = 0; i < requests.size(); i++)
{ {
Request *request = requests[i]; requests.erase(request);
if (request->requestCanceled) if (multi && request->easy && request->added_to_multi)
{ {
if (request->http && request->requestStarted) curl_multi_remove_handle(multi, request->easy);
http_force_close(request->http); request->added_to_multi = false;
delete request;
requests.erase(requests.begin()+i);
i--;
} }
else if (request->requestStarted && !request->requestFinished) delete request;
}
requests_to_remove.clear();
pthread_mutex_lock(&rt_mutex);
shutting_down = rt_shutting_down;
for (Request *request : requests_to_add)
{
request->status = 0;
requests.insert(request);
}
requests_to_add.clear();
if (requests.empty())
{
while (!rt_shutting_down && requests_to_add.empty())
{ {
if (http_async_req_status(request->http) != 0) pthread_cond_wait(&rt_cv, &rt_mutex);
}
}
pthread_mutex_unlock(&rt_mutex);
if (multi && !requests.empty())
{
int dontcare;
struct CURLMsg *msg;
curl_multi_wait(multi, nullptr, 0, curl_multi_wait_timeout_ms, &dontcare);
curl_multi_perform(multi, &dontcare);
while ((msg = curl_multi_info_read(multi, &dontcare)))
{
if (msg->msg == CURLMSG_DONE)
{ {
request->requestData = http_async_req_stop(request->http, &request->requestStatus, &request->requestSize); Request *request;
request->requestFinished = true; curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &request);
if (!request->keepAlive)
request->http = NULL; int finish_with = 600;
switch (msg->data.result)
{
case CURLE_OK:
long code;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &code);
finish_with = (int)code;
break;
case CURLE_UNSUPPORTED_PROTOCOL: finish_with = 601; break;
case CURLE_COULDNT_RESOLVE_HOST: finish_with = 602; break;
case CURLE_OPERATION_TIMEDOUT: finish_with = 605; break;
case CURLE_URL_MALFORMAT: finish_with = 606; break;
case CURLE_COULDNT_CONNECT: finish_with = 607; break;
case CURLE_COULDNT_RESOLVE_PROXY: finish_with = 608; break;
case CURLE_SSL_CONNECT_ERROR:
case CURLE_SSL_ENGINE_NOTFOUND:
case CURLE_SSL_ENGINE_SETFAILED:
case CURLE_SSL_CERTPROBLEM:
case CURLE_SSL_CIPHER:
case CURLE_SSL_ENGINE_INITFAILED:
case CURLE_SSL_CACERT_BADFILE:
case CURLE_SSL_CRL_BADFILE:
case CURLE_SSL_ISSUER_ERROR:
case CURLE_SSL_PINNEDPUBKEYNOTMATCH:
case CURLE_SSL_INVALIDCERTSTATUS: finish_with = 609; break;
case CURLE_HTTP2:
case CURLE_HTTP2_STREAM:
case CURLE_FAILED_INIT:
case CURLE_NOT_BUILT_IN:
default:
break;
}
request->status = finish_with;
} }
lastUsed = time(NULL); };
numActiveRequests++;
}
} }
pthread_mutex_unlock(&requestLock);
}
if (time(NULL) > lastUsed+http_timeout*2 && !numActiveRequests)
{
pthread_mutex_lock(&requestLock);
managerRunning = false;
pthread_mutex_unlock(&requestLock);
return;
}
Platform::Millisleep(1);
}
}
void RequestManager::EnsureRunning() for (Request *request : requests)
{ {
pthread_mutex_lock(&requestLock); pthread_mutex_lock(&request->rm_mutex);
if (!managerRunning)
if (shutting_down)
{
// In the weird case that a http::Request::Simple* call is
// waiting on this Request, we should fail the request
// instead of cancelling it ourselves.
request->status = 610;
}
if (request->rm_canceled)
{
requests_to_remove.insert(request);
}
if (!request->rm_canceled && request->rm_started && !request->added_to_multi)
{
if (multi && request->easy)
{
curl_multi_add_handle(multi, request->easy);
request->added_to_multi = true;
}
else
{
request->status = 604;
}
}
if (!request->rm_canceled && request->rm_started && !request->rm_finished)
{
if (multi && request->easy)
{
curl_easy_getinfo(request->easy, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &request->rm_total);
curl_easy_getinfo(request->easy, CURLINFO_SIZE_DOWNLOAD_T, &request->rm_done);
}
if (request->status)
{
request->rm_finished = true;
pthread_cond_signal(&request->done_cv);
}
}
pthread_mutex_unlock(&request->rm_mutex);
}
}
}
void RequestManager::AddRequest(Request *request)
{ {
if (threadStarted) pthread_mutex_lock(&rt_mutex);
pthread_join(requestThread, NULL); requests_to_add.insert(request);
else pthread_cond_signal(&rt_cv);
threadStarted = true; pthread_mutex_unlock(&rt_mutex);
Start();
} }
pthread_mutex_unlock(&requestLock);
}
void RequestManager::AddRequest(Request *request)
{
pthread_mutex_lock(&requestAddLock);
requestsAddQueue.push_back(request);
pthread_mutex_unlock(&requestAddLock);
EnsureRunning();
}
void RequestManager::Lock()
{
pthread_mutex_lock(&requestAddLock);
}
void RequestManager::Unlock()
{
pthread_mutex_unlock(&requestAddLock);
}
} }

View File

@ -1,46 +1,48 @@
#ifndef REQUESTMANAGER_H #ifndef REQUESTMANAGER_H
#define REQUESTMANAGER_H #define REQUESTMANAGER_H
#include "common/tpt-thread.h" #include "common/tpt-thread.h"
#include <ctime> #include <ctime>
#include <vector> #include <set>
#include <curl/curl.h>
#include "common/Singleton.h" #include "common/Singleton.h"
#include "common/String.h" #include "common/String.h"
namespace http namespace http
{ {
class Request; class Request;
class RequestManager : public Singleton<RequestManager> class RequestManager : public Singleton<RequestManager>
{ {
private: pthread_t worker_thread;
pthread_t requestThread; std::set<Request *> requests;
pthread_mutex_t requestLock;
pthread_mutex_t requestAddLock;
bool threadStarted;
ByteString proxy;
int lastUsed; std::set<Request *> requests_to_add;
volatile bool managerRunning; std::set<Request *> requests_to_remove;
volatile bool managerShutdown; bool rt_shutting_down;
std::vector<Request*> requests; pthread_mutex_t rt_mutex;
std::vector<Request*> requestsAddQueue; pthread_cond_t rt_cv;
void Start(); CURLM *multi;
public:
RequestManager();
~RequestManager();
void Initialise(ByteString proxy); void Start();
void Worker();
void AddRequest(Request *request);
void Shutdown(); static TH_ENTRY_POINT void *RequestManagerHelper(void *obj);
void Update();
void EnsureRunning();
void AddRequest(Request *request); public:
void RemoveRequest(int id); RequestManager();
~RequestManager();
void Lock(); void Initialise(ByteString proxy);
void Unlock(); void Shutdown();
};
friend class Request;
};
extern const long timeout;
extern ByteString proxy;
extern ByteString user_agent;
} }
#endif // REQUESTMANAGER_H #endif // REQUESTMANAGER_H

View File

@ -42,7 +42,6 @@ void AbandonableTask::Start()
thAbandoned = false; thAbandoned = false;
progress = 0; progress = 0;
status = ""; status = "";
//taskMutex = PTHREAD_MUTEX_INITIALIZER;
before(); before();
pthread_mutex_init (&taskMutex, NULL); pthread_mutex_init (&taskMutex, NULL);
pthread_create(&doWorkThread, 0, &AbandonableTask::doWork_helper, this); pthread_create(&doWorkThread, 0, &AbandonableTask::doWork_helper, this);
@ -79,6 +78,10 @@ TH_ENTRY_POINT void * AbandonableTask::doWork_helper(void * ref)
void AbandonableTask::Finish() void AbandonableTask::Finish()
{ {
// note to self: if you make this wait for a condition variable,
// lock the corresponding mutex before calling GetDone, otherwise
// the CV may be signalled between the call and the locking of the
// mutex. -- LBPHacker
while (!GetDone()) while (!GetDone())
{ {
Poll(); Poll();