Skip to content

Commit cf8ef16

Browse files
shawnh2arkodg
authored andcommitted
feat: implement offline kubernetes controller (envoyproxy#5767)
Signed-off-by: Arko Dasgupta <[email protected]>
1 parent d896202 commit cf8ef16

File tree

13 files changed

+936
-475
lines changed

13 files changed

+936
-475
lines changed

internal/provider/file/file.go

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"os"
1414
"path/filepath"
1515
"strings"
16+
"sync"
1617
"sync/atomic"
1718
"time"
1819

@@ -25,33 +26,45 @@ import (
2526
"github.com/envoyproxy/gateway/internal/envoygateway/config"
2627
"github.com/envoyproxy/gateway/internal/filewatcher"
2728
"github.com/envoyproxy/gateway/internal/message"
29+
"github.com/envoyproxy/gateway/internal/provider/kubernetes"
2830
"github.com/envoyproxy/gateway/internal/utils/path"
2931
)
3032

3133
type Provider struct {
32-
paths []string
33-
logger logr.Logger
34-
watcher filewatcher.FileWatcher
35-
resourcesStore *resourcesStore
36-
extensionManagerEnabled bool
34+
paths []string
35+
logger logr.Logger
36+
watcher filewatcher.FileWatcher
37+
resources *message.ProviderResources
38+
reconciler *kubernetes.OfflineGatewayAPIReconciler
39+
store *resourcesStore
40+
status *StatusHandler
3741

3842
// ready indicates whether the provider can start watching filesystem events.
3943
ready atomic.Bool
4044
}
4145

42-
func New(svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
46+
func New(ctx context.Context, svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
4347
logger := svr.Logger.Logger
4448
paths := sets.New[string]()
4549
if svr.EnvoyGateway.Provider.Custom.Resource.File != nil {
4650
paths.Insert(svr.EnvoyGateway.Provider.Custom.Resource.File.Paths...)
4751
}
4852

53+
// Create gateway-api offline reconciler.
54+
statusHandler := NewStatusHandler(logger)
55+
reconciler, err := kubernetes.NewOfflineGatewayAPIController(ctx, svr, statusHandler.Writer(), resources)
56+
if err != nil {
57+
return nil, fmt.Errorf("failed to create offline gateway-api controller")
58+
}
59+
4960
return &Provider{
50-
paths: paths.UnsortedList(),
51-
logger: logger,
52-
watcher: filewatcher.NewWatcher(),
53-
resourcesStore: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, resources, logger),
54-
extensionManagerEnabled: svr.EnvoyGateway.ExtensionManager != nil,
61+
paths: paths.UnsortedList(),
62+
logger: logger,
63+
watcher: filewatcher.NewWatcher(),
64+
resources: resources,
65+
reconciler: reconciler,
66+
store: newResourcesStore(svr.EnvoyGateway.Gateway.ControllerName, reconciler.Client, resources, logger),
67+
status: statusHandler,
5568
}, nil
5669
}
5770

@@ -73,13 +86,18 @@ func (p *Provider) Start(ctx context.Context) error {
7386
}
7487
go p.startHealthProbeServer(ctx, readyzChecker)
7588

76-
// Subscribe resources status.
77-
p.subscribeAndUpdateStatus(ctx)
89+
// Offline controller should be started before initial resources load.
90+
// Nor we may lose some messages from controller.
91+
wg := new(sync.WaitGroup)
92+
wg.Add(2)
93+
go p.startReconciling(ctx, wg)
94+
go p.status.Start(ctx, wg)
95+
wg.Wait()
7896

7997
initDirs, initFiles := path.ListDirsAndFiles(p.paths)
80-
// Initially load resources from paths on host.
81-
if err := p.resourcesStore.LoadAndStore(initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil {
82-
return fmt.Errorf("failed to load resources into store: %w", err)
98+
// Initially load resources.
99+
if err := p.store.ReloadAll(ctx, initFiles.UnsortedList(), initDirs.UnsortedList()); err != nil {
100+
p.logger.Error(err, "failed to reload resources initially")
83101
}
84102

85103
// Add paths to the watcher, and aggregate all path channels into one.
@@ -150,18 +168,31 @@ func (p *Provider) Start(ctx context.Context) error {
150168
}
151169
p.logger.Info("file changed", "op", event.Op, "name", event.Name, "dir", filepath.Dir(event.Name))
152170

153-
switch event.Op {
154-
case fsnotify.Create, fsnotify.Write, fsnotify.Remove:
155-
// Since we do not watch any events in the subdirectories, any events involving files
156-
// modifications in current directory will trigger the event handling.
157-
goto handle
158-
default:
159-
// do nothing
160-
continue
171+
handle:
172+
if err := p.store.ReloadAll(ctx, curFiles.UnsortedList(), curDirs.UnsortedList()); err != nil {
173+
p.logger.Error(err, "error when reload resources", "op", event.Op, "name", event.Name)
161174
}
175+
}
176+
}
177+
}
162178

163-
handle:
164-
p.resourcesStore.HandleEvent(curFiles.UnsortedList(), curDirs.UnsortedList())
179+
// startReconciling starts reconcile on offline controller when receiving signal from resources store.
180+
func (p *Provider) startReconciling(ctx context.Context, ready *sync.WaitGroup) {
181+
p.logger.Info("start reconciling")
182+
defer p.logger.Info("stop reconciling")
183+
ready.Done()
184+
185+
for {
186+
select {
187+
case rid := <-p.store.reconcile:
188+
p.logger.Info("start reconcile", "id", rid, "time", time.Now())
189+
if err := p.reconciler.Reconcile(ctx); err != nil {
190+
p.logger.Error(err, "failed to reconcile", "id", rid)
191+
}
192+
p.logger.Info("reconcile finished", "id", rid, "time", time.Now())
193+
194+
case <-ctx.Done():
195+
return
165196
}
166197
}
167198
}

0 commit comments

Comments
 (0)