Skip to content

Support waiting for ACK response #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,22 @@ class PULSAR_PUBLIC ConsumerConfiguration {

const std::vector<ConsumerInterceptorPtr>& getInterceptors() const;

/**
* Whether to receive the ACK receipt from broker.
*
* By default, when Consumer::acknowledge is called, it won't wait until the corresponding response from
* broker. After it's enabled, the `acknowledge` method will return a Result that indicates if the
* acknowledgment succeeded.
*
* Default: false
*/
ConsumerConfiguration& setAckReceiptEnabled(bool ackReceiptEnabled);

/**
* The associated getter of setAckReceiptEnabled.
*/
bool isAckReceiptEnabled() const;

friend class PulsarWrapper;
friend class PulsarFriend;

Expand Down
89 changes: 59 additions & 30 deletions lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "AckGroupingTracker.h"

#include <atomic>
#include <limits>

#include "BitSet.h"
#include "ClientConnection.h"
#include "Commands.h"
Expand All @@ -29,24 +32,33 @@ namespace pulsar {

DECLARE_LOG_OBJECT();

inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
CommandAck_AckType ackType) {
const auto& bitSet = Commands::getMessageIdImpl(msgId)->getBitSet();
auto cmd = Commands::newAck(consumerId, msgId.ledgerId(), msgId.entryId(), bitSet, ackType, -1);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for message - [" << msgId.ledgerId() << ", " << msgId.entryId() << "]");
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const MessageId& msgId, CommandAck_AckType ackType) {
auto cnx = connWeakPtr.lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, ACK failed for message - [" << msgId.ledgerId() << ", "
<< msgId.entryId() << "]");
return false;
void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback callback,
CommandAck_AckType ackType) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgId);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(
Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId),
requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType));
if (callback) {
callback(ResultOk);
}
}
sendAck(cnx, consumerId, msgId, ackType);
return true;
}

static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
Expand All @@ -62,25 +74,42 @@ static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msg
return os;
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds) {
auto cnx = connWeakPtr.lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, ACK failed.");
return false;
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}

if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : msgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
return true;
}

} // namespace pulsar
51 changes: 27 additions & 24 deletions lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
#define LIB_ACKGROUPINGTRACKER_H_

#include <pulsar/MessageId.h>
#include <pulsar/Result.h>

#include <cstdint>
#include <functional>
#include <set>

#include "ProtoApiEnums.h"

namespace pulsar {

class ClientConnection;
using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
using ResultCallback = std::function<void(Result)>;

/**
* @class AckGroupingTracker
Expand All @@ -38,7 +42,13 @@ using ClientConnectionWeakPtr = std::weak_ptr<ClientConnection>;
*/
class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracker> {
public:
AckGroupingTracker() = default;
AckGroupingTracker(std::function<ClientConnectionPtr()> connectionSupplier,
std::function<uint64_t()> requestIdSupplier, uint64_t consumerId, bool waitResponse)
: connectionSupplier_(connectionSupplier),
requestIdSupplier_(requestIdSupplier),
consumerId_(consumerId),
waitResponse_(waitResponse) {}

virtual ~AckGroupingTracker() = default;

/**
Expand All @@ -59,20 +69,23 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
/**
* Adding message ID into ACK group for individual ACK.
* @param[in] msgId ID of the message to be ACKed.
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledge(const MessageId& msgId) {}
virtual void addAcknowledge(const MessageId& msgId, ResultCallback callback) {}

/**
* Adding message ID list into ACK group for individual ACK.
* @param[in] msgIds of the message to be ACKed.
* @param[in] callback the callback that is triggered when the messages are acknowledged
*/
virtual void addAcknowledgeList(const MessageIdList& msgIds) {}
virtual void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) {}

/**
* Adding message ID into ACK group for cumulative ACK.
* @param[in] msgId ID of the message to be ACKed.
* @param[in] callback the callback that is triggered when the message is acknowledged
*/
virtual void addAcknowledgeCumulative(const MessageId& msgId) {}
virtual void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {}

/**
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.
Expand All @@ -91,27 +104,17 @@ class AckGroupingTracker : public std::enable_shared_from_this<AckGroupingTracke
virtual void flushAndClean() {}

protected:
/**
* Immediately send ACK request to broker.
* @param[in] connWeakPtr weak pointer of the client connection.
* @param[in] consumerId ID of the consumer that performs this ACK.
* @param[in] msgId message ID to be ACKed.
* @param[in] ackType ACK type, e.g. cumulative.
* @return true if the ACK is sent successfully, otherwise false.
*/
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId& msgId,
CommandAck_AckType ackType);
void doImmediateAck(const MessageId& msgId, ResultCallback callback, CommandAck_AckType ackType) const;
void doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const;

private:
const std::function<ClientConnectionPtr()> connectionSupplier_;
const std::function<uint64_t()> requestIdSupplier_;
const uint64_t consumerId_;

protected:
const bool waitResponse_;

/**
* Immediately send a set of ACK requests one by one to the broker, it only supports individual
* ACK.
* @param[in] connWeakPtr weak pointer of the client connection.
* @param[in] consumerId ID of the consumer that performs this ACK.
* @param[in] msgIds message IDs to be ACKed.
* @return true if the ACK is sent successfully, otherwise false.
*/
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker

using AckGroupingTrackerPtr = std::shared_ptr<AckGroupingTracker>;
Expand Down
20 changes: 6 additions & 14 deletions lib/AckGroupingTrackerDisabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,24 @@
#include "AckGroupingTrackerDisabled.h"

#include "HandlerBase.h"
#include "LogUtils.h"
#include "ProtoApiEnums.h"

namespace pulsar {

DECLARE_LOG_OBJECT();

AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId)
: AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
LOG_INFO("ACK grouping is disabled.");
}

void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId, ResultCallback callback) {
doImmediateAck(msgId, callback, CommandAck_AckType_Individual);
}

void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) {
std::set<MessageId> msgIdSet;
for (auto&& msgId : msgIds) {
msgIdSet.emplace(msgId);
}
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
doImmediateAck(msgIdSet, callback);
}

void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) {
doImmediateAck(msgId, callback, CommandAck_AckType_Cumulative);
}

} // namespace pulsar
21 changes: 4 additions & 17 deletions lib/AckGroupingTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,12 @@ class HandlerBase;
*/
class AckGroupingTrackerDisabled : public AckGroupingTracker {
public:
using AckGroupingTracker::AckGroupingTracker;
virtual ~AckGroupingTrackerDisabled() = default;

/**
* Constructing ACK grouping tracker for peresistent topics that disabled ACK grouping.
* @param[in] handler the connection handler.
* @param[in] consumerId consumer ID that this tracker belongs to.
*/
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);

void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeList(const MessageIdList& msgIds) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;

private:
//! The connection handler.
HandlerBase& handler_;

//! ID of the consumer that this tracker belongs to.
uint64_t consumerId_;
void addAcknowledge(const MessageId& msgId, ResultCallback callback) override;
void addAcknowledgeList(const MessageIdList& msgIds, ResultCallback callback) override;
void addAcknowledgeCumulative(const MessageId& msgId, ResultCallback callback) override;
}; // class AckGroupingTrackerDisabled

} // namespace pulsar
Expand Down
Loading