Skip to content

Reprioritize responses of GetReplicationMessagesResponse in frontend #6696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

82 changes: 66 additions & 16 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ package history
import (
"context"
"fmt"
"math/rand"
"slices"
"sync"
"time"

"go.uber.org/yarpc"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -54,6 +53,9 @@ type (
response *types.GetReplicationMessagesResponse
size int
peer string

// earliestCreationTime of replication tasks of response
earliestCreationTime *int64
}
)

Expand Down Expand Up @@ -774,9 +776,10 @@ func (c *clientImpl) GetReplicationMessages(
}
responseMutex.Lock()
peerResponses = append(peerResponses, &getReplicationMessagesWithSize{
response: resp,
size: responseInfo.Size,
peer: peer,
response: resp,
size: responseInfo.Size,
peer: peer,
earliestCreationTime: resp.GetEarliestCreationTime(),
})
responseMutex.Unlock()
return nil
Expand All @@ -787,16 +790,20 @@ func (c *clientImpl) GetReplicationMessages(
return nil, err
}

// Peers with largest responses can be slowest to return data.
// They end up in the end of array and have a possibility of not fitting in the response message.
// Skipped peers grow their responses even more and next they will be even slower and end up in the end again.
// This can lead to starving peers.
// Shuffle the slice of responses to prevent such scenario. All peer will have equal chance to be pick up first.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range peerResponses {
j := r.Intn(i + 1)
peerResponses[i], peerResponses[j] = peerResponses[j], peerResponses[i]
}
return c.buildGetReplicationMessagesResponse(peerResponses), nil
}

// buildGetReplicationMessagesResponse builds a new GetReplicationMessagesResponse from peer responses
// The response can be partial if the total size of the response exceeds the max size.
// In this case, responses with oldest replication tasks will be returned
func (c *clientImpl) buildGetReplicationMessagesResponse(peerResponses []*getReplicationMessagesWithSize) *types.GetReplicationMessagesResponse {
// Peers with large response can cause the response to exceed the max size.
// In this case, we need to skip some peer responses to make sure the result response size is within the limit.
// To prevent a replication lag in the future, we should return the response with the oldest replication task.
// So we sort the peer responses by the earliest creation time of the replication task.
// If the earliest creation time is the same, we compare the size of the response.
// This will sure that shards with the oldest replication tasks will be processed first.
sortGetReplicationMessageWithSize(peerResponses)

response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
responseTotalSize := 0
Expand Down Expand Up @@ -825,7 +832,50 @@ func (c *clientImpl) GetReplicationMessages(
response.MessagesByShard[shardID] = tasks
}
}
return response, nil

return response
}

// sortGetReplicationMessageWithSize sorts the peer responses by the earliest creation time of the replication tasks
func sortGetReplicationMessageWithSize(peerResponses []*getReplicationMessagesWithSize) {
slices.SortStableFunc(peerResponses, cmpGetReplicationMessagesWithSize)
}

// cmpGetReplicationMessagesWithSize compares
// two getReplicationMessagesWithSize objects by earliest creation time
// it can be used as a comparison func for slices.SortStableFunc
// if a, b, or their earliestCreationTime is nil, slices.SortStableFunc will put them to the end of a slice
// otherwise it will compare the earliestCreationTime of the replication tasks
// if earliestCreationTime is equal, it will compare the size of the response
func cmpGetReplicationMessagesWithSize(a, b *getReplicationMessagesWithSize) int {
// a > b
if a == nil || a.earliestCreationTime == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the case when earliestCreationTime is nil?
If we often have this case, then shouldn't we sort randomly instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! As I understand from the code, CreationTime is initially non-pointer, but internally it changed to pointer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, in general, we do not expect nil-s and its more like precaution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we don't expect nil-s there, but want to be sure that a NPE will never happen

return 1
}
// a < b
if b == nil || b.earliestCreationTime == nil {
return -1
}

// if both are not nil, compare the creation time
if *a.earliestCreationTime < *b.earliestCreationTime {
return -1
}

if *a.earliestCreationTime > *b.earliestCreationTime {
return 1
}

// if both equal, compare the size
if a.size < b.size {
return -1
}

if a.size > b.size {
return 1
}

return 0
}

func (c *clientImpl) GetDLQReplicationMessages(
Expand Down
95 changes: 95 additions & 0 deletions client/history/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,3 +1297,98 @@ func TestClient_withNoResponse(t *testing.T) {
})
}
}

func Test_cmpGetReplicationMessagesWithSize(t *testing.T) {
for name, c := range map[string]struct {
a, b *getReplicationMessagesWithSize
want int
}{
"both nil": {
a: nil, b: nil, want: 1,
},
"a time is nil, b is nil": {
a: &getReplicationMessagesWithSize{earliestCreationTime: nil}, b: nil, want: 1,
},
"a time is not nil, b is nil": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)}, b: nil, want: -1,
},
"a time is not nil, b time is nil": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)},
b: &getReplicationMessagesWithSize{earliestCreationTime: nil},
want: -1,
},
"a time less b time": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)},
b: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(20)},
want: -1,
},
"a time greater b time": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(20)},
b: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)},
want: 1,
},
"a size less b size": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10), size: 10},
b: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10), size: 20},
want: -1,
},
"a size greater b size": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10), size: 20},
b: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10), size: 10},
want: 1,
},
"a equal b": {
a: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)},
b: &getReplicationMessagesWithSize{earliestCreationTime: common.Int64Ptr(10)},
want: 0,
},
} {
t.Run(name, func(t *testing.T) {
assert.Equal(t, c.want, cmpGetReplicationMessagesWithSize(c.a, c.b))
})
}
}

func Test_sortGetReplicationMessageWithSize(t *testing.T) {
for name, c := range map[string]struct {
responses []*getReplicationMessagesWithSize
want []*getReplicationMessagesWithSize
}{
"empty": {},
"multiple nil, non nil earliestCreationTime": {
responses: []*getReplicationMessagesWithSize{
{earliestCreationTime: nil},
{earliestCreationTime: nil},
{earliestCreationTime: common.Int64Ptr(20)},
{earliestCreationTime: common.Int64Ptr(10)},
},
want: []*getReplicationMessagesWithSize{
{earliestCreationTime: common.Int64Ptr(10)},
{earliestCreationTime: common.Int64Ptr(20)},
{earliestCreationTime: nil},
{earliestCreationTime: nil},
},
},
"multiple nil, non nil same earliestCreationTime, different size": {
responses: []*getReplicationMessagesWithSize{
{earliestCreationTime: nil},
{earliestCreationTime: nil},
{earliestCreationTime: common.Int64Ptr(100), size: 50},
{earliestCreationTime: common.Int64Ptr(100), size: 30},
{earliestCreationTime: common.Int64Ptr(20)},
},
want: []*getReplicationMessagesWithSize{
{earliestCreationTime: common.Int64Ptr(20)},
{earliestCreationTime: common.Int64Ptr(100), size: 30},
{earliestCreationTime: common.Int64Ptr(100), size: 50},
{earliestCreationTime: nil},
{earliestCreationTime: nil},
},
},
} {
t.Run(name, func(t *testing.T) {
sortGetReplicationMessageWithSize(c.responses)
assert.Equal(t, c.want, c.responses)
})
}
}
57 changes: 57 additions & 0 deletions common/types/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,35 @@ func (v *GetReplicationMessagesResponse) GetMessagesByShard() (o map[int32]*Repl
return
}

// GetEarliestCreationTime returns the earliest creation time of replication tasks if it exists, otherwise return nil
func (v *GetReplicationMessagesResponse) GetEarliestCreationTime() *int64 {
if v == nil {
return nil
}
Comment on lines +310 to +312
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very unusual to check for reciver-ptr == nil. Is there an expected code-path which continues if * GetReplicationMessagesResponse is nil and operates with it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All other methods of the structure do the same check. I assume that the structure can be nil in this case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Makes total sense for consistency.


var earliestTime *int64
for _, messages := range v.MessagesByShard {

creationTime := messages.GetEarliestCreationTime()
if creationTime == nil {
continue
}

if earliestTime == nil || *creationTime < *earliestTime {
earliestTime = creationTime
}
}

if earliestTime == nil {
return nil
}

// avoid returning a pointer to the internal value
// for immutability
result := *earliestTime
return &result
}

// HistoryTaskV2Attributes is an internal type (TBD...)
type HistoryTaskV2Attributes struct {
DomainID string `json:"domainId,omitempty"`
Expand Down Expand Up @@ -592,6 +621,34 @@ func (v *ReplicationMessages) GetSyncShardStatus() (o *SyncShardStatus) {
return
}

// GetEarliestCreationTime returns the earliest message time in the replication tasks if it exists
// otherwise return nil
func (v *ReplicationMessages) GetEarliestCreationTime() *int64 {
if v == nil {
return nil
}

var earliestTime *int64
for _, task := range v.GetReplicationTasks() {
if task.CreationTime == nil {
continue
}

if earliestTime == nil || *task.CreationTime < *earliestTime {
earliestTime = task.CreationTime
}
}

if earliestTime == nil {
return nil
}

// avoid returning a pointer to the internal value
// for immutability
result := *earliestTime
return &result
}

// ReplicationTask is an internal type (TBD...)
type ReplicationTask struct {
TaskType *ReplicationTaskType `json:"taskType,omitempty"`
Expand Down
68 changes: 68 additions & 0 deletions common/types/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/thriftrw/ptr"
)

func TestDLQType_Ptr(t *testing.T) {
Expand Down Expand Up @@ -383,6 +384,40 @@ func TestGetReplicationMessagesResponse_GetMessagesByShard(t *testing.T) {
assert.Nil(t, res)
}

func TestGetReplicationMessagesResponse_GetEarliestCreationTime(t *testing.T) {
for name, c := range map[string]struct {
response *GetReplicationMessagesResponse
want *int64
}{
"nil": {response: nil, want: nil},
"zero messages": {response: &GetReplicationMessagesResponse{}, want: nil},
"no messages with creation time": {
response: &GetReplicationMessagesResponse{
MessagesByShard: map[int32]*ReplicationMessages{
1: {ReplicationTasks: []*ReplicationTask{{}, {}, {}}},
2: {ReplicationTasks: []*ReplicationTask{{}, {}, {}}},
},
},
},
"a few messages with creation time": {
response: &GetReplicationMessagesResponse{
MessagesByShard: map[int32]*ReplicationMessages{
1: {ReplicationTasks: []*ReplicationTask{
{}, {CreationTime: ptr.Int64(20)}, {CreationTime: ptr.Int64(1000)}},
},
2: {ReplicationTasks: []*ReplicationTask{
{CreationTime: ptr.Int64(10)}, {}, {CreationTime: ptr.Int64(12000)}}},
},
},
want: ptrInt64(10),
},
} {
t.Run(name, func(t *testing.T) {
assert.Equal(t, c.want, c.response.GetEarliestCreationTime())
})
}
}

func TestCountDLQMessagesResponse(t *testing.T) {
history := map[HistoryDLQCountKey]int64{
{ShardID: 1, SourceCluster: "cluster-1"}: 100,
Expand Down Expand Up @@ -634,6 +669,39 @@ func TestReplicationMessages_GetSyncShardStatus(t *testing.T) {
assert.Nil(t, res)
}

func TestReplicationMessages_GetEarliestCreationTime(t *testing.T) {
for name, c := range map[string]struct {
msgs *ReplicationMessages
want *int64
}{
"nil": {msgs: nil, want: nil},
"zero tasks": {msgs: &ReplicationMessages{}, want: nil},
"no tasks with creation time": {
msgs: &ReplicationMessages{
ReplicationTasks: []*ReplicationTask{
{}, {}, {},
},
},
want: nil,
},
"a few tasks with creation time": {
msgs: &ReplicationMessages{
ReplicationTasks: []*ReplicationTask{
{CreationTime: ptr.Int64(50)},
{CreationTime: ptr.Int64(20)},
{}, {}, {},
{CreationTime: ptr.Int64(100)},
},
},
want: ptr.Int64(20),
},
} {
t.Run(name, func(t *testing.T) {
assert.Equal(t, c.want, c.msgs.GetEarliestCreationTime())
})
}
}

func TestReplicationTask_GetTaskType(t *testing.T) {
taskType := ReplicationTaskTypeDomain
testStruct := ReplicationTask{
Expand Down