Skip to content

Commit f30e6b6

Browse files
committed
migrate cpp task related apis to thrift
1 parent 38f9ae6 commit f30e6b6

38 files changed

+70684
-42
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

+50-19
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "presto_cpp/main/thrift/ThriftIO.h"
2020
#include "presto_cpp/main/thrift/gen-cpp2/PrestoThrift.h"
2121
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
22+
#include "presto_cpp/main/thrift/experimental/ThriftUtils.h"
2223

2324
namespace facebook::presto {
2425

@@ -210,25 +211,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210211
const protocol::TaskId& taskId,
211212
const std::string& updateJson,
212213
const bool summarize,
213-
long startProcessCpuTime)>& createOrUpdateFunc) {
214+
long startProcessCpuTime,
215+
bool receiveThrift)>& createOrUpdateFunc) {
214216
protocol::TaskId taskId = pathMatch[1];
215217
bool summarize = message->hasQueryParam("summarize");
218+
219+
auto& headers = message->getHeaders();
220+
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
221+
auto sendThrift =
222+
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
223+
auto contentHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
224+
auto receiveThrift =
225+
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
226+
216227
return new http::CallbackRequestHandler(
217-
[this, taskId, summarize, createOrUpdateFunc](
228+
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
218229
proxygen::HTTPMessage* /*message*/,
219230
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220231
proxygen::ResponseHandler* downstream,
221232
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222233
folly::via(
223234
httpSrvCpuExecutor_,
224-
[this, &body, taskId, summarize, createOrUpdateFunc]() {
235+
[this, &body, taskId, summarize, createOrUpdateFunc, receiveThrift]() {
225236
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();
226237
std::string updateJson = util::extractMessageBody(body);
227238

228239
std::unique_ptr<protocol::TaskInfo> taskInfo;
229240
try {
230241
taskInfo = createOrUpdateFunc(
231-
taskId, updateJson, summarize, startProcessCpuTimeNs);
242+
taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift);
232243
} catch (const velox::VeloxException& e) {
233244
// Creating an empty task, putting errors inside so that next
234245
// status fetch from coordinator will catch the error and well
@@ -243,12 +254,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243254
throw;
244255
}
245256
}
246-
return json(*taskInfo);
257+
return taskInfo;
247258
})
248259
.via(folly::EventBaseManager::get()->getEventBase())
249-
.thenValue([downstream, handlerState](auto&& taskInfoJson) {
260+
.thenValue([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo) {
250261
if (!handlerState->requestExpired()) {
251-
http::sendOkResponse(downstream, taskInfoJson);
262+
if (sendThrift) {
263+
protocol::cpp2::TaskInfo thriftTaskInfo;
264+
protocol::cpp2::toThrift(*taskInfo, thriftTaskInfo);
265+
http::sendOkThriftResponse(
266+
downstream, thriftWrite(thriftTaskInfo));
267+
} else {
268+
http::sendOkResponse(downstream, json(*taskInfo));
269+
}
252270
}
253271
})
254272
.thenError(
@@ -277,7 +295,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277295
[&](const protocol::TaskId& taskId,
278296
const std::string& updateJson,
279297
const bool summarize,
280-
long startProcessCpuTime) {
298+
long startProcessCpuTime,
299+
bool receiveThrift) {
281300
protocol::BatchTaskUpdateRequest batchUpdateRequest =
282301
json::parse(updateJson);
283302
auto updateRequest = batchUpdateRequest.taskUpdateRequest;
@@ -329,13 +348,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329348
[&](const protocol::TaskId& taskId,
330349
const std::string& updateJson,
331350
const bool summarize,
332-
long startProcessCpuTime) {
333-
protocol::TaskUpdateRequest updateRequest = json::parse(updateJson);
351+
long startProcessCpuTime,
352+
bool receiveThrift) {
353+
protocol::TaskUpdateRequest updateRequest;
354+
if (receiveThrift) {
355+
auto thriftTaskUpdateRequest = std::make_shared<protocol::cpp2::TaskUpdateRequest>();
356+
thriftRead(updateJson, thriftTaskUpdateRequest);
357+
protocol::cpp2::fromThrift(*thriftTaskUpdateRequest, updateRequest);
358+
} else {
359+
updateRequest = json::parse(updateJson);
360+
}
334361
velox::core::PlanFragment planFragment;
335362
std::shared_ptr<velox::core::QueryCtx> queryCtx;
336363
if (updateRequest.fragment) {
337-
auto fragment =
338-
velox::encoding::Base64::decode(*updateRequest.fragment);
364+
std::string fragment;
365+
if (receiveThrift) {
366+
fragment = *updateRequest.fragment;
367+
} else {
368+
fragment = velox::encoding::Base64::decode(*updateRequest.fragment);
369+
}
339370
protocol::PlanFragment prestoPlan = json::parse(fragment);
340371

341372
queryCtx =
@@ -511,11 +542,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511542

512543
auto& headers = message->getHeaders();
513544
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
514-
auto useThrift =
545+
auto sendThrift =
515546
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
516547

517548
return new http::CallbackRequestHandler(
518-
[this, useThrift, taskId, currentState, maxWait](
549+
[this, sendThrift, taskId, currentState, maxWait](
519550
proxygen::HTTPMessage* /*message*/,
520551
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
521552
proxygen::ResponseHandler* downstream,
@@ -525,7 +556,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525556
httpSrvCpuExecutor_,
526557
[this,
527558
evb,
528-
useThrift,
559+
sendThrift,
529560
taskId,
530561
currentState,
531562
maxWait,
@@ -535,12 +566,12 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535566
.getTaskStatus(taskId, currentState, maxWait, handlerState)
536567
.via(evb)
537568
.thenValue(
538-
[useThrift, downstream, taskId, handlerState](
569+
[sendThrift, downstream, taskId, handlerState](
539570
std::unique_ptr<protocol::TaskStatus> taskStatus) {
540571
if (!handlerState->requestExpired()) {
541-
if (useThrift) {
542-
thrift::TaskStatus thriftTaskStatus;
543-
toThrift(*taskStatus, thriftTaskStatus);
572+
if (sendThrift) {
573+
protocol::cpp2::TaskStatus thriftTaskStatus;
574+
protocol::cpp2::toThrift(*taskStatus, thriftTaskStatus);
544575
http::sendOkThriftResponse(
545576
downstream, thriftWrite(thriftTaskStatus));
546577
} else {

presto-native-execution/presto_cpp/main/TaskResource.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TaskResource {
7676
const protocol::TaskId&,
7777
const std::string&,
7878
const bool,
79-
long)>& createOrUpdateFunc);
79+
long,
80+
const bool)>& createOrUpdateFunc);
8081

8182
proxygen::RequestHandler* deleteTask(
8283
proxygen::HTTPMessage* message,

presto-native-execution/presto_cpp/main/common/tests/test_json.h

+20-1
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
#pragma once
1515

1616
#include <fstream>
17-
#include <ios>
1817
#include <iosfwd>
18+
#include <boost/filesystem.hpp>
19+
#include <boost/algorithm/string.hpp>
1920

2021
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
2122

23+
namespace fs = boost::filesystem;
24+
2225
namespace nlohmann {
2326

2427
// This is required avoid stack overflow when a gtest error printer is invoked.
@@ -48,3 +51,19 @@ inline std::string slurp(const std::string& path) {
4851
buf << input.rdbuf();
4952
return buf.str();
5053
}
54+
55+
inline std::string getDataPath(const std::string& dirUnderFbcode, const std::string& fileName) {
56+
std::string currentPath = fs::current_path().c_str();
57+
if (boost::algorithm::ends_with(currentPath, "fbcode")) {
58+
return currentPath + dirUnderFbcode + fileName;
59+
}
60+
61+
// CLion runs the tests from cmake-build-release/ or cmake-build-debug/
62+
// directory. Hard-coded json files are not copied there and test fails with
63+
// file not found. Fixing the path so that we can trigger these tests from
64+
// CLion.
65+
boost::algorithm::replace_all(currentPath, "cmake-build-release/", "");
66+
boost::algorithm::replace_all(currentPath, "cmake-build-debug/", "");
67+
68+
return currentPath + "/data/" + fileName;
69+
}

presto-native-execution/presto_cpp/main/thrift/ThriftIO.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
#include <thrift/lib/cpp2/protocol/BinaryProtocol.h>
1717

1818
template <typename T>
19-
void thriftRead(std::string data, std::shared_ptr<T>& buffer) {
20-
auto inBuf = folly::IOBuf::copyBuffer(data);
19+
void thriftRead(const std::string& data, std::shared_ptr<T>& buffer) {
20+
auto inBuf = folly::IOBuf::wrapBuffer(data.data(), data.size());
2121
apache::thrift::BinaryProtocolReader reader;
2222
reader.setInput(inBuf.get());
2323
buffer->read(&reader);

0 commit comments

Comments
 (0)