Skip to content

Commit b7ce3cd

Browse files
authored
Merge branch 'master' into ndyakov/token-based-auth
2 parents 45e5ee9 + 28a3c97 commit b7ce3cd

File tree

8 files changed

+79
-15
lines changed

8 files changed

+79
-15
lines changed

.github/workflows/spellcheck.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
- name: Checkout
99
uses: actions/checkout@v4
1010
- name: Check Spelling
11-
uses: rojopolis/spellcheck-github-actions@0.48.0
11+
uses: rojopolis/spellcheck-github-actions@0.49.0
1212
with:
1313
config_path: .github/spellcheck-settings.yml
1414
task_name: Markdown

command.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2104,7 +2104,9 @@ type XInfoGroup struct {
21042104
Pending int64
21052105
LastDeliveredID string
21062106
EntriesRead int64
2107-
Lag int64
2107+
// Lag represents the number of pending messages in the stream not yet
2108+
// delivered to this consumer group. Returns -1 when the lag cannot be determined.
2109+
Lag int64
21082110
}
21092111

21102112
var _ Cmder = (*XInfoGroupsCmd)(nil)
@@ -2187,8 +2189,11 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
21872189

21882190
// lag: the number of entries in the stream that are still waiting to be delivered
21892191
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
2192+
// In that case, we return -1.
21902193
if err != nil && err != Nil {
21912194
return err
2195+
} else if err == Nil {
2196+
group.Lag = -1
21922197
}
21932198
default:
21942199
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)

commands_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6772,6 +6772,36 @@ var _ = Describe("Commands", func() {
67726772
}))
67736773
})
67746774

6775+
It("should return -1 for nil lag in XINFO GROUPS", func() {
6776+
_, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result()
6777+
Expect(err).NotTo(HaveOccurred())
6778+
6779+
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
6780+
Expect(err).NotTo(HaveOccurred())
6781+
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})
6782+
Expect(err).NotTo(HaveOccurred())
6783+
6784+
err = client.XGroupCreate(ctx, "s", "g", "0").Err()
6785+
Expect(err).NotTo(HaveOccurred())
6786+
err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
6787+
Expect(err).NotTo(HaveOccurred())
6788+
6789+
client.XDel(ctx, "s", "0-2")
6790+
6791+
res, err := client.XInfoGroups(ctx, "s").Result()
6792+
Expect(err).NotTo(HaveOccurred())
6793+
Expect(res).To(Equal([]redis.XInfoGroup{
6794+
{
6795+
Name: "g",
6796+
Consumers: 1,
6797+
Pending: 1,
6798+
LastDeliveredID: "0-1",
6799+
EntriesRead: 1,
6800+
Lag: -1, // nil lag from Redis is reported as -1
6801+
},
6802+
}))
6803+
})
6804+
67756805
It("should XINFO CONSUMERS", func() {
67766806
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
67776807
Expect(err).NotTo(HaveOccurred())

options.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ func (opt *Options) init() {
229229
opt.Network = "tcp"
230230
}
231231
}
232+
if opt.Protocol < 2 {
233+
opt.Protocol = 3
234+
}
232235
if opt.DialTimeout == 0 {
233236
opt.DialTimeout = 5 * time.Second
234237
}

options_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,26 @@ func TestReadTimeoutOptions(t *testing.T) {
222222
}
223223
}
224224
}
225+
226+
func TestProtocolOptions(t *testing.T) {
227+
testCasesMap := map[int]int{
228+
0: 3,
229+
1: 3,
230+
2: 2,
231+
3: 3,
232+
}
233+
234+
o := &Options{}
235+
o.init()
236+
if o.Protocol != 3 {
237+
t.Errorf("got %d instead of %d as protocol option", o.Protocol, 3)
238+
}
239+
240+
for set, want := range testCasesMap {
241+
o := &Options{Protocol: set}
242+
o.init()
243+
if o.Protocol != want {
244+
t.Errorf("got %d instead of %d as protocol option", o.Protocol, want)
245+
}
246+
}
247+
}

osscluster.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,14 @@ func (n *clusterNode) SetLastLatencyMeasurement(t time.Time) {
448448
}
449449
}
450450

451+
func (n *clusterNode) Loading() bool {
452+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
453+
defer cancel()
454+
455+
err := n.Client.Ping(ctx).Err()
456+
return err != nil && isLoadingError(err)
457+
}
458+
451459
//------------------------------------------------------------------------------
452460

453461
type clusterNodes struct {
@@ -757,7 +765,8 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
757765
case 1:
758766
return nodes[0], nil
759767
case 2:
760-
if slave := nodes[1]; !slave.Failing() {
768+
slave := nodes[1]
769+
if !slave.Failing() && !slave.Loading() {
761770
return slave, nil
762771
}
763772
return nodes[0], nil
@@ -766,7 +775,7 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
766775
for i := 0; i < 10; i++ {
767776
n := rand.Intn(len(nodes)-1) + 1
768777
slave = nodes[n]
769-
if !slave.Failing() {
778+
if !slave.Failing() && !slave.Loading() {
770779
return slave, nil
771780
}
772781
}
@@ -1501,7 +1510,7 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15011510
return err
15021511
}
15031512

1504-
cmdsMap := c.mapCmdsBySlot(ctx, cmds)
1513+
cmdsMap := c.mapCmdsBySlot(cmds)
15051514
for slot, cmds := range cmdsMap {
15061515
node, err := state.slotMasterNode(slot)
15071516
if err != nil {
@@ -1540,7 +1549,7 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15401549
return cmdsFirstErr(cmds)
15411550
}
15421551

1543-
func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int][]Cmder {
1552+
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
15441553
cmdsMap := make(map[int][]Cmder)
15451554
for _, cmd := range cmds {
15461555
slot := c.cmdSlot(cmd)

redis.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,12 +360,6 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
360360
connPool := pool.NewSingleConnPool(c.connPool, cn)
361361
conn := newConn(c.opt, connPool, &c.hooksMixin)
362362

363-
protocol := c.opt.Protocol
364-
// By default, use RESP3 in current version.
365-
if protocol < 2 {
366-
protocol = 3
367-
}
368-
369363
username, password := "", ""
370364
if c.opt.StreamingCredentialsProvider != nil {
371365
credentials, unsubscribeFromCredentialsProvider, err := c.opt.StreamingCredentialsProvider.
@@ -389,7 +383,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
389383

390384
// for redis-server versions that do not support the HELLO command,
391385
// RESP2 will continue to be used.
392-
if err = conn.Hello(ctx, protocol, username, password, c.opt.ClientName).Err(); err == nil {
386+
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
393387
// Authentication successful with HELLO command
394388
} else if !isRedisError(err) {
395389
// When the server responds with the RESP protocol and the result is not a normal

ring.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,7 @@ func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
702702
return nil, firstErr
703703
}
704704

705-
func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
705+
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
706706
pos := cmdFirstKeyPos(cmd)
707707
if pos == 0 {
708708
return c.sharding.Random()
@@ -720,7 +720,7 @@ func (c *Ring) process(ctx context.Context, cmd Cmder) error {
720720
}
721721
}
722722

723-
shard, err := c.cmdShard(ctx, cmd)
723+
shard, err := c.cmdShard(cmd)
724724
if err != nil {
725725
return err
726726
}

0 commit comments

Comments
 (0)