Skip to content

Commit 585a2d7

Browse files
committed
[patch] exported trigger type to be able to configure processor from external packages
[-] updated README with example
1 parent a33d970 commit 585a2d7

File tree

3 files changed

+114
-14
lines changed

3 files changed

+114
-14
lines changed

README.md

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ The same concept can also be extended to handle events processing. So, we have a
1818

1919
The processing of a single micro batch can be triggered in two ways, based on a time ticker or if the micro batch size is full. i.e. process a non empty batch if duration X has passed or if the batch size is full
2020

21-
## How to use nibbler?
22-
2321
### Config
2422

2523
```golang
@@ -44,7 +42,109 @@ type Config[T any] struct {
4442
}
4543
```
4644

47-
You can find usage details in the tests.
45+
## How to use nibbler?
46+
47+
```golang
48+
package main
49+
50+
import (
51+
"context"
52+
"fmt"
53+
"sync"
54+
"time"
55+
56+
"github.com/naughtygopher/nibbler"
57+
)
58+
59+
type db struct {
60+
data sync.Map
61+
totalBalance int
62+
}
63+
64+
func (d *db) BulkAddAccountsAndBalance(pp []AccStatement) error {
65+
// assume we are doing a bulk insert/update into the database instead of inserting one by one.
66+
// Bulk operations reduce the number of I/O required between your application and the database.
67+
// Thereby making it better in most cases.
68+
for _, p := range pp {
69+
d.data.Store(p.AccountID, p.Balance)
70+
d.totalBalance += p.Balance
71+
}
72+
return nil
73+
}
74+
75+
type Bank struct {
76+
db *db
77+
}
78+
79+
func (bnk *Bank) ProcessAccountsBatch(
80+
ctx context.Context,
81+
trigger nibbler.Trigger,
82+
batch []AccStatement,
83+
) error {
84+
err := bnk.db.BulkAddAccountsAndBalance(batch)
85+
if err != nil {
86+
return err
87+
}
88+
89+
return nil
90+
}
91+
92+
func (bnk *Bank) TotalBalance() int {
93+
return bnk.db.totalBalance
94+
}
95+
96+
func (bnk *Bank) TotalAccounts() int {
97+
counter := 0
98+
bnk.db.data.Range(func(key, value any) bool {
99+
counter++
100+
return true
101+
})
102+
return counter
103+
}
104+
105+
type AccStatement struct {
106+
AccountID string
107+
Balance int
108+
}
109+
110+
func main() {
111+
bnk := Bank{
112+
db: &db{
113+
data: sync.Map{},
114+
},
115+
}
116+
117+
nib, err := nibbler.Start(&nibbler.Config[AccStatement]{
118+
Size: 10,
119+
TickerDuration: time.Second,
120+
Processor: bnk.ProcessAccountsBatch,
121+
})
122+
if err != nil {
123+
panic(err)
124+
}
125+
126+
receiver := nib.Receiver()
127+
for i := range 100 {
128+
accID := fmt.Sprintf("account_id_%d", i)
129+
receiver <- AccStatement{
130+
AccountID: accID,
131+
Balance: 50000 / (i + 1),
132+
}
133+
}
134+
135+
// wait for batches to be processed. Ideally this wouldn't be required as our application
136+
// would not exit, instead just keep listening to the events stream.
137+
time.Sleep(time.Second)
138+
139+
fmt.Printf(
140+
"Number of accounts %d, total balance: %d\n",
141+
bnk.TotalAccounts(),
142+
bnk.TotalBalance(),
143+
)
144+
}
145+
```
146+
147+
You can find all usage details in the tests.
48148

49149
## The gopher
50150

nibbler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import (
1515

1616
var ErrValidation = errors.New("validation failed")
1717

18-
type trigger string
18+
type Trigger string
1919

2020
const (
21-
TriggerTicker trigger = "TICKER"
22-
TriggerFull trigger = "BATCH_FULL"
21+
TriggerTicker Trigger = "TICKER"
22+
TriggerFull Trigger = "BATCH_FULL"
2323
)
2424

25-
type BatchProcessor[T any] func(ctx context.Context, trigger trigger, batch []T) error
25+
type BatchProcessor[T any] func(ctx context.Context, trigger Trigger, batch []T) error
2626

2727
type Config[T any] struct {
2828
// ProcessingTimeout is context timeout for processing a single batch
@@ -88,7 +88,7 @@ func (bat *Nibbler[T]) panicRecovery(rec any, err error) error {
8888
return err
8989
}
9090

91-
func (bat *Nibbler[T]) processBatch(trigger trigger) (err error) {
91+
func (bat *Nibbler[T]) processBatch(trigger Trigger) (err error) {
9292
defer func() {
9393
err = bat.panicRecovery(recover(), err)
9494
}()

nibbler_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestNibbler(tt *testing.T) {
2828
Size: batchSize,
2929
ProcessingTimeout: time.Millisecond,
3030
TickerDuration: batchFreq,
31-
Processor: func(_ context.Context, _ trigger, batch []string) error {
31+
Processor: func(_ context.Context, _ Trigger, batch []string) error {
3232
gotchan <- slices.Clone(batch)
3333

3434
// last val is used to identify the end of all batches so that
@@ -99,7 +99,7 @@ func TestProcessorErr(tt *testing.T) {
9999

100100
nib, err := Start(&Config[string]{
101101
TickerDuration: time.Second,
102-
Processor: func(_ context.Context, _ trigger, _ []string) error {
102+
Processor: func(_ context.Context, _ Trigger, _ []string) error {
103103
return errProcessing
104104
},
105105
ProcessorErr: func(failedBatch []string, err error) {
@@ -128,7 +128,7 @@ func TestProcessorErr(tt *testing.T) {
128128
assertElement := "hello"
129129
nib, err := Start(&Config[string]{
130130
TickerDuration: time.Second,
131-
Processor: func(_ context.Context, _ trigger, _ []string) error {
131+
Processor: func(_ context.Context, _ Trigger, _ []string) error {
132132
return errProcessing
133133
},
134134
ProcessorErr: func(failedBatch []string, err error) {
@@ -160,7 +160,7 @@ func TestProcessorErr(tt *testing.T) {
160160

161161
nib, err := Start(&Config[string]{
162162
TickerDuration: time.Second,
163-
Processor: func(_ context.Context, _ trigger, _ []string) error {
163+
Processor: func(_ context.Context, _ Trigger, _ []string) error {
164164
panic(errProcessing)
165165
},
166166
ProcessorErr: func(failedBatch []string, err error) {
@@ -192,7 +192,7 @@ func TestProcessorErr(tt *testing.T) {
192192
assertElement := "hello"
193193
nib, err := Start(&Config[string]{
194194
TickerDuration: time.Second,
195-
Processor: func(_ context.Context, _ trigger, _ []string) error {
195+
Processor: func(_ context.Context, _ Trigger, _ []string) error {
196196
panic("processor panic")
197197
},
198198
ProcessorErr: func(failedBatch []string, err error) {
@@ -214,7 +214,7 @@ func TestProcessorErr(tt *testing.T) {
214214

215215
func TestSanitizeValidate(tt *testing.T) {
216216
var (
217-
processor = func(ctx context.Context, trigger trigger, batch []string) error { return nil }
217+
processor = func(ctx context.Context, trigger Trigger, batch []string) error { return nil }
218218
)
219219

220220
tt.Run("all valid", func(t *testing.T) {

0 commit comments

Comments
 (0)