@@ -24,6 +24,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"slices"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -1558,20 +1559,34 @@ func (h *handlerImpl) GetReplicationMessages(
 		return nil, constants.ErrShuttingDown
 	}
 
+	msgs := h.getReplicationShardMessages(ctx, request)
+	response := h.buildGetReplicationMessagesResponse(metricsScope, msgs)
+
+	h.GetLogger().Debug("GetReplicationMessages succeeded.")
+	return response, nil
+}
+
+// getReplicationShardMessages gets replication messages from all the shards of the request
+// it queries the replication tasks from each shard in parallel
+// and returns the replication tasks in the order of the request tokens
+func (h *handlerImpl) getReplicationShardMessages(
+	ctx context.Context,
+	request *types.GetReplicationMessagesRequest,
+) []replicationShardMessages {
 	var wg sync.WaitGroup
-	wg.Add(len(request.Tokens))
-	result := new(sync.Map)
+	var results = make([]replicationShardMessages, len(request.Tokens))
 
-	for _, token := range request.Tokens {
-		go func(token *types.ReplicationToken) {
+	wg.Add(len(request.Tokens))
+	for i, token := range request.Tokens {
+		go func(i int, token *types.ReplicationToken) {
 			defer wg.Done()
 
 			engine, err := h.controller.GetEngineForShard(int(token.GetShardID()))
 			if err != nil {
 				h.GetLogger().Warn("History engine not found for shard", tag.Error(err))
 				return
 			}
-			tasks, err := engine.GetReplicationMessages(
+			msgs, err := engine.GetReplicationMessages(
 				ctx,
 				request.GetClusterName(),
 				token.GetLastRetrievedMessageID(),
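The new getReplicationShardMessages replaces the sync.Map keyed by shard ID with a pre-sized results slice: goroutine i writes only results[i], so distinct indices need no mutex, and wg.Wait() orders those writes before the slice is read. A minimal, self-contained sketch of the same fan-out pattern (fetch, the shard IDs, and the string results are illustrative stand-ins, not the handler's types):

package main

import (
	"fmt"
	"sync"
)

func fetch(shardID int) string { return fmt.Sprintf("messages-for-shard-%d", shardID) }

func main() {
	shardIDs := []int{1, 2, 3}
	results := make([]string, len(shardIDs))

	var wg sync.WaitGroup
	wg.Add(len(shardIDs))
	for i, id := range shardIDs {
		go func(i, id int) {
			defer wg.Done()
			results[i] = fetch(id) // each goroutine owns exactly one index: no data race
		}(i, id)
	}
	wg.Wait() // happens-before edge: all writes complete before the read below

	fmt.Println(results)
}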
@@ -1581,42 +1596,109 @@ func (h *handlerImpl) GetReplicationMessages(
 				return
 			}
 
-			result.Store(token.GetShardID(), tasks)
-		}(token)
+			results[i] = replicationShardMessages{
+				ReplicationMessages:  msgs,
+				shardID:              token.GetShardID(),
+				size:                 proto.FromReplicationMessages(msgs).Size(),
+				earliestCreationTime: msgs.GetEarliestCreationTime(),
+			}
+		}(i, token)
 	}
 
 	wg.Wait()
+	return results
+}
 
-	responseSize := 0
-	maxResponseSize := h.config.MaxResponseSize
-
-	messagesByShard := make(map[int32]*types.ReplicationMessages)
-	result.Range(func(key, value interface{}) bool {
-		shardID := key.(int32)
-		tasks := value.(*types.ReplicationMessages)
+// buildGetReplicationMessagesResponse builds a new GetReplicationMessagesResponse from shard results.
+// The response can be partial if the total size of the response exceeds the max size.
+// In this case, responses with the oldest replication tasks will be returned.
+func (h *handlerImpl) buildGetReplicationMessagesResponse(metricsScope metrics.Scope, msgs []replicationShardMessages) *types.GetReplicationMessagesResponse {
+	// Shards with large messages can cause the response to exceed the max size.
+	// In this case, we need to skip some shard messages to make sure the resulting response size is within the limit.
+	// To prevent replication lag in the future, we should return the messages with the oldest replication task.
+	// So we sort the shard messages by the earliest creation time of the replication task.
+	// If the earliest creation time is the same, we compare the size of the message.
+	// This will ensure that shards with the oldest replication tasks are processed first.
+	sortReplicationShardMessages(msgs)
+
+	var (
+		responseSize    = 0
+		maxResponseSize = h.config.MaxResponseSize
+		messagesByShard = make(map[int32]*types.ReplicationMessages, len(msgs))
+	)
 
-		size := proto.FromReplicationMessages(tasks).Size()
-		if (responseSize + size) >= maxResponseSize {
-			metricsScope.Tagged(metrics.ShardIDTag(int(shardID))).IncCounter(metrics.ReplicationMessageTooLargePerShard)
+	for _, m := range msgs {
+		if (responseSize + m.size) >= maxResponseSize {
+			metricsScope.Tagged(metrics.ShardIDTag(int(m.shardID))).IncCounter(metrics.ReplicationMessageTooLargePerShard)
 
 			// Log shards that did not fit for debugging purposes
 			h.GetLogger().Warn("Replication messages did not fit in the response (history host)",
-				tag.ShardID(int(shardID)),
-				tag.ResponseSize(size),
+				tag.ShardID(int(m.shardID)),
+				tag.ResponseSize(m.size),
 				tag.ResponseTotalSize(responseSize),
 				tag.ResponseMaxSize(maxResponseSize),
 			)
-		} else {
-			responseSize += size
-			messagesByShard[shardID] = tasks
+
+			continue
 		}
+		responseSize += m.size
+		messagesByShard[m.shardID] = m.ReplicationMessages
+	}
+	return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}
+}
 
-		return true
-	})
+// replicationShardMessages wraps types.ReplicationMessages
+// and contains some metadata of the ReplicationMessages
+type replicationShardMessages struct {
+	*types.ReplicationMessages
+	// shardID of the ReplicationMessages
+	shardID int32
+	// size of proto payload of ReplicationMessages
+	size int
+	// earliestCreationTime of ReplicationMessages
+	earliestCreationTime *int64
+}
 
-	h.GetLogger().Debug("GetReplicationMessages succeeded.")
+// sortReplicationShardMessages sorts the peer responses by the earliest creation time of the replication tasks
+func sortReplicationShardMessages(msgs []replicationShardMessages) {
+	slices.SortStableFunc(msgs, cmpReplicationShardMessages)
+}
+
+// cmpReplicationShardMessages compares
+// two replicationShardMessages objects by earliest creation time
+// it can be used as a comparison func for slices.SortStableFunc
+// if a's or b's earliestCreationTime is nil, slices.SortStableFunc will put them to the end of a slice
+// otherwise it will compare the earliestCreationTime of the replication tasks
+// if earliestCreationTime is equal, it will compare the size of the response
+func cmpReplicationShardMessages(a, b replicationShardMessages) int {
+	// a > b
+	if a.earliestCreationTime == nil {
+		return 1
+	}
+	// a < b
+	if b.earliestCreationTime == nil {
+		return -1
+	}
+
+	// if both are not nil, compare the creation time
+	if *a.earliestCreationTime < *b.earliestCreationTime {
+		return -1
+	}
+
+	if *a.earliestCreationTime > *b.earliestCreationTime {
+		return 1
+	}
+
+	// if both equal, compare the size
+	if a.size < b.size {
+		return -1
+	}
+
+	if a.size > b.size {
+		return 1
+	}
 
-	return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}, nil
+	return 0
 }
 
 // GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging
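For reference, a minimal sketch of how the new ordering could be exercised in a table-style test. It assumes the test file sits in the same package as handlerImpl; int64Ptr, the shard IDs, sizes, and timestamps are illustrative only and not part of the change:

// (package clause omitted; this assumes the same package as the handler code above)
import (
	"reflect"
	"testing"
)

func TestSortReplicationShardMessages_Ordering(t *testing.T) {
	int64Ptr := func(v int64) *int64 { return &v } // local helper, illustrative only

	msgs := []replicationShardMessages{
		{shardID: 1, size: 5, earliestCreationTime: nil},            // no creation time: sorted last
		{shardID: 2, size: 10, earliestCreationTime: int64Ptr(200)},
		{shardID: 3, size: 20, earliestCreationTime: int64Ptr(100)}, // oldest task, larger payload
		{shardID: 4, size: 10, earliestCreationTime: int64Ptr(100)}, // oldest task, smaller payload: sorted first
	}

	sortReplicationShardMessages(msgs)

	got := make([]int32, 0, len(msgs))
	for _, m := range msgs {
		got = append(got, m.shardID)
	}
	if want := []int32{4, 3, 2, 1}; !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected shard order: got %v, want %v", got, want)
	}
}

After this ordering, buildGetReplicationMessagesResponse fills messagesByShard until the size budget would be exceeded, so the shards carrying the oldest replication tasks are the least likely to be dropped from a partial response.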