Skip to content

Commit 545f8c0

Browse files
authored
feat(manager): add redis proxy for manager and support redis username (#4041)
feat(manager): add redis proxy for manager Signed-off-by: Gaius <[email protected]>
1 parent 530ba54 commit 545f8c0

File tree

19 files changed

+271
-89
lines changed

19 files changed

+271
-89
lines changed

go.mod

+5-6
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ require (
66
cloud.google.com/go/storage v1.50.0
77
d7y.io/api/v2 v2.1.39
88
github.com/MysteriousPotato/go-lockable v1.0.0
9-
github.com/RichardKnop/machinery v1.10.8
109
github.com/Showmax/go-fqdn v1.0.0
1110
github.com/VividCortex/mysqlerr v1.0.0
1211
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
@@ -24,6 +23,7 @@ require (
2423
github.com/docker/docker v28.1.1+incompatible
2524
github.com/docker/go-connections v0.5.0
2625
github.com/docker/go-units v0.4.0
26+
github.com/dragonflyoss/machinery v1.10.10
2727
github.com/elastic/go-freelru v0.16.0
2828
github.com/fsouza/fake-gcs-server v1.52.2
2929
github.com/gaius-qi/ping v1.0.0
@@ -42,7 +42,7 @@ require (
4242
github.com/go-sql-driver/mysql v1.9.2
4343
github.com/gofrs/flock v0.8.1
4444
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
45-
github.com/gomodule/redigo v2.0.0+incompatible
45+
github.com/gomodule/redigo v1.8.10-0.20230511231101-78e255f9bd2a
4646
github.com/google/go-github v17.0.0+incompatible
4747
github.com/google/uuid v1.6.0
4848
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
@@ -170,7 +170,6 @@ require (
170170
github.com/go-openapi/swag v0.23.0 // indirect
171171
github.com/go-playground/locales v0.14.1 // indirect
172172
github.com/go-playground/universal-translator v0.18.1 // indirect
173-
github.com/go-stack/stack v1.8.1 // indirect
174173
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
175174
github.com/goccy/go-json v0.10.5 // indirect
176175
github.com/gogo/protobuf v1.3.2 // indirect
@@ -272,14 +271,14 @@ require (
272271
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
273272
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
274273
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
275-
github.com/xdg-go/scram v1.0.2 // indirect
276-
github.com/xdg-go/stringprep v1.0.2 // indirect
274+
github.com/xdg-go/scram v1.1.2 // indirect
275+
github.com/xdg-go/stringprep v1.0.4 // indirect
277276
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
278277
github.com/yusufpapurcu/wmi v1.2.4 // indirect
279278
go.etcd.io/etcd/api/v3 v3.5.17 // indirect
280279
go.etcd.io/etcd/client/pkg/v3 v3.5.17 // indirect
281280
go.etcd.io/etcd/client/v3 v3.5.17 // indirect
282-
go.mongodb.org/mongo-driver v1.9.1 // indirect
281+
go.mongodb.org/mongo-driver v1.17.0 // indirect
283282
go.opencensus.io v0.24.0 // indirect
284283
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
285284
go.opentelemetry.io/contrib/detectors/gcp v1.34.0 // indirect

go.sum

+48-67
Large diffs are not rendered by default.

internal/job/job.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import (
2626
"strings"
2727
"time"
2828

29-
"github.com/RichardKnop/machinery/v1"
30-
machineryv1config "github.com/RichardKnop/machinery/v1/config"
31-
machineryv1log "github.com/RichardKnop/machinery/v1/log"
32-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
29+
"github.com/dragonflyoss/machinery/v1"
30+
machineryv1config "github.com/dragonflyoss/machinery/v1/config"
31+
machineryv1log "github.com/dragonflyoss/machinery/v1/log"
32+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
3333
"github.com/redis/go-redis/v9"
3434

3535
logger "d7y.io/dragonfly/v2/internal/dflog"
@@ -68,10 +68,24 @@ func New(cfg *Config, queue Queue) (*Job, error) {
6868
return nil, err
6969
}
7070

71+
var broker string
72+
if cfg.Username != "" {
73+
broker = fmt.Sprintf("redis://%s:%s@%s/%d", url.QueryEscape(cfg.Username), url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BrokerDB)
74+
} else {
75+
broker = fmt.Sprintf("redis://%s@%s/%d", url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BrokerDB)
76+
}
77+
78+
var backend string
79+
if cfg.Username != "" {
80+
backend = fmt.Sprintf("redis://%s:%s@%s/%d", url.QueryEscape(cfg.Username), url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BackendDB)
81+
} else {
82+
backend = fmt.Sprintf("redis://%s@%s/%d", url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BackendDB)
83+
}
84+
7185
server, err := machinery.NewServer(&machineryv1config.Config{
72-
Broker: fmt.Sprintf("redis://%s@%s/%d", url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BrokerDB),
86+
Broker: broker,
7387
DefaultQueue: queue.String(),
74-
ResultBackend: fmt.Sprintf("redis://%s@%s/%d", url.QueryEscape(cfg.Password), strings.Join(cfg.Addrs, ","), cfg.BackendDB),
88+
ResultBackend: backend,
7589
ResultsExpireIn: DefaultResultsExpireIn,
7690
Redis: &machineryv1config.RedisConfig{
7791
MasterName: cfg.MasterName,

internal/job/job_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"reflect"
2121
"testing"
2222

23-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
23+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
2424
"github.com/stretchr/testify/assert"
2525
)
2626

manager/config/config.go

+26
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,14 @@ type PostgresConfig struct {
190190
Migrate bool `yaml:"migrate" mapstructure:"migrate"`
191191
}
192192

193+
type RedisProxyConfig struct {
194+
// Enable redis proxy.
195+
Enable bool `yaml:"enable" mapstructure:"enable"`
196+
197+
// Proxy address to listen on, default is ":65100".
198+
Addr string `yaml:"addr" mapstructure:"addr"`
199+
}
200+
193201
// RedisConfig is redis configuration.
194202
// see: https://redis.uptrace.dev/guide/universal.html
195203
type RedisConfig struct {
@@ -225,6 +233,14 @@ type RedisConfig struct {
225233

226234
// BackendDB is server backend DB name.
227235
BackendDB int `yaml:"backendDB" mapstructure:"backendDB"`
236+
237+
// Proxy is redis proxy configuration.
238+
// If enabled, the manager starts a TCP proxy (defaulting to port 65100) that
239+
// forwards requests to the Redis service. This allows Schedulers to connect to Redis
240+
// via the manager's local port, which is useful for network isolation as only the
241+
// manager's IP and port need to be exposed.
242+
// Note: Only a single Redis address is supported by the proxy.
243+
Proxy RedisProxyConfig `yaml:"proxy" mapstructure:"proxy"`
228244
}
229245

230246
type CacheConfig struct {
@@ -420,6 +436,10 @@ func New() *Config {
420436
DB: DefaultRedisDB,
421437
BrokerDB: DefaultRedisBrokerDB,
422438
BackendDB: DefaultRedisBackendDB,
439+
Proxy: RedisProxyConfig{
440+
Enable: false,
441+
Addr: DefaultRedisProxyAddr,
442+
},
423443
},
424444
},
425445
Cache: CacheConfig{
@@ -596,6 +616,12 @@ func (cfg *Config) Validate() error {
596616
return errors.New("redis requires parameter backendDB")
597617
}
598618

619+
if cfg.Database.Redis.Proxy.Enable {
620+
if cfg.Database.Redis.Proxy.Addr == "" {
621+
return errors.New("redis proxy requires parameter addr")
622+
}
623+
}
624+
599625
if cfg.Cache.Redis.TTL == 0 {
600626
return errors.New("redis requires parameter ttl")
601627
}

manager/config/config_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ func TestConfig_Load(t *testing.T) {
165165
DB: 0,
166166
BrokerDB: 1,
167167
BackendDB: 2,
168+
Proxy: RedisProxyConfig{
169+
Enable: true,
170+
Addr: ":65101",
171+
},
168172
},
169173
},
170174
Cache: CacheConfig{
@@ -701,6 +705,22 @@ func TestConfig_Validate(t *testing.T) {
701705
assert.EqualError(err, "redis requires parameter ttl")
702706
},
703707
},
708+
{
709+
name: "redis proxy requires parameter addr",
710+
config: New(),
711+
mock: func(cfg *Config) {
712+
cfg.Auth.JWT = mockJWTConfig
713+
cfg.Database.Type = DatabaseTypeMysql
714+
cfg.Database.Mysql = mockMysqlConfig
715+
cfg.Database.Redis = mockRedisConfig
716+
cfg.Database.Redis.Proxy.Enable = true
717+
cfg.Database.Redis.Proxy.Addr = ""
718+
},
719+
expect: func(t *testing.T, err error) {
720+
assert := assert.New(t)
721+
assert.EqualError(err, "redis proxy requires parameter addr")
722+
},
723+
},
704724
{
705725
name: "local requires parameter size",
706726
config: New(),

manager/config/constants.go

+3
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ const (
6565

6666
// DefaultRedisBackendDB is default db for redis backend.
6767
DefaultRedisBackendDB = 2
68+
69+
// DefaultRedisProxyAddr is default address for redis proxy.
70+
DefaultRedisProxyAddr = ":65100"
6871
)
6972

7073
const (

manager/config/testdata/manager.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ database:
6060
db: 0
6161
brokerDB: 1
6262
backendDB: 2
63+
proxy:
64+
enable: true
65+
addr: ":65101"
6366

6467
cache:
6568
redis:

manager/job/preheat.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"net/url"
3030
"time"
3131

32-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
3332
"github.com/containerd/containerd/platforms"
3433
"github.com/docker/distribution"
3534
"github.com/docker/distribution/manifest/manifestlist"
@@ -41,6 +40,7 @@ import (
4140
"github.com/docker/distribution/registry/client/transport"
4241
typesregistry "github.com/docker/docker/api/types/registry"
4342
"github.com/docker/docker/registry"
43+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
4444
"github.com/google/uuid"
4545
specs "github.com/opencontainers/image-spec/specs-go/v1"
4646
"go.opentelemetry.io/otel/trace"

manager/job/sync_peers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"sync"
2626
"time"
2727

28-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
28+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
2929
"github.com/google/uuid"
3030
"go.opentelemetry.io/otel/trace"
3131
"gorm.io/gorm"

manager/job/task.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"fmt"
2424
"time"
2525

26-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
26+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
2727
"github.com/google/uuid"
2828
"go.opentelemetry.io/otel/trace"
2929

manager/job/task_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"errors"
2222
"testing"
2323

24-
"github.com/RichardKnop/machinery/v1"
24+
"github.com/dragonflyoss/machinery/v1"
2525
"github.com/stretchr/testify/assert"
2626

2727
"d7y.io/dragonfly/v2/internal/job"

manager/manager.go

+27
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
pkggc "d7y.io/dragonfly/v2/pkg/gc"
4747
"d7y.io/dragonfly/v2/pkg/net/ip"
4848
"d7y.io/dragonfly/v2/pkg/objectstorage"
49+
"d7y.io/dragonfly/v2/pkg/redis"
4950
"d7y.io/dragonfly/v2/pkg/rpc"
5051
)
5152

@@ -106,6 +107,9 @@ type Server struct {
106107

107108
// Metrics server.
108109
metricsServer *http.Server
110+
111+
// Redis proxy.
112+
redisProxy redis.Proxy
109113
}
110114

111115
// New creates a new manager server.
@@ -221,6 +225,10 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
221225
s.metricsServer = metrics.New(&cfg.Metrics, grpcServer)
222226
}
223227

228+
if cfg.Database.Redis.Proxy.Enable {
229+
s.redisProxy = redis.NewProxy(cfg.Database.Redis.Proxy.Addr, cfg.Database.Redis.Addrs[0])
230+
}
231+
224232
return s, nil
225233
}
226234

@@ -234,13 +242,15 @@ func (s *Server) Serve() error {
234242
if err == http.ErrServerClosed {
235243
return
236244
}
245+
237246
logger.Fatalf("rest server closed unexpect: %v", err)
238247
}
239248
} else {
240249
if err := s.restServer.ListenAndServe(); err != nil {
241250
if err == http.ErrServerClosed {
242251
return
243252
}
253+
244254
logger.Fatalf("rest server closed unexpect: %v", err)
245255
}
246256
}
@@ -254,11 +264,22 @@ func (s *Server) Serve() error {
254264
if err == http.ErrServerClosed {
255265
return
256266
}
267+
257268
logger.Fatalf("metrics server closed unexpect: %v", err)
258269
}
259270
}()
260271
}
261272

273+
// Started redis proxy server.
274+
if s.redisProxy != nil {
275+
go func() {
276+
logger.Infof("started redis proxy server at %s", s.config.Database.Redis.Proxy.Addr)
277+
if err := s.redisProxy.Serve(); err != nil {
278+
logger.Fatalf("redis proxy server closed unexpect: %v", err)
279+
}
280+
}()
281+
}
282+
262283
// Started job server.
263284
go func() {
264285
logger.Info("started job server")
@@ -315,6 +336,12 @@ func (s *Server) Stop() {
315336
}
316337
}
317338

339+
// Stop redis proxy server.
340+
if s.redisProxy != nil {
341+
s.redisProxy.Stop()
342+
logger.Info("redis proxy server closed under request")
343+
}
344+
318345
// Stop job server.
319346
s.job.Stop()
320347

manager/service/job.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323

24-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
24+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
2525

2626
logger "d7y.io/dragonfly/v2/internal/dflog"
2727
internaljob "d7y.io/dragonfly/v2/internal/job"

manager/service/preheat.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"context"
2121
"strconv"
2222

23-
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
23+
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
2424
"google.golang.org/grpc/codes"
2525
"google.golang.org/grpc/status"
2626

0 commit comments

Comments
 (0)