Skip to content

Commit a46f7e6

Browse files
committed
fix review comments
Signed-off-by: jyjiangkai <[email protected]>
1 parent b4423f7 commit a46f7e6

File tree

7 files changed

+124
-96
lines changed

7 files changed

+124
-96
lines changed

client/internal/vanus/store/block_store.go

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
"google.golang.org/grpc"
2929

3030
// first-party libraries
31-
"github.com/linkall-labs/vanus/client/pkg/codec"
31+
"github.com/linkall-labs/vanus/client/pkg/codec"
3232
"github.com/linkall-labs/vanus/observability/log"
3333
"github.com/linkall-labs/vanus/observability/tracing"
3434
"github.com/linkall-labs/vanus/pkg/errors"
@@ -371,15 +371,7 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
371371
append.releaseStream()
372372
// reset new stream connections
373373
s.connectAppendStream(ctx)
374-
c, _ := append.callbacks.LoadAndDelete(opaqueID)
375-
if c != nil {
376-
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
377-
Id: opaqueID,
378-
ResponseCode: errpb.ErrorCode_CLOSED,
379-
ResponseMsg: "append stream closed",
380-
Offsets: []int64{},
381-
})
382-
}
374+
append.callbacks.Delete(opaqueID)
383375
return nil, err
384376
}
385377

@@ -394,6 +386,8 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
394386
ResponseMsg: "append stream context canceled",
395387
Offsets: []int64{},
396388
})
389+
} else {
390+
<-donec
397391
}
398392
}
399393

@@ -489,17 +483,7 @@ func (s *BlockStore) ReadStream(
489483
})
490484
read.releaseStream()
491485
s.connectReadStream(ctx)
492-
c, _ := read.callbacks.LoadAndDelete(opaqueID)
493-
if c != nil {
494-
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
495-
Id: opaqueID,
496-
ResponseCode: errpb.ErrorCode_CLOSED,
497-
ResponseMsg: "read stream closed",
498-
Events: &cepb.CloudEventBatch{
499-
Events: []*cepb.CloudEvent{},
500-
},
501-
})
502-
}
486+
read.callbacks.Delete(opaqueID)
503487
return []*ce.Event{}, err
504488
}
505489

@@ -516,6 +500,8 @@ func (s *BlockStore) ReadStream(
516500
Events: []*cepb.CloudEvent{},
517501
},
518502
})
503+
} else {
504+
<-donec
519505
}
520506
}
521507

client/pkg/eventlog/eventlog.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@ import (
1919
// standard libraries.
2020
"context"
2121

22-
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
23-
2422
// third-party libraries.
2523
ce "github.com/cloudevents/sdk-go/v2"
2624

2725
// first-party libraries.
2826
"github.com/linkall-labs/vanus/client/pkg/api"
27+
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
2928
segpb "github.com/linkall-labs/vanus/proto/pkg/segment"
3029
// this project.
3130
)

internal/store/segment/api.go

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package segment
1717
import (
1818
// standard libraries.
1919
"context"
20+
stderr "errors"
21+
"io"
2022

2123
// third-party libraries.
2224

@@ -137,46 +139,59 @@ func (s *segmentServer) AppendToBlock(
137139
}
138140

139141
func (s *segmentServer) AppendToBlockStream(stream segpb.SegmentServer_AppendToBlockStreamServer) error {
140-
ctx := context.Background()
142+
ctx := stream.Context()
143+
exitc := make(chan struct{})
141144
for {
142-
request, err := stream.Recv()
143-
if err != nil {
144-
log.Error(ctx, "append stream recv failed", map[string]interface{}{
145-
log.KeyError: err,
146-
})
147-
return err
148-
}
149-
150-
callbackFunc := func(offsets []int64, err error) {
151-
errCode := errpb.ErrorCode_SUCCESS
152-
errMsg := "success"
145+
select {
146+
case <-exitc:
147+
return stderr.New("append stream send failed")
148+
default:
149+
request, err := stream.Recv()
153150
if err != nil {
154-
if errors.Is(err, errors.ErrFull) {
155-
errCode = err.(*errors.ErrorType).Code
156-
errMsg = err.(*errors.ErrorType).Message
157-
} else {
158-
errCode = errpb.ErrorCode_UNKNOWN
159-
errMsg = "unknown"
151+
if stderr.Is(err, io.EOF) {
152+
log.Warning(ctx, "append stream closed", map[string]interface{}{
153+
log.KeyError: err,
154+
})
155+
return nil
160156
}
161-
log.Error(ctx, "append to block failed", map[string]interface{}{
157+
log.Error(ctx, "append stream recv failed", map[string]interface{}{
162158
log.KeyError: err,
163159
})
160+
return err
164161
}
165162

166-
err = stream.Send(&segpb.AppendToBlockStreamResponse{
167-
Id: request.Id,
168-
ResponseCode: errCode,
169-
ResponseMsg: errMsg,
170-
Offsets: offsets,
171-
})
172-
if err != nil {
173-
log.Error(ctx, "read stream send failed", map[string]interface{}{
174-
log.KeyError: err,
163+
callbackFunc := func(offsets []int64, err error) {
164+
errCode := errpb.ErrorCode_SUCCESS
165+
errMsg := "success"
166+
if err != nil {
167+
if errors.Is(err, errors.ErrFull) {
168+
errCode = err.(*errors.ErrorType).Code
169+
errMsg = err.(*errors.ErrorType).Message
170+
} else {
171+
errCode = errpb.ErrorCode_APPEND_TO_BLOCK_FAILED
172+
errMsg = "append to block failed"
173+
}
174+
log.Error(ctx, "append to block failed", map[string]interface{}{
175+
log.KeyError: err,
176+
})
177+
}
178+
179+
err = stream.Send(&segpb.AppendToBlockStreamResponse{
180+
Id: request.Id,
181+
ResponseCode: errCode,
182+
ResponseMsg: errMsg,
183+
Offsets: offsets,
175184
})
176-
return
185+
if err != nil {
186+
log.Error(ctx, "append stream send failed", map[string]interface{}{
187+
log.KeyError: err,
188+
})
189+
close(exitc)
190+
return
191+
}
177192
}
193+
s.srv.AppendToBlock(ctx, vanus.ID(request.BlockId), request.Events.Events, callbackFunc)
178194
}
179-
s.srv.AppendToBlock(ctx, vanus.ID(request.BlockId), request.Events.Events, callbackFunc)
180195
}
181196
}
182197

@@ -195,10 +210,16 @@ func (s *segmentServer) ReadFromBlock(
195210
}
196211

197212
func (s *segmentServer) ReadFromBlockStream(stream segpb.SegmentServer_ReadFromBlockStreamServer) error {
198-
ctx := context.Background()
213+
ctx := stream.Context()
199214
for {
200215
request, err := stream.Recv()
201216
if err != nil {
217+
if stderr.Is(err, io.EOF) {
218+
log.Warning(ctx, "read stream closed", map[string]interface{}{
219+
log.KeyError: err,
220+
})
221+
return nil
222+
}
202223
log.Error(ctx, "read stream recv failed", map[string]interface{}{
203224
log.KeyError: err,
204225
})
@@ -211,8 +232,8 @@ func (s *segmentServer) ReadFromBlockStream(stream segpb.SegmentServer_ReadFromB
211232
events, err := s.srv.ReadFromBlock(
212233
ctx, blockID, request.Offset, int(request.Number), request.PollingTimeoutInMillisecond)
213234
if err != nil {
214-
errCode = errpb.ErrorCode_UNKNOWN
215-
errMsg = "unknown"
235+
errCode = errpb.ErrorCode_READ_FROM_BLOCK_FAILED
236+
errMsg = "read from block failed"
216237
log.Error(ctx, "read from block failed", map[string]interface{}{
217238
log.KeyError: err,
218239
})

internal/store/segment/server.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ const (
7272
debugModeENV = "SEGMENT_SERVER_DEBUG_MODE"
7373
defaultLeaderInfoBufferSize = 256
7474
defaultForceStopTimeout = 30 * time.Second
75-
defaultWaiterWorker = 8
75+
defaultCallbackWorker = 8
76+
defaultCallbackBufferSize = 32
7677
)
7778

7879
type Server interface {
@@ -111,7 +112,7 @@ func NewServer(cfg store.Config) Server {
111112
srv := &server{
112113
state: primitive.ServerStateCreated,
113114
cfg: cfg,
114-
callbackC: make(chan func()),
115+
callbackC: make(chan func(), defaultCallbackBufferSize),
115116
isDebugMode: debugModel,
116117
localAddress: localAddress,
117118
volumeID: uint64(cfg.Volume.ID),
@@ -218,7 +219,8 @@ func (s *server) preGrpcStream(ctx context.Context, info *tap.Info) (context.Con
218219
}
219220

220221
func (s *server) runCallbackWorkPool(ctx context.Context) {
221-
for i := 0; i < defaultWaiterWorker; i++ {
222+
// TODO(jiangkai): use dynamic work pool
223+
for i := 0; i < defaultCallbackWorker; i++ {
222224
go s.runCallbackWorker(ctx)
223225
}
224226
}
@@ -680,7 +682,9 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
680682
if v, ok := s.replicas.Load(id); ok {
681683
b, _ = v.(Replica)
682684
} else {
683-
cb(nil, errors.ErrResourceNotFound.WithMessage("the block doesn't exist"))
685+
s.callbackC <- func() {
686+
cb(nil, errors.ErrResourceNotFound.WithMessage("the block doesn't exist"))
687+
}
684688
return
685689
}
686690

@@ -695,13 +699,18 @@ func (s *server) AppendToBlock(ctx context.Context, id vanus.ID, events []*cepb.
695699
metrics.WriteThroughputCounterVec.WithLabelValues(s.volumeIDStr, b.IDStr()).Add(float64(size))
696700

697701
b.Append(ctx, entries, func(seqs []int64, err error) {
698-
s.callbackC <- func() {
702+
f := func() {
699703
if err == nil {
700704
// TODO(weihe.yin) make this method deep to code
701705
s.pm.NewMessageArrived(id)
702706
}
703707
cb(seqs, s.processAppendError(ctx, b, err))
704708
}
709+
_, ok := <-s.closeC
710+
if !ok {
711+
f()
712+
}
713+
s.callbackC <- f
705714
})
706715
}
707716

pkg/errors/errors.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ var (
2424

2525
// INVALID_REQUEST
2626
ErrInvalidRequest = New("invalid request").WithCode(errpb.ErrorCode_INVALID_REQUEST)
27-
ErrTransformInputParse = New("transform input invalid").WithCode(errpb.ErrorCode_TRANSFORM_INPUT_PARSE)
27+
ErrTransformInputParse = New("transform input invalid").WithCode(errpb.ErrorCode_PARSE_TRANSFORM_INPUT)
2828
ErrNoEndpoint = New("no endpoint").WithCode(errpb.ErrorCode_NO_ENDPOINT)
2929

3030
// INTERNAL
3131
ErrInternal = New("internal error").WithCode(errpb.ErrorCode_INTERNAL)
3232
ErrCorruptedEvent = New("corrupted event").WithCode(errpb.ErrorCode_CORRUPTED_EVENT)
33+
ErrAppendToBlock = New("append to block failed").WithCode(errpb.ErrorCode_APPEND_TO_BLOCK_FAILED)
34+
ErrReadFromBlock = New("read from block failed").WithCode(errpb.ErrorCode_READ_FROM_BLOCK_FAILED)
3335

3436
// FULL
3537
ErrFull = New("full").WithCode(errpb.ErrorCode_FULL)

proto/pkg/errors/errors.pb.go

Lines changed: 42 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)