Skip to content

Commit 83363a1

Browse files
committed
revert tryagain error
Signed-off-by: jyjiangkai <[email protected]>
1 parent 2278c46 commit 83363a1

File tree

19 files changed

+417
-274
lines changed

19 files changed

+417
-274
lines changed

client/internal/vanus/store/block_store.go

Lines changed: 251 additions & 120 deletions
Large diffs are not rendered by default.

client/pkg/eventbus/eventbus.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,23 @@ import (
2323
"io"
2424
"sync"
2525

26-
"github.com/linkall-labs/vanus/observability/tracing"
2726
"go.opentelemetry.io/otel/trace"
2827

2928
// third-party libraries.
3029
ce "github.com/cloudevents/sdk-go/v2"
3130
"github.com/scylladb/go-set/u64set"
3231

33-
// this project.
34-
"github.com/linkall-labs/vanus/client/pkg/api"
35-
"github.com/linkall-labs/vanus/client/pkg/eventlog"
36-
"github.com/linkall-labs/vanus/client/pkg/policy"
32+
// first-party libraries
3733
vlog "github.com/linkall-labs/vanus/observability/log"
34+
"github.com/linkall-labs/vanus/observability/tracing"
3835
"github.com/linkall-labs/vanus/pkg/errors"
3936

37+
// this project.
4038
eb "github.com/linkall-labs/vanus/client/internal/vanus/eventbus"
4139
el "github.com/linkall-labs/vanus/client/internal/vanus/eventlog"
40+
"github.com/linkall-labs/vanus/client/pkg/api"
41+
"github.com/linkall-labs/vanus/client/pkg/eventlog"
42+
"github.com/linkall-labs/vanus/client/pkg/policy"
4243
)
4344

4445
func NewEventbus(cfg *eb.Config) *eventbus {

client/pkg/eventlog/eventlog_impl.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,9 @@ func (r *logReader) Read(ctx context.Context, size int16) ([]*ce.Event, error) {
482482
if err != nil {
483483
if errors.Is(err, errors.ErrOffsetOverflow) {
484484
r.elog.refreshReadableSegments(ctx)
485-
r.switchSegment(ctx)
485+
if r.switchSegment(ctx) {
486+
return nil, errors.ErrTryAgain
487+
}
486488
}
487489
return nil, err
488490
}
@@ -505,7 +507,9 @@ func (r *logReader) ReadStream(ctx context.Context, size int16) ([]*ce.Event, er
505507
if err != nil {
506508
if errors.Is(err, errors.ErrOffsetOverflow) {
507509
r.elog.refreshReadableSegments(ctx)
508-
r.switchSegment(ctx)
510+
if r.switchSegment(ctx) {
511+
return nil, errors.ErrTryAgain
512+
}
509513
}
510514
return nil, err
511515
}
@@ -522,10 +526,7 @@ func (r *logReader) selectReadableSegment(ctx context.Context) (*segment, error)
522526
segment := func() *segment {
523527
r.mu.RLock()
524528
defer r.mu.RUnlock()
525-
if r.cur != nil {
526-
return r.cur
527-
}
528-
return nil
529+
return r.cur
529530
}()
530531

531532
if segment == nil {

client/pkg/eventlog/log_segment.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (s *segment) Append(ctx context.Context, event *ce.Event) (int64, error) {
162162

163163
b := s.preferSegmentBlock()
164164
if b == nil {
165-
return -1, errors.ErrNotLeader.WithMessage("the block is not leader")
165+
return -1, errors.ErrNoLeader.WithMessage("block no leader")
166166
}
167167
off, err := b.Append(_ctx, event)
168168
if err != nil {
@@ -177,7 +177,7 @@ func (s *segment) AppendManyStream(ctx context.Context, events []*ce.Event) ([]i
177177

178178
b := s.preferSegmentBlock()
179179
if b == nil {
180-
return nil, errors.ErrNotLeader.WithMessage("the block is not leader")
180+
return nil, errors.ErrNoLeader.WithMessage("block no leader")
181181
}
182182
offsets, err := b.AppendManyStream(_ctx, events)
183183
if err != nil {

internal/store/block/raft/appender.go

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,13 @@ type peer struct {
6767

6868
type LeaderChangedListener func(block, leader vanus.ID, term uint64)
6969

70-
type CommitWaiter struct {
71-
seqs []int64
70+
type commitWaiter struct {
7271
offset int64
72+
seqs []int64
7373
err error
7474
callback func([]int64, error)
7575
}
7676

77-
func (w CommitWaiter) Do() {
78-
w.callback(w.seqs, w.err)
79-
}
80-
8177
type Appender interface {
8278
block.Appender
8379

@@ -92,8 +88,8 @@ type appender struct {
9288
actx block.AppendContext
9389
appendMu sync.RWMutex
9490

95-
waiters []CommitWaiter
96-
waiterC chan CommitWaiter
91+
waiters []commitWaiter
92+
callbackC chan func()
9793
commitIndex uint64
9894
commitOffset int64
9995
waitMu sync.Mutex
@@ -122,21 +118,21 @@ func NewAppender(
122118
raftLog *raftlog.Log,
123119
host transport.Host,
124120
listener LeaderChangedListener,
125-
waiterC chan CommitWaiter,
121+
callbackC chan func(),
126122
) Appender {
127123
ctx, cancel := context.WithCancel(ctx)
128124

129125
a := &appender{
130-
raw: raw,
131-
waiters: make([]CommitWaiter, 0),
132-
waiterC: waiterC,
133-
listener: listener,
134-
log: raftLog,
135-
host: host,
136-
hint: make([]peer, 0, defaultHintCapacity),
137-
cancel: cancel,
138-
doneC: make(chan struct{}),
139-
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
126+
raw: raw,
127+
waiters: make([]commitWaiter, 0),
128+
callbackC: callbackC,
129+
listener: listener,
130+
log: raftLog,
131+
host: host,
132+
hint: make([]peer, 0, defaultHintCapacity),
133+
cancel: cancel,
134+
doneC: make(chan struct{}),
135+
tracer: tracing.NewTracer("store.block.raft.appender", trace.SpanKindInternal),
140136
}
141137
a.actx = a.raw.NewAppendContext(nil)
142138
a.commitOffset = a.actx.WriteOffset()
@@ -489,9 +485,9 @@ func (a *appender) Append(ctx context.Context, entries []block.Entry, cb func([]
489485
}
490486

491487
// register callback and wait until entries is committed.
492-
a.registerCommitWaiter(ctx, CommitWaiter{
493-
seqs: seqs,
488+
a.registerCommitWaiter(ctx, commitWaiter{
494489
offset: offset,
490+
seqs: seqs,
495491
err: err,
496492
callback: cb,
497493
})
@@ -538,20 +534,23 @@ func (a *appender) doAppend(ctx context.Context, frags ...block.Fragment) {
538534
_, _ = a.raw.CommitAppend(ctx, frags...)
539535
}
540536

541-
func (a *appender) registerCommitWaiter(ctx context.Context, waiter CommitWaiter) {
537+
func (a *appender) registerCommitWaiter(ctx context.Context, waiter commitWaiter) {
542538
_, span := a.tracer.Start(ctx, "waitCommit")
543539
defer span.End()
544540

545541
span.AddEvent("Acquiring wait lock")
546542
a.waitMu.Lock()
547-
defer a.waitMu.Unlock()
548543
span.AddEvent("Got wait lock")
549544

550545
if waiter.offset <= a.commitOffset {
551-
waiter.callback(waiter.seqs, waiter.err)
546+
a.waitMu.Unlock()
547+
a.callbackC <- func() {
548+
waiter.callback(waiter.seqs, waiter.err)
549+
}
552550
return
553551
}
554552
a.waiters = append(a.waiters, waiter)
553+
a.waitMu.Unlock()
555554
}
556555

557556
func (a *appender) doWakeup(ctx context.Context, commit int64) {
@@ -569,7 +568,9 @@ func (a *appender) doWakeup(ctx context.Context, commit int64) {
569568
if waiter.offset > commit {
570569
break
571570
}
572-
a.waiterC <- waiter
571+
a.callbackC <- func() {
572+
waiter.callback(waiter.seqs, waiter.err)
573+
}
573574
a.waiters = a.waiters[1:]
574575
}
575576
a.commitOffset = commit

internal/store/segment/api.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func (s *segmentServer) AppendToBlock(
130130
s.srv.AppendToBlock(ctx, blockID, events, func(offs []int64, e error) {
131131
offsets = offs
132132
err = e
133-
donec <- struct{}{}
133+
close(donec)
134134
})
135135
<-donec
136136
return &segpb.AppendToBlockResponse{Offsets: offsets}, err
@@ -163,10 +163,8 @@ func (s *segmentServer) AppendToBlockStream(stream segpb.SegmentServer_AppendToB
163163
})
164164
}
165165

166-
s.srv.NewMessageArrived(ctx, vanus.ID(request.BlockId))
167-
168166
err = stream.Send(&segpb.AppendToBlockStreamResponse{
169-
ResponseId: request.RequestId,
167+
Id: request.Id,
170168
ResponseCode: errCode,
171169
ResponseMsg: errMsg,
172170
Offsets: offsets,
@@ -221,7 +219,7 @@ func (s *segmentServer) ReadFromBlockStream(stream segpb.SegmentServer_ReadFromB
221219
}
222220

223221
err = stream.Send(&segpb.ReadFromBlockStreamResponse{
224-
ResponseId: request.RequestId,
222+
Id: request.Id,
225223
ResponseCode: errCode,
226224
ResponseMsg: errMsg,
227225
Events: &cepb.CloudEventBatch{Events: events},

internal/store/segment/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (s *server) recoverBlocks(ctx context.Context, logs map[vanus.ID]*raftlog.L
7878
return err
7979
}
8080
}
81-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
81+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
8282
s.replicas.Store(id, &replica{
8383
id: id,
8484
idStr: id.String(),

internal/store/segment/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *server) createBlock(ctx context.Context, id vanus.ID, size int64) (Repl
116116

117117
// Create replica.
118118
l := raftlog.NewLog(id, s.wal, s.metaStore, s.offsetStore, nil)
119-
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.waiterC)
119+
a := raft.NewAppender(context.TODO(), r, l, s.host, s.leaderChanged, s.callbackC)
120120

121121
return &replica{
122122
id: id,

internal/store/segment/server.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ type Server interface {
9393
AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.CloudEvent, cb func([]int64, error))
9494
ReadFromBlock(ctx context.Context, id vanus.ID, seq int64, num int, pollingTimeout uint32) ([]*cepb.CloudEvent, error)
9595
LookupOffsetInBlock(ctx context.Context, id vanus.ID, stime int64) (int64, error)
96-
97-
NewMessageArrived(ctx context.Context, id vanus.ID)
9896
}
9997

10098
func NewServer(cfg store.Config) Server {
@@ -112,7 +110,7 @@ func NewServer(cfg store.Config) Server {
112110
srv := &server{
113111
state: primitive.ServerStateCreated,
114112
cfg: cfg,
115-
waiterC: make(chan raft.CommitWaiter),
113+
callbackC: make(chan func()),
116114
isDebugMode: debugModel,
117115
localAddress: localAddress,
118116
volumeID: uint64(cfg.Volume.ID),
@@ -163,7 +161,7 @@ type server struct {
163161
ctrl cluster.Cluster
164162
cc ctrlpb.SegmentControllerClient
165163
leaderC chan leaderInfo
166-
waiterC chan raft.CommitWaiter
164+
callbackC chan func()
167165

168166
grpcSrv *grpc.Server
169167
closeC chan struct{}
@@ -218,19 +216,20 @@ func (s *server) preGrpcStream(ctx context.Context, info *tap.Info) (context.Con
218216
return ctx, nil
219217
}
220218

221-
func (s *server) runWaiterWorker(ctx context.Context) {
219+
func (s *server) runCallbackWorkPool(ctx context.Context) {
222220
for i := 0; i < defaultWaiterWorker; i++ {
223-
go func() {
224-
for {
225-
select {
226-
case waiter := <-s.waiterC:
227-
waiter.Do()
228-
case <-ctx.Done():
229-
close(s.waiterC)
230-
return
231-
}
232-
}
233-
}()
221+
go s.runCallbackWorker(ctx)
222+
}
223+
}
224+
225+
func (s *server) runCallbackWorker(_ context.Context) {
226+
for {
227+
select {
228+
case cb := <-s.callbackC:
229+
cb()
230+
case <-s.closeC:
231+
return
232+
}
234233
}
235234
}
236235

@@ -265,7 +264,7 @@ func (s *server) Initialize(ctx context.Context) error {
265264
s.state = primitive.ServerStateRunning
266265
}
267266

268-
s.runWaiterWorker(ctx)
267+
s.runCallbackWorkPool(ctx)
269268

270269
return nil
271270
}
@@ -688,14 +687,12 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
688687
metrics.WriteTPSCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(len(events)))
689688
metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size))
690689

691-
b.Append(ctx, entries, cb)
692-
693-
// TODO(weihe.yin) make this method deep to code
694-
// s.pm.NewMessageArrived(id)
695-
}
696-
697-
func (s *server) NewMessageArrived(_ context.Context, id vanus.ID) {
698-
s.pm.NewMessageArrived(id)
690+
b.Append(ctx, entries, func(offsets []int64, err error) {
691+
if err == nil {
692+
s.pm.NewMessageArrived(id)
693+
}
694+
cb(offsets, err)
695+
})
699696
}
700697

701698
func (s *server) onBlockArchived(stat block.Statistics) {

internal/timer/timingwheel/bucket.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (b *bucket) run(ctx context.Context) {
199199
// batch read
200200
events, err := b.getEvent(ctx, defaultNumberOfEventsRead)
201201
if err != nil {
202-
if !errors.Is(err, errors.ErrOffsetOnEnd) {
202+
if !errors.Is(err, errors.ErrOffsetOnEnd) && !errors.Is(err, errors.ErrTryAgain) {
203203
log.Error(ctx, "get event failed when bucket running", map[string]interface{}{
204204
"eventbus": b.getEventbus(),
205205
log.KeyError: err,

internal/trigger/reader/reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func (elReader *eventLogReader) run(ctx context.Context) {
286286
for {
287287
err = elReader.readEvent(ctx, lr)
288288
switch {
289-
case err == nil, errors.Is(err, errors.ErrOffsetOnEnd), errors.Is(err, errors.ErrOffsetOverflow):
289+
case err == nil, errors.Is(err, errors.ErrOffsetOnEnd), errors.Is(err, errors.ErrTryAgain):
290290
case stderr.Is(err, context.Canceled):
291291
return
292292
case errors.Is(err, errors.ErrOffsetUnderflow):

pkg/cluster/raw_client/segment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestCtrlClientIsNeedRetry(t *testing.T) {
4141
err = errors.New("xxxxx: 1111111111 ")
4242
So(isNeedRetry(err), ShouldBeFalse)
4343

44-
err = errors.New("balabala, please connect to: 127.0.0.1:2048 ").WithGRPCCode(errpb.ErrorCode_NOT_LEADER)
44+
err = errors.New("balabala, please connect to: 127.0.0.1:2048 ").WithCode(errpb.ErrorCode_NOT_LEADER)
4545
So(isNeedRetry(err), ShouldBeTrue)
4646
})
4747
})

pkg/errors/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type ErrorType struct {
4242
underlayErrors []error
4343
}
4444

45-
func (e *ErrorType) WithGRPCCode(c errpb.ErrorCode) *ErrorType {
45+
func (e *ErrorType) WithCode(c errpb.ErrorCode) *ErrorType {
4646
_e := e.copy()
4747
_e.Code = c
4848
return _e

0 commit comments

Comments
 (0)