Skip to content

Commit ebe95d1

Browse files
committed
Support waiting for ACK response
TODO: - Reuse the logic to convert vector<pair<MessageId, ResultCallback> to a map. - Fix the callback in AckGroupingTrackerEnabled not triggered in time Refactor the grouping tracker API
1 parent 998c73d commit ebe95d1

13 files changed

+312
-217
lines changed

include/pulsar/ConsumerConfiguration.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {
629629

630630
const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;
631631

632+
/**
633+
* Whether to receive the ACK receipt from broker.
634+
*
635+
* By default, when Consumer::acknowledge is called, it won't wait until the corresponding response from
636+
* broker. After it's enabled, the `acknowledge` method will return a Result that indicates if the
637+
* acknowledgment succeeded.
638+
*
639+
* Default: false
640+
*/
641+
ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);
642+
643+
/**
644+
* The associated getter of setAckReceiptEnabled.
645+
*/
646+
bool isAckReceiptEnabled() const;
647+
632648
friend class PulsarWrapper;
633649
friend class PulsarFriend;
634650

lib/AckGroupingTracker.cc

Lines changed: 65 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,66 +21,89 @@
2121

2222
#include "BitSet.h"
2323
#include "ClientConnection.h"
24+
#include "ClientImpl.h"
2425
#include "Commands.h"
26+
#include "HandlerBase.h"
2527
#include "LogUtils.h"
2628
#include "MessageIdImpl.h"
2729

2830
namespace pulsar {
2931

3032
DECLARE_LOG_OBJECT();
3133

32-
inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
33-
CommandAck_AckType ackType) {
34-
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
35-
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
36-
cnx->sendCommand(cmd);
37-
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
38-
}
39-
40-
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
41-
const MessageId& msgId, CommandAck_AckType ackType) {
42-
auto cnx = connWeakPtr.lock();
43-
if (cnx == nullptr) {
44-
LOG_DEBUG("Connection is not ready, ACK failed for message - [" << msgId.ledgerId() << ", "
45-
<< msgId.entryId() << "]");
46-
return false;
34+
std::pair<std::shared_ptr<ClientConnection>, uint64_t> AckGroupingTracker::generateAckInfo() const {
35+
auto handler = handler_.lock();
36+
if (!handler) {
37+
LOG_DEBUG("Reference to the HandlerBase is not valid in generateAckInfo");
38+
return std::make_pair(nullptr, 0);
4739
}
48-
sendAck(cnx, consumerId, msgId, ackType);
49-
return true;
50-
}
51-
52-
static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
53-
bool first = true;
54-
for (auto&& msgId : msgIds) {
55-
if (first) {
56-
first = false;
57-
} else {
58-
os << ", ";
40+
auto cnx = handler->getCnx().lock();
41+
if (!cnx) {
42+
LOG_DEBUG("Connection is not ready in generateAckInfo");
43+
return std::make_pair(nullptr, 0);
44+
}
45+
if (waitResponse_) {
46+
auto client = client_.lock();
47+
if (!client) {
48+
LOG_DEBUG("Reference to the ClientImpl is not valid in generateAckInfo");
49+
return std::make_pair(nullptr, 0);
5950
}
60-
os << "[" << msgId << "]";
51+
return std::make_pair(cnx, client->newRequestId());
52+
} else {
53+
return std::make_pair(cnx, 0);
6154
}
62-
return os;
6355
}
6456

65-
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
66-
const std::set<MessageId>& msgIds) {
67-
auto cnx = connWeakPtr.lock();
68-
if (cnx == nullptr) {
69-
LOG_DEBUG("Connection is not ready, ACK failed.");
70-
return false;
57+
void AckGroupingTracker::doImmediateAck(ClientConnection& cnx, uint64_t requestId, const MessageId& msgId,
58+
ResultCallback callback, CommandAck_AckType ackType) {
59+
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
60+
auto cmd = Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
61+
if (waitResponse_) {
62+
cnx.sendRequestWithId(cmd, requestId).addListener([callback](Result result, const ResponseData&) {
63+
if (callback) {
64+
callback(result);
65+
}
66+
});
67+
} else {
68+
cnx.sendCommand(cmd);
69+
callback(ResultOk);
7170
}
71+
}
72+
73+
void AckGroupingTracker::doImmediateAck(ClientConnection& cnx, uint64_t requestId,
74+
const std::map<MessageId, ResultCallback>& msgIdToCallback) {
75+
using Callbacks = std::vector<ResultCallback>;
76+
if (Commands::peerSupportsActiveConsumerListener(cnx.getServerProtocolVersion())) {
77+
std::set<MessageId> msgIds;
78+
std::unique_ptr<Callbacks> callbacks{new Callbacks};
7279

73-
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
74-
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
75-
cnx->sendCommand(cmd);
76-
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
80+
for (auto&& kv : msgIdToCallback) {
81+
msgIds.emplace(kv.first);
82+
if (kv.second) {
83+
callbacks->emplace_back(kv.second);
84+
}
85+
}
86+
87+
auto cmd = Commands::newMultiMessageAck(consumerId_, msgIds);
88+
if (waitResponse_) {
89+
auto rawPtr = callbacks.release();
90+
cnx.sendRequestWithId(cmd, requestId).addListener([rawPtr](Result result, const ResponseData&) {
91+
std::unique_ptr<Callbacks> callbacks{rawPtr};
92+
for (auto&& callback : *callbacks) {
93+
callback(result);
94+
}
95+
});
96+
} else {
97+
cnx.sendCommand(cmd);
98+
for (auto&& callback : *callbacks) {
99+
callback(ResultOk);
100+
}
101+
}
77102
} else {
78-
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
79-
for (const auto& msgId : msgIds) {
80-
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
103+
for (auto&& kv : msgIdToCallback) {
104+
doImmediateAck(cnx, requestId, kv.first, kv.second, CommandAck_AckType_Individual);
81105
}
82106
}
83-
return true;
84107
}
85108

86109
} // namespace pulsar

lib/AckGroupingTracker.h

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@
1919
#ifndef LIB_ACKGROUPINGTRACKER_H_
2020
#define LIB_ACKGROUPINGTRACKER_H_
2121

22+
#include <pulsar/Consumer.h>
2223
#include <pulsar/MessageId.h>
2324

2425
#include <cstdint>
25-
#include <set>
26+
#include <map>
27+
#include <utility> // std::pair
2628

2729
#include "ProtoApiEnums.h"
2830

2931
namespace pulsar {
3032

33+
class ClientImpl;
3134
class ClientConnection;
3235
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
36+
class HandlerBase;
3337

3438
/**
3539
* @class AckGroupingTracker
@@ -38,7 +42,10 @@ using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
3842
*/
3943
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
4044
public:
41-
AckGroupingTracker() = default;
45+
AckGroupingTracker(std::shared_ptr<ClientImpl> client, std::shared_ptr<HandlerBase> handler,
46+
uint64_t consumerId, bool waitResponse)
47+
: client_(client), handler_(handler), consumerId_(consumerId), waitResponse_(waitResponse) {}
48+
4249
virtual ~AckGroupingTracker() = default;
4350

4451
/**
@@ -59,20 +66,23 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
5966
/**
6067
* Adding message ID into ACK group for individual ACK.
6168
* @param[in] msgId ID of the message to be ACKed.
69+
* @param[in] callback the callback that is triggered when the message is acknowledged
6270
*/
63-
virtual void addAcknowledge(const MessageId& msgId) {}
71+
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) {}
6472

6573
/**
6674
* Adding message ID list into ACK group for individual ACK.
6775
* @param[in] msgIds of the message to be ACKed.
76+
* @param[in] callback the callback that is triggered when these messages are acknowledged
6877
*/
69-
virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
78+
virtual void addAcknowledgeList(const std::vector<MessageId>& msgIds, ResultCallback callback) {}
7079

7180
/**
7281
* Adding message ID into ACK group for cumulative ACK.
7382
* @param[in] msgId ID of the message to be ACKed.
83+
* @param[in] callback the callback that is triggered when the message is acknowledged
7484
*/
75-
virtual void addAcknowledgeCumulative(const MessageId& msgId) {}
85+
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {}
7686

7787
/**
7888
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.
@@ -91,27 +101,18 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
91101
virtual void flushAndClean() {}
92102

93103
protected:
94-
/**
95-
* Immediately send ACK request to broker.
96-
* @param[in] connWeakPtr weak pointer of the client connection.
97-
* @param[in] consumerId ID of the consumer that performs this ACK.
98-
* @param[in] msgId message ID to be ACKed.
99-
* @param[in] ackType ACK type, e.g. cumulative.
100-
* @return true if the ACK is sent successfully, otherwise false.
101-
*/
102-
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId& msgId,
103-
CommandAck_AckType ackType);
104+
void doImmediateAck(ClientConnection& cnx, uint64_t requestId, const MessageId& msgId,
105+
ResultCallback callback, CommandAck_AckType ackType);
106+
void doImmediateAck(ClientConnection& cnx, uint64_t requestId,
107+
const std::map<MessageId, ResultCallback>& msgIdToCallback);
108+
std::pair<std::shared_ptr<ClientConnection>, uint64_t /* request id */> generateAckInfo() const;
109+
110+
private:
111+
const std::weak_ptr<ClientImpl> client_;
112+
const std::weak_ptr<HandlerBase> handler_;
113+
const uint64_t consumerId_;
114+
const bool waitResponse_;
104115

105-
/**
106-
* Immediately send a set of ACK requests one by one to the broker, it only supports individual
107-
* ACK.
108-
* @param[in] connWeakPtr weak pointer of the client connection.
109-
* @param[in] consumerId ID of the consumer that performs this ACK.
110-
* @param[in] msgIds message IDs to be ACKed.
111-
* @return true if the ACK is sent successfully, otherwise false.
112-
*/
113-
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
114-
const std::set<MessageId>& msgIds);
115116
}; // class AckGroupingTracker
116117

117118
using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;

lib/AckGroupingTrackerDisabled.cc

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,76 @@
2020
#include "AckGroupingTrackerDisabled.h"
2121

2222
#include "HandlerBase.h"
23-
#include "LogUtils.h"
2423
#include "ProtoApiEnums.h"
2524

2625
namespace pulsar {
2726

28-
DECLARE_LOG_OBJECT();
29-
30-
AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId)
31-
: AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
32-
LOG_INFO("ACK grouping is disabled.");
27+
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, ResultCallback callback) {
28+
auto pair = generateAckInfo();
29+
auto cnx = pair.first;
30+
if (!cnx) {
31+
if (callback) {
32+
callback(ResultNotConnected);
33+
}
34+
return;
35+
}
36+
auto requestId = pair.second;
37+
doImmediateAck(*cnx, requestId, msgId, callback, CommandAck_AckType_Individual);
3338
}
3439

35-
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
36-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
40+
void AckGroupingTrackerDisabled::addAcknowledgeList(const std::vector<MessageId>& msgIds,
41+
ResultCallback callback) {
42+
// TODO:
3743
}
44+
// void AckGroupingTrackerDisabled::addAcknowledgeList(
45+
// const std::vector<std::pair<MessageId, ResultCallback>>& msgIdAndCallbacks) {
46+
// auto pair = generateAckInfo();
47+
// auto cnx = pair.first;
48+
// if (!cnx) {
49+
// for (auto&& kv : msgIdAndCallbacks) {
50+
// auto callback = kv.second;
51+
// if (callback) {
52+
// callback(ResultNotConnected);
53+
// }
54+
// }
55+
// return;
56+
// }
57+
//
58+
// // TODO: reuse the same logic in AckGroupingTrackerEnabled
59+
// auto requestId = pair.second;
60+
// std::map<MessageId, ResultCallback> msgIdToCallback;
61+
// for (auto&& kv : msgIdAndCallbacks) {
62+
// auto&& msgId = kv.first;
63+
// auto callback = kv.second;
64+
// auto pair = msgIdToCallback.emplace(msgId, callback);
65+
//
66+
// // `msgId` is already cached, combine `callback` with the existing callback
67+
// auto previousCallback = pair.first->second;
68+
// if (callback) {
69+
// auto previousCallback = pair.first->second;
70+
// pair.first->second = [previousCallback, callback](Result result) {
71+
// if (previousCallback) {
72+
// previousCallback(result);
73+
// }
74+
// callback(result);
75+
// };
76+
// }
77+
// }
78+
//
79+
// doImmediateAck(*cnx, requestId, msgIdToCallback);
80+
//}
3881

39-
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
40-
std::set<MessageId> msgIdSet;
41-
for (auto&& msgId : msgIds) {
42-
msgIdSet.emplace(msgId);
82+
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {
83+
auto pair = generateAckInfo();
84+
auto cnx = pair.first;
85+
if (!cnx) {
86+
if (callback) {
87+
callback(ResultNotConnected);
88+
}
89+
return;
4390
}
44-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
45-
}
46-
47-
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
48-
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
91+
auto requestId = pair.second;
92+
doImmediateAck(*cnx, requestId, msgId, callback, CommandAck_AckType_Cumulative);
4993
}
5094

5195
} // namespace pulsar

lib/AckGroupingTrackerDisabled.h

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@
1919
#ifndef LIB_ACKGROUPINGTRACKERDISABLED_H_
2020
#define LIB_ACKGROUPINGTRACKERDISABLED_H_
2121

22-
#include <cstdint>
23-
2422
#include "AckGroupingTracker.h"
2523

2624
namespace pulsar {
2725

28-
class HandlerBase;
29-
3026
/**
3127
* @class AckGroupingTrackerDisabled
3228
* ACK grouping tracker that does not tracker or group ACK requests. The ACK requests are diretly
@@ -36,23 +32,11 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker {
3632
public:
3733
virtual ~AckGroupingTrackerDisabled() = default;
3834

39-
/**
40-
* Constructing ACK grouping tracker for peresistent topics that disabled ACK grouping.
41-
* @param[in] handler the connection handler.
42-
* @param[in] consumerId consumer ID that this tracker belongs to.
43-
*/
44-
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
45-
46-
void addAcknowledge(const MessageId& msgId) override;
47-
void addAcknowledgeList(const MessageIdList& msgIds) override;
48-
void addAcknowledgeCumulative(const MessageId& msgId) override;
49-
50-
private:
51-
//! The connection handler.
52-
HandlerBase& handler_;
35+
using AckGroupingTracker::AckGroupingTracker;
5336

54-
//! ID of the consumer that this tracker belongs to.
55-
uint64_t consumerId_;
37+
void addAcknowledge(const MessageId&, ResultCallback) override;
38+
void addAcknowledgeList(const std::vector<MessageId>& msgIds, ResultCallback callback) override;
39+
void addAcknowledgeCumulative(const MessageId&, ResultCallback) override;
5640
}; // class AckGroupingTrackerDisabled
5741

5842
} // namespace pulsar

0 commit comments

Comments
 (0)