Skip to content

[ISSUE #8979] Add configurable switch for timer message retry logic #8980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class MessageStoreConfig {
private boolean timerSkipUnknownError = false;
private boolean timerWarmEnable = false;
private boolean timerStopDequeue = false;
private boolean timerEnableRetryUntilSuccess = false;
private int timerCongestNumEachSlot = Integer.MAX_VALUE;

private int timerMetricSmallThreshold = 1000000;
Expand Down Expand Up @@ -1681,6 +1682,14 @@ public void setTimerSkipUnknownError(boolean timerSkipUnknownError) {
this.timerSkipUnknownError = timerSkipUnknownError;
}

public boolean isTimerEnableRetryUntilSuccess() {
return timerEnableRetryUntilSuccess;
}

public void setTimerEnableRetryUntilSuccess(boolean timerEnableRetryUntilSuccess) {
this.timerEnableRetryUntilSuccess = timerEnableRetryUntilSuccess;
}

public boolean isTimerWarmEnable() {
return timerWarmEnable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1084,46 +1084,44 @@ public int doPut(MessageExtBrokerInner message, boolean roll) throws Exception {
putMessageResult = messageStore.putMessage(message);
}

int retryNum = 0;
while (retryNum < 3) {
if (null == putMessageResult || null == putMessageResult.getPutMessageStatus()) {
retryNum++;
} else {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
if (brokerStatsManager != null) {
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
if (putMessageResult.getAppendMessageResult() != null) {
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
}
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
if (putMessageResult != null && putMessageResult.getPutMessageStatus() != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
if (brokerStatsManager != null) {
brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
if (putMessageResult.getAppendMessageResult() != null) {
brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
}
return PUT_OK;
case SERVICE_NOT_AVAILABLE:
return PUT_NEED_RETRY;
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
}
return PUT_OK;

case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
case WHEEL_TIMER_NOT_ENABLE:
case WHEEL_TIMER_MSG_ILLEGAL:
return PUT_NO_RETRY;

case SERVICE_NOT_AVAILABLE:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case OS_PAGE_CACHE_BUSY:
case CREATE_MAPPED_FILE_FAILED:
case SLAVE_NOT_AVAILABLE:
return PUT_NEED_RETRY;

case UNKNOWN_ERROR:
default:
if (storeConfig.isTimerSkipUnknownError()) {
LOGGER.warn("Skipping message due to unknown error, msg: {}", message);
return PUT_NO_RETRY;
case CREATE_MAPPED_FILE_FAILED:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case OS_PAGE_CACHE_BUSY:
case SLAVE_NOT_AVAILABLE:
case UNKNOWN_ERROR:
default:
retryNum++;
}
}
Thread.sleep(50);
if (escapeBridgeHook != null) {
putMessageResult = escapeBridgeHook.apply(message);
} else {
putMessageResult = messageStore.putMessage(message);
} else {
holdMomentForUnknownError();
return PUT_NEED_RETRY;
}
}
LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message);
}
return PUT_NO_RETRY;
return PUT_NEED_RETRY;
}

public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be beneficial to include a unit test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ve added a unit test to cover this function. Please take a look👀

Expand Down Expand Up @@ -1458,7 +1456,6 @@ protected boolean isState(int state) {
}

public class TimerDequeuePutMessageService extends AbstractStateService {

@Override
public String getServiceName() {
return getServiceThreadName() + this.getClass().getSimpleName();
Expand All @@ -1468,48 +1465,71 @@ public String getServiceName() {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");

while (!this.isStopped() || dequeuePutQueue.size() != 0) {
try {
setState(AbstractStateService.WAITING);
TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);
if (null == tr) {
continue;
}

setState(AbstractStateService.RUNNING);
boolean doRes = false;
boolean tmpDequeueChangeFlag = false;

try {
while (!isStopped() && !doRes) {
while (!isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;
}

try {
perfCounterTicks.startTick(DEQUEUE_PUT);

MessageExt msgExt = tr.getMsg();
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));

if (tr.getEnqueueTime() == Long.MAX_VALUE) {
// never enqueue, mark it.
// Never enqueue, mark it.
MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
}

addMetric(msgExt, -1);
MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
dequeueStatusChangeFlag = true;
tmpDequeueChangeFlag = true;
break;

boolean processed = false;
int retryCount = 0;

while (!processed && !isStopped()) {
int result = doPut(msg, needRoll(tr.getMagic()));

if (result == PUT_OK) {
processed = true;
} else if (result == PUT_NO_RETRY) {
TimerMessageStore.LOGGER.warn("Skipping message due to unrecoverable error. Msg: {}", msg);
processed = true;
} else {
retryCount++;
// Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded
if (!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) {
TimerMessageStore.LOGGER.error("Message processing failed after {} retries. Msg: {}", retryCount, msg);
processed = true;
} else {
Thread.sleep(500L * precisionMs / 1000);
TimerMessageStore.LOGGER.warn("Retrying to process message. Retry count: {}, Msg: {}", retryCount, msg);
}
}
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
Thread.sleep(500L * precisionMs / 1000);
}

perfCounterTicks.endTick(DEQUEUE_PUT);
break;

} catch (Throwable t) {
LOGGER.info("Unknown error", t);
TimerMessageStore.LOGGER.info("Unknown error", t);
if (storeConfig.isTimerSkipUnknownError()) {
doRes = true;
break;
} else {
holdMomentForUnknownError();
}
Expand All @@ -1518,7 +1538,6 @@ public void run() {
} finally {
tr.idempotentRelease(!tmpDequeueChangeFlag);
}

} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
}
Expand Down
Loading
Loading