Skip to content

Commit 7910f6b

Browse files
committed
kgo: retry connection reset by peer from ApiVersions to work around EventHubs
See commit and #1022 for more details. Closes #1022.
1 parent d310cab commit 7910f6b

File tree

2 files changed

+35
-11
lines changed

2 files changed

+35
-11
lines changed

pkg/kgo/broker.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,10 @@ func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerC
550550
return *pcxn, nil
551551
}
552552

553+
var tries int
553554
start := time.Now()
555+
doConnect:
556+
tries++
554557
conn, err := b.connect(ctx)
555558
defer func() {
556559
since := time.Since(start)
@@ -572,7 +575,17 @@ func (b *broker) loadConnection(ctx context.Context, req kmsg.Request) (*brokerC
572575
conn: conn,
573576
deadCh: make(chan struct{}),
574577
}
575-
if err = cxn.init(isProduceCxn); err != nil {
578+
if err = cxn.init(isProduceCxn, tries); err != nil {
579+
// EventHubs does not handle v4 and resets the connection. We
580+
// retry twice. On the first and second attempt, we try our max
581+
// version possible (as should be allowed). On the third try,
582+
// we downgrade to v0.
583+
if er := (*errApiVersionsReset)(nil); errors.As(err, &er) {
584+
if tries < 3 {
585+
tries++
586+
goto doConnect
587+
}
588+
}
576589
b.cl.cfg.logger.Log(LogLevelDebug, "connection initialization failed", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
577590
cxn.closeConn()
578591
return nil, err
@@ -719,11 +732,11 @@ type brokerCxn struct {
719732
deadCh chan struct{}
720733
}
721734

722-
func (cxn *brokerCxn) init(isProduceCxn bool) error {
735+
func (cxn *brokerCxn) init(isProduceCxn bool, tries int) error {
723736
hasVersions := cxn.b.loadVersions() != nil
724737
if !hasVersions {
725738
if cxn.b.cl.cfg.maxVersions == nil || cxn.b.cl.cfg.maxVersions.HasKey(18) {
726-
if err := cxn.requestAPIVersions(); err != nil {
739+
if err := cxn.requestAPIVersions(tries); err != nil {
727740
if !errors.Is(err, ErrClientClosed) && !isRetryableBrokerErr(err) {
728741
cxn.cl.cfg.logger.Log(LogLevelError, "unable to request api versions", "broker", logID(cxn.b.meta.NodeID), "err", err)
729742
}
@@ -749,14 +762,15 @@ func (cxn *brokerCxn) init(isProduceCxn bool) error {
749762
return nil
750763
}
751764

752-
func (cxn *brokerCxn) requestAPIVersions() error {
753-
maxVersion := int16(3)
754-
755-
// If the user configured a max versions, we check that the key exists
756-
// before entering this function. Thus, we expect exists to be true,
757-
// but we still doubly check it for sanity (as well as userMax, which
758-
// can only be non-negative based off of LookupMaxKeyVersion's API).
759-
if cxn.cl.cfg.maxVersions != nil {
765+
func (cxn *brokerCxn) requestAPIVersions(tries int) error {
766+
maxVersion := int16(4)
767+
if tries >= 3 { // on the third try, we pin to v0; see above in cxn initialization
768+
maxVersion = 0
769+
} else if cxn.cl.cfg.maxVersions != nil {
770+
// If the user configured a max versions, we check that the key exists
771+
// before entering this function. Thus, we expect exists to be true,
772+
// but we still doubly check it for sanity (as well as userMax, which
773+
// can only be non-negative based off of LookupMaxKeyVersion's API).
760774
userMax, exists := cxn.cl.cfg.maxVersions.LookupMaxKeyVersion(18) // 18 == api versions
761775
if exists && userMax >= 0 {
762776
maxVersion = userMax
@@ -779,6 +793,9 @@ start:
779793
// api versions does *not* use flexible response headers; see comment in promisedResp
780794
rawResp, err := cxn.readResponse(nil, req.Key(), req.GetVersion(), corrID, false, rt, bytesWritten, writeWait, timeToWrite, readEnqueue)
781795
if err != nil {
796+
if strings.HasSuffix(err.Error(), "connection reset by peer") {
797+
return &errApiVersionsReset{err}
798+
}
782799
return err
783800
}
784801
if len(rawResp) < 2 {

pkg/kgo/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,10 @@ func errCodeMessage(code int16, errMessage *string) error {
369369
}
370370
return nil
371371
}
372+
373+
// errApiVersionsReset wraps an error seen while reading the ApiVersions
// response whose message ends in "connection reset by peer". It is a
// distinct type so that connection setup can detect this case with
// errors.As and retry the connection.
type errApiVersionsReset struct {
374+
err error
375+
}
376+
377+
// Error returns the wrapped error's message unchanged.
func (e *errApiVersionsReset) Error() string { return e.err.Error() }
378+
// Unwrap exposes the wrapped error so errors.Is and errors.As can inspect it.
func (e *errApiVersionsReset) Unwrap() error { return e.err }

0 commit comments

Comments
 (0)