Skip to content

Commit 7a5ddce

Browse files
committed
kgo bugfix: guard sink batch field access more
1.19.0 introduced access to `batch.tries` while appending a request (inside the batch mutex). For some reason, this was the one field in a batch that I accessed outside the batch mutex in `bumpRepeatedLoadErr`. That field has been moved inside the mutex. Closes #1024.
1 parent 604ad6e commit 7a5ddce

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

pkg/kgo/sink.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,12 @@ func (s *sink) handleReqRespBatch(
835835
// Since we have received a response and we are the first batch, we can
836836
// at this point re-enable failing from load errors.
837837
//
838-
// We do not need a lock since the owner is locked.
838+
// We do not need the mutex lock on the batch. We already have the
839+
// recBuf mu (guarding most concurrency). The only place batch fields
840+
// are accessed & modified without the recBuf mu is when writing a
841+
// batch, and we only ever use a batch in inflight request at a time
842+
// (regardless of the partition being canceled or moving to a
843+
// different sink).
839844
batch.canFailFromLoadErrs = true
840845

841846
// By default, we assume we errored. Non-error updates this back
@@ -1475,12 +1480,12 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
14751480
return
14761481
}
14771482
batch0 := recBuf.batches[0]
1478-
batch0.tries++
14791483

14801484
// We need to lock the batch as well because there could be a buffered
14811485
// request about to be written. Writing requests only grabs the batch
14821486
// mu, not the recBuf mu.
14831487
batch0.mu.Lock()
1488+
batch0.tries++
14841489
var (
14851490
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
14861491
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting

0 commit comments

Comments
 (0)