ClusterClient does not retry TxPipeline on EOF when reading response. #2954


Open
marcind opened this issue Apr 1, 2024 · 1 comment

marcind commented Apr 1, 2024

There are occasions when a connection handling a TxPipeline gets closed unexpectedly (for example because the remote server shut down). Despite the pool's attempts to discard bad idle connections, that validation is not 100% reliable, since a connection can be closed after it is returned from the pool and while it is in use. In my experience this results in an EOF error not while writing the commands to the connection, but only once the client attempts to read the result.
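
To illustrate the failure mode in isolation, here is a minimal sketch that is not go-redis code: it uses net.Pipe as a stand-in for the pooled connection, and shows the write appearing to succeed while only the subsequent read surfaces the EOF.

package main

import (
	"fmt"
	"net"
)

func main() {
	client, server := net.Pipe()
	go func() {
		buf := make([]byte, 64)
		_, _ = server.Read(buf) // consume the client's "commands"
		server.Close()          // peer hangs up before sending any reply
	}()

	_, writeErr := client.Write([]byte("MULTI\r\n")) // the write still succeeds
	reply := make([]byte, 64)
	_, readErr := client.Read(reply) // the read is what reports the EOF

	fmt.Println(writeErr, readErr) // <nil> EOF
}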

Expected Behavior

When using a ClusterClient.TxPipeline, pipelines should be retried when an EOF (or another retryable error) is returned while reading the pipeline responses.

Current Behavior

ClusterClient.TxPipeline will retry if an EOF is observed while writing the commands to the connection:

go-redis/osscluster.go, lines 1497 to 1505 in f3fe611:

if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
	return writeCmds(wr, cmds)
}); err != nil {
	if shouldRetry(err, true) {
		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
	}
	setCmdsErr(cmds, err)
	return err
}

However, there is no equivalent handling when reading the response from the connection, either in

go-redis/osscluster.go, lines 1508 to 1525 in f3fe611:

statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1]

if err := c.txPipelineReadQueued(
	ctx, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil {
	setCmdsErr(cmds, err)
	moved, ask, addr := isMovedError(err)
	if moved || ask {
		return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
	}
	return err
}

return pipelineReadCmds(rd, trimmedCmds)
or in

go-redis/osscluster.go, lines 1537 to 1547 in f3fe611:

if err := statusCmd.readReply(rd); err != nil {
	return err
}

for _, cmd := range cmds {
	err := statusCmd.readReply(rd)
	if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
		continue
	}
	return err
}
An EOF may be returned at any of these read points. However, only the regular (non-transactional) pipeline read path, shown below for comparison, handles retryable errors and updates failedCmds to trigger the retry machinery:

go-redis/osscluster.go, lines 1354 to 1360 in f3fe611:

if !isRedisError(err) {
	if shouldRetry(err, true) {
		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
	}
	setCmdsErr(cmds[i+1:], err)
	return err
}

Possible Solution

Update txPipelineReadQueued and the error handling for it in processTxPipelineNodeConn to account for retryable errors and update the failedCmds parameter appropriately.
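
As a rough sketch of what that could look like, modeled on the write-path handling quoted above (the helpers and signatures are go-redis internals, so the actual change may differ):

// Illustrative only: mirrors the existing write-path retry handling.
if err := c.txPipelineReadQueued(
	ctx, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil {
	moved, ask, addr := isMovedError(err)
	if moved || ask {
		setCmdsErr(cmds, err)
		return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
	}
	if shouldRetry(err, true) {
		// Same treatment as a failed write: remap the commands so the outer
		// retry loop can replay them against the (re)resolved node.
		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
	}
	setCmdsErr(cmds, err)
	return err
}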

Steps to Reproduce

Let me know if there's appetite for addressing this and I can work on a repro.

Context (Environment)

Redis Server: 7.1
go-redis client: 9.5.1


@ljluestc


package redisclient

import (
	"context"
	"errors"
	"io"
	"log"
	"net"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

// CustomClusterClient wraps redis.ClusterClient with enhanced TxPipeline retry logic
type CustomClusterClient struct {
	*redis.ClusterClient
	maxRetries int
	retryDelay time.Duration
}

// NewCustomClusterClient creates a new cluster client with retry capabilities
func NewCustomClusterClient(options *redis.ClusterOptions) *CustomClusterClient {
	return &CustomClusterClient{
		ClusterClient: redis.NewClusterClient(options),
		maxRetries:    3,               // Configurable max retries
		retryDelay:    100 * time.Millisecond, // Configurable delay between retries
	}
}

// TxPipelineWithRetry enhances TxPipeline with retry logic for read failures
func (c *CustomClusterClient) TxPipelineWithRetry(ctx context.Context, fn func(redis.Pipeliner) error) ([]redis.Cmder, error) {
	var lastErr error
	for attempt := 0; attempt <= c.maxRetries; attempt++ {
		if attempt > 0 {
			log.Printf("Retrying TxPipeline, attempt %d/%d due to: %v", attempt, c.maxRetries, lastErr)
			time.Sleep(c.retryDelay)
		}

		pipe := c.TxPipeline()
		err := fn(pipe)
		if err != nil {
			return nil, err // Return immediately if the function itself fails
		}

		cmds, err := pipe.Exec(ctx)
		if err == nil {
			return cmds, nil // Success!
		}

		// Check if error is retryable
		if !isRetryableError(err) {
			return cmds, err // Non-retryable error, return immediately
		}

		lastErr = err
		// A fresh pipeline is created at the top of the next attempt.
	}

	return nil, lastErr // Exhausted retries
}

// isRetryableError determines if an error should trigger a retry
func isRetryableError(err error) bool {
	if err == nil {
		return false
	}
	// Connection-level failures surface as EOF / unexpected EOF.
	if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
		return true
	}
	// Network timeouts and reset connections are also worth retrying.
	var netErr net.Error
	if errors.As(err, &netErr) && netErr.Timeout() {
		return true
	}
	return strings.Contains(err.Error(), "connection reset by peer")
}

// ExampleUsage shows how the wrapper is intended to be used.
func ExampleUsage() {
	// Configure cluster options
	options := &redis.ClusterOptions{
		Addrs:       []string{"redis-cluster:6379"},
		Password:    "",
		DialTimeout: 5 * time.Second,
		ReadTimeout: 3 * time.Second,
		PoolSize:    10,
		MinIdleConns: 2,
	}

	// Create custom client
	client := NewCustomClusterClient(options)
	defer client.Close()

	ctx := context.Background()

	// Example transaction with retry
	cmds, err := client.TxPipelineWithRetry(ctx, func(pipe redis.Pipeliner) error {
		pipe.Set(ctx, "key1", "value1", 0)
		pipe.Set(ctx, "key2", "value2", 0)
		pipe.Get(ctx, "key1")
		return nil
	})

	if err != nil {
		log.Printf("Transaction failed after retries: %v", err)
		return
	}

	// Process results
	for _, cmd := range cmds {
		switch cmd := cmd.(type) {
		case *redis.StatusCmd:
			log.Printf("Status: %s", cmd.Val())
		case *redis.StringCmd:
			log.Printf("Value: %s", cmd.Val())
		}
	}
}

// processTxPipelineNodeConn is a rough public-API analogue of the internal
// osscluster.go function of the same name, included for reference only. The
// real implementation writes MULTI/.../EXEC itself and reads the queued
// replies in txPipelineReadQueued; that read is where an EOF currently
// escapes the retry machinery.
func processTxPipelineNodeConn(ctx context.Context, conn *redis.Conn, fn func(redis.Pipeliner) error) ([]redis.Cmder, error) {
	pipe := conn.TxPipeline()
	if err := fn(pipe); err != nil {
		return nil, err
	}

	// Exec writes the queued commands and reads every reply; an EOF returned
	// here corresponds to the read-side failure described in this issue.
	cmds, err := pipe.Exec(ctx)
	if isRetryableError(err) {
		// Instead of just returning, failed commands could be marked here and
		// handed back to retry logic such as TxPipelineWithRetry above.
		log.Printf("retryable tx pipeline read failure: %v", err)
	}
	return cmds, err
}