Skip to content

Commit 02cb37d

Browse files
authored
feat(fxgcppubsub): Added retryable client factory (#33)
* feat(fxgcppubsub): Added retryable client factory * feat(fxgcppubsub): Added retryable client factory * feat(fxgcppubsub): Added retryable client factory * feat(fxgcppubsub): Added retryable client factory * feat(fxgcppubsub): Added retryable client factory
1 parent c27dc15 commit 02cb37d

19 files changed

+244
-24
lines changed

fxgcppubsub/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ modules:
6666
gcppubsub:
6767
project:
6868
id: ${GCP_PROJECT_ID} # GCP project id
69+
factory:
70+
attempts: 3 # number of attempts to perform to create the pubsub client, disabled by default
71+
interval: 1 # duration in seconds to wait between each pubsub client creation attempt
6972
healthcheck:
7073
topics: # list of topics to check for the topics probe
7174
- some-topic # refers to projects/${GCP_PROJECT_ID}/topics/some-topic

fxgcppubsub/client/factory.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"cloud.google.com/go/pubsub"
9+
"github.com/ankorstore/yokai/config"
10+
"github.com/ankorstore/yokai/log"
11+
"google.golang.org/api/option"
12+
)
13+
14+
var _ ClientFactory = (*DefaultClientFactory)(nil)
15+
16+
type ClientFactory interface {
17+
Create(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error)
18+
}
19+
20+
type DefaultClientFactory struct {
21+
config *config.Config
22+
logger *log.Logger
23+
}
24+
25+
func NewDefaultClientFactory(config *config.Config, logger *log.Logger) *DefaultClientFactory {
26+
return &DefaultClientFactory{
27+
config: config,
28+
logger: logger,
29+
}
30+
}
31+
32+
func (f *DefaultClientFactory) Create(ctx context.Context, projectID string, opts ...option.ClientOption) (*pubsub.Client, error) {
33+
attempts := f.config.GetInt("modules.gcppubsub.factory.attempts")
34+
interval := time.Duration(f.config.GetInt("modules.gcppubsub.factory.interval")) * time.Second
35+
36+
for attempt := 0; attempt <= attempts; attempt++ {
37+
client, err := pubsub.NewClient(ctx, projectID, opts...)
38+
if err == nil {
39+
f.logger.
40+
Debug().
41+
Int("attempt", attempt+1).
42+
Msg("pubsub client creation success")
43+
44+
return client, nil
45+
}
46+
47+
if attempt < attempts {
48+
f.logger.
49+
Warn().
50+
Err(err).
51+
Int("attempt", attempt+1).
52+
Msgf("pubsub client creation error, attempting again in %d seconds", int(interval.Seconds()))
53+
54+
time.Sleep(interval)
55+
}
56+
}
57+
58+
return nil, fmt.Errorf("pubsub client creation error after %d attempts", attempts)
59+
}

fxgcppubsub/client/factory_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package client_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"cloud.google.com/go/pubsub/pstest"
8+
"github.com/ankorstore/yokai-contrib/fxgcppubsub/client"
9+
"github.com/ankorstore/yokai/config"
10+
"github.com/ankorstore/yokai/log"
11+
"github.com/ankorstore/yokai/log/logtest"
12+
"github.com/rs/zerolog"
13+
"github.com/stretchr/testify/assert"
14+
"google.golang.org/api/option"
15+
"google.golang.org/grpc"
16+
"google.golang.org/grpc/credentials/insecure"
17+
)
18+
19+
func TestDefaultClientFactory(t *testing.T) {
20+
t.Parallel()
21+
22+
createFactory := func(tb testing.TB) (*client.DefaultClientFactory, logtest.TestLogBuffer) {
23+
tb.Helper()
24+
25+
cfg, err := config.NewDefaultConfigFactory().Create(config.WithFilePaths("../testdata/config"))
26+
assert.NoError(tb, err)
27+
28+
logBuffer := logtest.NewDefaultTestLogBuffer()
29+
logger, err := log.NewDefaultLoggerFactory().Create(
30+
log.WithLevel(zerolog.DebugLevel),
31+
log.WithOutputWriter(logBuffer),
32+
)
33+
assert.NoError(tb, err)
34+
35+
return client.NewDefaultClientFactory(cfg, logger), logBuffer
36+
}
37+
38+
t.Run("creation success", func(t *testing.T) {
39+
t.Parallel()
40+
41+
factory, logBuffer := createFactory(t)
42+
43+
server := pstest.NewServer()
44+
defer server.Close()
45+
46+
psClient, psErr := factory.Create(
47+
context.Background(),
48+
"test-project",
49+
option.WithEndpoint(server.Addr),
50+
option.WithoutAuthentication(),
51+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
52+
)
53+
54+
assert.NoError(t, psErr)
55+
assert.NotNil(t, psClient)
56+
57+
logtest.AssertHasLogRecord(t, logBuffer, map[string]interface{}{
58+
"level": "debug",
59+
"attempt": 1,
60+
"message": "pubsub client creation success",
61+
})
62+
})
63+
64+
t.Run("creation error with retries", func(t *testing.T) {
65+
t.Parallel()
66+
67+
factory, logBuffer := createFactory(t)
68+
69+
server := pstest.NewServer()
70+
defer server.Close()
71+
72+
psClient, psErr := factory.Create(
73+
context.Background(),
74+
"test-project",
75+
option.WithEndpoint(server.Addr),
76+
// option.WithoutAuthentication(), <- removed to make the client fail
77+
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
78+
)
79+
80+
assert.Error(t, psErr)
81+
assert.Equal(t, "pubsub client creation error after 3 attempts", psErr.Error())
82+
assert.Nil(t, psClient)
83+
84+
logtest.AssertContainLogRecord(t, logBuffer, map[string]interface{}{
85+
"level": "warn",
86+
"attempt": 1,
87+
"message": "pubsub client creation error, attempting again in 1 seconds",
88+
})
89+
90+
logtest.AssertContainLogRecord(t, logBuffer, map[string]interface{}{
91+
"level": "warn",
92+
"attempt": 2,
93+
"message": "pubsub client creation error, attempting again in 1 seconds",
94+
})
95+
96+
logtest.AssertContainLogRecord(t, logBuffer, map[string]interface{}{
97+
"level": "warn",
98+
"attempt": 3,
99+
"message": "pubsub client creation error, attempting again in 1 seconds",
100+
})
101+
})
102+
}

fxgcppubsub/go.mod

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@ go 1.20
44

55
require (
66
cloud.google.com/go/pubsub v1.40.0
7-
github.com/ankorstore/yokai/config v1.3.0
8-
github.com/ankorstore/yokai/fxconfig v1.1.0
7+
github.com/ankorstore/yokai/config v1.5.0
8+
github.com/ankorstore/yokai/fxconfig v1.3.0
9+
github.com/ankorstore/yokai/fxlog v1.1.0
910
github.com/ankorstore/yokai/healthcheck v1.1.0
1011
github.com/ankorstore/yokai/log v1.2.0
1112
github.com/hamba/avro/v2 v2.22.1
1213
github.com/linkedin/goavro/v2 v2.13.0
1314
github.com/rs/zerolog v1.32.0
1415
github.com/stretchr/testify v1.9.0
15-
go.uber.org/fx v1.22.0
16+
go.uber.org/fx v1.22.2
1617
google.golang.org/api v0.186.0
1718
google.golang.org/grpc v1.64.0
1819
google.golang.org/protobuf v1.34.2
@@ -45,7 +46,7 @@ require (
4546
github.com/mitchellh/mapstructure v1.5.0 // indirect
4647
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4748
github.com/modern-go/reflect2 v1.0.2 // indirect
48-
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
49+
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
4950
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5051
github.com/rogpeppe/go-internal v1.12.0 // indirect
5152
github.com/sagikazarmark/locafero v0.4.0 // indirect
@@ -54,7 +55,7 @@ require (
5455
github.com/spf13/afero v1.11.0 // indirect
5556
github.com/spf13/cast v1.6.0 // indirect
5657
github.com/spf13/pflag v1.0.5 // indirect
57-
github.com/spf13/viper v1.18.2 // indirect
58+
github.com/spf13/viper v1.19.0 // indirect
5859
github.com/stretchr/objx v0.5.2 // indirect
5960
github.com/subosito/gotenv v1.6.0 // indirect
6061
go.einride.tech/aip v0.67.1 // indirect
@@ -65,7 +66,7 @@ require (
6566
go.opentelemetry.io/otel/metric v1.24.0 // indirect
6667
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
6768
go.opentelemetry.io/otel/trace v1.24.0 // indirect
68-
go.uber.org/dig v1.17.1 // indirect
69+
go.uber.org/dig v1.18.0 // indirect
6970
go.uber.org/multierr v1.11.0 // indirect
7071
go.uber.org/zap v1.27.0 // indirect
7172
golang.org/x/crypto v0.24.0 // indirect

fxgcppubsub/go.sum

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ cloud.google.com/go/longrunning v0.5.7 h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuA
1414
cloud.google.com/go/pubsub v1.40.0 h1:0LdP+zj5XaPAGtWr2V6r88VXJlmtaB/+fde1q3TU8M0=
1515
cloud.google.com/go/pubsub v1.40.0/go.mod h1:BVJI4sI2FyXp36KFKvFwcfDRDfR8MiLT8mMhmIhdAeA=
1616
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
17-
github.com/ankorstore/yokai/config v1.3.0 h1:si2h4mESPN5pj14CBMT/VGFgFn0voKEVylr8hQeIgEk=
18-
github.com/ankorstore/yokai/config v1.3.0/go.mod h1:OV2QiL2dyNLCxhcGO+GcSa8Wm20+00H03VBHm9SPVuE=
19-
github.com/ankorstore/yokai/fxconfig v1.1.0 h1:QgRDrZPpSy4wlnzNN37sWniRRAszerBb6WpvMa3hTB0=
20-
github.com/ankorstore/yokai/fxconfig v1.1.0/go.mod h1:dU8W3eJtioegWEB7X5C+B40Ud+M+vRa5d2UdbAJr9Os=
17+
github.com/ankorstore/yokai/config v1.5.0 h1:vL/l0dcnq34FtxE+Up1NvzgcRB0G/vI4Yo/H5PccfN0=
18+
github.com/ankorstore/yokai/config v1.5.0/go.mod h1:C8ggYvcrG+J0Ra2vTtcDCANa8HMf3FdrC0Ek8o3tTEw=
19+
github.com/ankorstore/yokai/fxconfig v1.3.0 h1:kk+RkpgECjZYciN2E3lnVj1dpewRy54JN7k8zErpX88=
20+
github.com/ankorstore/yokai/fxconfig v1.3.0/go.mod h1:NTF2TbT+xZNEzI/iTCQLtY+oS/AJSDAPAqouPgAYzbE=
21+
github.com/ankorstore/yokai/fxlog v1.1.0 h1:vLI8Qd9KfCzAH9IvzGJTvFYmlE1jtMnjvA4z/vxJpYg=
22+
github.com/ankorstore/yokai/fxlog v1.1.0/go.mod h1:VHlj/FNGAuLNqTyRCCx3iGUi9IZXv7qVNrDLUQng1cE=
2123
github.com/ankorstore/yokai/healthcheck v1.1.0 h1:PXkEccym7iaVnQltpM5UFi0Xl0n+5rZDzlQju6HmGms=
2224
github.com/ankorstore/yokai/healthcheck v1.1.0/go.mod h1:IiYgjRa4G3OLZMwAuacuryZZAfDHsBH8PQoK4PgRdZ4=
2325
github.com/ankorstore/yokai/log v1.2.0 h1:jiuDiC0dtqIGIOsFQslUHYoFJ1qjI+rOMa6dI1LBf2Y=
@@ -110,8 +112,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
110112
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
111113
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
112114
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
113-
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
114-
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
115+
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
116+
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
115117
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
116118
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
117119
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
@@ -134,8 +136,8 @@ github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
134136
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
135137
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
136138
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
137-
github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
138-
github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
139+
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
140+
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
139141
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
140142
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
141143
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@@ -167,10 +169,10 @@ go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucg
167169
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
168170
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
169171
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
170-
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
171-
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
172-
go.uber.org/fx v1.22.0 h1:pApUK7yL0OUHMd8vkunWSlLxZVFFk70jR2nKde8X2NM=
173-
go.uber.org/fx v1.22.0/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48=
172+
go.uber.org/dig v1.18.0 h1:imUL1UiY0Mg4bqbFfsRQO5G4CGRBec/ZujWTvSVp3pw=
173+
go.uber.org/dig v1.18.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
174+
go.uber.org/fx v1.22.2 h1:iPW+OPxv0G8w75OemJ1RAnTUrF55zOJlXlo1TbJ0Buw=
175+
go.uber.org/fx v1.22.2/go.mod h1:o/D9n+2mLP6v1EG+qsdT1O8wKopYAsqZasju97SDFCU=
174176
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
175177
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
176178
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=

fxgcppubsub/healthcheck/subscription_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
1010
"github.com/ankorstore/yokai/config"
1111
"github.com/ankorstore/yokai/fxconfig"
12+
"github.com/ankorstore/yokai/fxlog"
1213
"github.com/stretchr/testify/assert"
1314
"go.uber.org/fx"
1415
"go.uber.org/fx/fxtest"
@@ -35,6 +36,7 @@ func TestGcpPubSubSubscriptionsProbe(t *testing.T) {
3536
t,
3637
fx.NopLogger,
3738
fxconfig.FxConfigModule,
39+
fxlog.FxLogModule,
3840
fxgcppubsub.FxGcpPubSubModule,
3941
fx.Supply(fx.Annotate(ctx, fx.As(new(context.Context)))),
4042
fxgcppubsub.PrepareTopicAndSubscription(fxgcppubsub.PrepareTopicAndSubscriptionParams{
@@ -56,6 +58,7 @@ func TestGcpPubSubSubscriptionsProbe(t *testing.T) {
5658
t,
5759
fx.NopLogger,
5860
fxconfig.FxConfigModule,
61+
fxlog.FxLogModule,
5962
fxgcppubsub.FxGcpPubSubModule,
6063
fx.Supply(fx.Annotate(ctx, fx.As(new(context.Context)))),
6164
fx.Populate(&config, &client),

fxgcppubsub/healthcheck/topic_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ankorstore/yokai-contrib/fxgcppubsub/healthcheck"
1010
"github.com/ankorstore/yokai/config"
1111
"github.com/ankorstore/yokai/fxconfig"
12+
"github.com/ankorstore/yokai/fxlog"
1213
"github.com/stretchr/testify/assert"
1314
"go.uber.org/fx"
1415
"go.uber.org/fx/fxtest"
@@ -35,6 +36,7 @@ func TestGcpPubSubTopicsProbe(t *testing.T) {
3536
t,
3637
fx.NopLogger,
3738
fxconfig.FxConfigModule,
39+
fxlog.FxLogModule,
3840
fxgcppubsub.FxGcpPubSubModule,
3941
fx.Supply(fx.Annotate(ctx, fx.As(new(context.Context)))),
4042
fxgcppubsub.PrepareTopic(fxgcppubsub.PrepareTopicParams{
@@ -55,6 +57,7 @@ func TestGcpPubSubTopicsProbe(t *testing.T) {
5557
t,
5658
fx.NopLogger,
5759
fxconfig.FxConfigModule,
60+
fxlog.FxLogModule,
5861
fxgcppubsub.FxGcpPubSubModule,
5962
fx.Supply(fx.Annotate(ctx, fx.As(new(context.Context)))),
6063
fx.Populate(&config, &client),

0 commit comments

Comments
 (0)