18
18
*/
19
19
#include " HTTPLookupService.h"
20
20
21
- #include < curl/curl.h>
22
21
#include < pulsar/Version.h>
23
22
24
23
#include < boost/property_tree/json_parser.hpp>
25
24
#include < boost/property_tree/ptree.hpp>
26
25
26
+ #include " CurlWrapper.h"
27
27
#include " ExecutorService.h"
28
28
#include " Int64SerDes.h"
29
29
#include " LogUtils.h"
@@ -46,16 +46,6 @@ const static std::string ADMIN_PATH_V2 = "/admin/v2/";
46
46
const static std::string PARTITION_METHOD_NAME = " partitions" ;
47
47
const static int NUMBER_OF_LOOKUP_THREADS = 1 ;
48
48
49
- static inline bool needRedirection (long code) { return (code == 307 || code == 302 || code == 301 ); }
50
-
51
- HTTPLookupService::CurlInitializer::CurlInitializer () {
52
- // Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
53
- curl_global_init (CURL_GLOBAL_ALL);
54
- }
55
- HTTPLookupService::CurlInitializer::~CurlInitializer () { curl_global_cleanup (); }
56
-
57
- HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
58
-
59
49
HTTPLookupService::HTTPLookupService (ServiceNameResolver &serviceNameResolver,
60
50
const ClientConfiguration &clientConfiguration,
61
51
const AuthenticationPtr &authData)
@@ -182,11 +172,6 @@ Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topi
182
172
return promise.getFuture ();
183
173
}
184
174
185
- static size_t curlWriteCallback (void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
186
- ((std::string *)responseDataPtr)->append ((char *)contents, size * nmemb);
187
- return size * nmemb;
188
- }
189
-
190
175
void HTTPLookupService::handleNamespaceTopicsHTTPRequest (NamespaceTopicsPromise promise,
191
176
const std::string completeUrl) {
192
177
std::string responseData;
@@ -209,111 +194,61 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
209
194
uint16_t reqCount = 0 ;
210
195
Result retResult = ResultOk;
211
196
while (++reqCount <= maxLookupRedirects_) {
212
- CURL *handle;
213
- CURLcode res;
214
- std::string version = std::string (" Pulsar-CPP-v" ) + PULSAR_VERSION_STR;
215
- handle = curl_easy_init ();
216
-
217
- if (!handle) {
218
- LOG_ERROR (" Unable to curl_easy_init for url " << completeUrl);
219
- // No curl_easy_cleanup required since handle not initialized
220
- return ResultLookupError;
221
- }
222
- // set URL
223
- curl_easy_setopt (handle, CURLOPT_URL, completeUrl.c_str ());
224
-
225
- // Write callback
226
- curl_easy_setopt (handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
227
- curl_easy_setopt (handle, CURLOPT_WRITEDATA, &responseData);
228
-
229
- // New connection is made for each call
230
- curl_easy_setopt (handle, CURLOPT_FRESH_CONNECT, 1L );
231
- curl_easy_setopt (handle, CURLOPT_FORBID_REUSE, 1L );
232
-
233
- // Skipping signal handling - results in timeouts not honored during the DNS lookup
234
- curl_easy_setopt (handle, CURLOPT_NOSIGNAL, 1L );
235
-
236
- // Timer
237
- curl_easy_setopt (handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
238
-
239
- // Set User Agent
240
- curl_easy_setopt (handle, CURLOPT_USERAGENT, version.c_str ());
241
-
242
- // Fail if HTTP return code >=400
243
- curl_easy_setopt (handle, CURLOPT_FAILONERROR, 1L );
244
-
245
197
// Authorization data
246
198
AuthenticationDataPtr authDataContent;
247
199
Result authResult = authenticationPtr_->getAuthData (authDataContent);
248
200
if (authResult != ResultOk) {
249
201
LOG_ERROR (" Failed to getAuthData: " << authResult);
250
- curl_easy_cleanup (handle);
251
202
return authResult;
252
203
}
253
- struct curl_slist *list = NULL ;
254
- if (authDataContent->hasDataForHttp ()) {
255
- list = curl_slist_append (list, authDataContent->getHttpHeaders ().c_str ());
204
+
205
+ CurlWrapper curl;
206
+ if (!curl.init ()) {
207
+ LOG_ERROR (" Unable to curl_easy_init for url " << completeUrl);
208
+ return ResultLookupError;
256
209
}
257
- curl_easy_setopt (handle, CURLOPT_HTTPHEADER, list);
258
210
259
- // TLS
211
+ std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
260
212
if (isUseTls_) {
261
- if (curl_easy_setopt (handle, CURLOPT_SSLENGINE, NULL ) != CURLE_OK) {
262
- LOG_ERROR (" Unable to load SSL engine for url " << completeUrl);
263
- curl_easy_cleanup (handle);
264
- return ResultConnectError;
265
- }
266
- if (curl_easy_setopt (handle, CURLOPT_SSLENGINE_DEFAULT, 1L ) != CURLE_OK) {
267
- LOG_ERROR (" Unable to load SSL engine as default, for url " << completeUrl);
268
- curl_easy_cleanup (handle);
269
- return ResultConnectError;
270
- }
271
- curl_easy_setopt (handle, CURLOPT_SSLCERTTYPE, " PEM" );
272
-
273
- if (tlsAllowInsecure_) {
274
- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 0L );
275
- } else {
276
- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 1L );
277
- }
278
-
279
- if (!tlsTrustCertsFilePath_.empty ()) {
280
- curl_easy_setopt (handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str ());
281
- }
282
-
283
- curl_easy_setopt (handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L );
284
-
213
+ tlsContext.reset (new CurlWrapper::TlsContext);
214
+ tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
215
+ tlsContext->validateHostname = tlsValidateHostname_;
216
+ tlsContext->allowInsecure = tlsAllowInsecure_;
285
217
if (authDataContent->hasDataForTls ()) {
286
- curl_easy_setopt (handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates (). c_str () );
287
- curl_easy_setopt (handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey (). c_str () );
218
+ tlsContext-> certPath = authDataContent->getTlsCertificates ();
219
+ tlsContext-> keyPath = authDataContent->getTlsPrivateKey ();
288
220
} else {
289
- if (!tlsPrivateFilePath_.empty () && !tlsCertificateFilePath_.empty ()) {
290
- curl_easy_setopt (handle, CURLOPT_SSLCERT, tlsCertificateFilePath_.c_str ());
291
- curl_easy_setopt (handle, CURLOPT_SSLKEY, tlsPrivateFilePath_.c_str ());
292
- }
221
+ tlsContext->certPath = tlsCertificateFilePath_;
222
+ tlsContext->keyPath = tlsPrivateFilePath_;
293
223
}
294
224
}
295
225
296
226
LOG_INFO (" Curl [" << reqCount << " ] Lookup Request sent for " << completeUrl);
227
+ CurlWrapper::Options options;
228
+ options.timeoutInSeconds = lookupTimeoutInSeconds_;
229
+ options.userAgent = std::string (" Pulsar-CPP-v" ) + PULSAR_VERSION_STR;
230
+ options.maxLookupRedirects = 1 ; // redirection is implemented by the outer loop
231
+ auto result = curl.get (completeUrl, authDataContent->getHttpHeaders (), options, tlsContext.get ());
232
+ const auto &error = result.error ;
233
+ if (!error.empty ()) {
234
+ LOG_ERROR (completeUrl << " failed: " << error);
235
+ return ResultConnectError;
236
+ }
297
237
298
- // Make get call to server
299
- res = curl_easy_perform (handle);
300
-
301
- curl_easy_getinfo (handle, CURLINFO_RESPONSE_CODE, &responseCode);
238
+ responseData = result.responseData ;
239
+ responseCode = result.responseCode ;
240
+ auto res = result.code ;
302
241
LOG_INFO (" Response received for url " << completeUrl << " responseCode " << responseCode
303
242
<< " curl res " << res);
304
243
305
- // Free header list
306
- curl_slist_free_all (list);
307
-
244
+ const auto &redirectUrl = result.redirectUrl ;
308
245
switch (res) {
309
246
case CURLE_OK:
310
247
if (responseCode == 200 ) {
311
248
retResult = ResultOk;
312
- } else if (needRedirection (responseCode)) {
313
- char *url = NULL ;
314
- curl_easy_getinfo (handle, CURLINFO_REDIRECT_URL, &url);
315
- LOG_INFO (" Response from url " << completeUrl << " to new url " << url);
316
- completeUrl = url;
249
+ } else if (!redirectUrl.empty ()) {
250
+ LOG_INFO (" Response from url " << completeUrl << " to new url " << redirectUrl);
251
+ completeUrl = redirectUrl;
317
252
retResult = ResultLookupError;
318
253
} else {
319
254
retResult = ResultLookupError;
@@ -342,8 +277,7 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
342
277
retResult = ResultLookupError;
343
278
break ;
344
279
}
345
- curl_easy_cleanup (handle);
346
- if (!needRedirection (responseCode)) {
280
+ if (redirectUrl.empty ()) {
347
281
break ;
348
282
}
349
283
}
0 commit comments