Skip to content

Commit a8b98f6

Browse files
Metadata discovery (#17)
* upgrade tailscale, add waitforhosts function * revert tailscale upgrade * do metadata discovery by default * make wait for auth conditional * add debug logs * more debug logs * one more log * remove some logs; default mode * print host state * cleanup * remove log * more cleanup
1 parent 70c6fb2 commit a8b98f6

File tree

6 files changed

+232
-26
lines changed

6 files changed

+232
-26
lines changed

pkg/client.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,11 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli
7171
}
7272

7373
// Block until tailscale is authenticated (or the timeout is reached)
74-
err = tailscale.WaitForAuth(ctx, time.Second*30)
75-
if err != nil {
76-
return nil, err
74+
if cfg.Tailscale.WaitForAuth {
75+
err = tailscale.WaitForAuth(ctx, time.Second*30)
76+
if err != nil {
77+
return nil, err
78+
}
7779
}
7880

7981
metadata, err := NewBlobCacheMetadata(cfg.Metadata)
@@ -93,8 +95,9 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli
9395
metadata: metadata,
9496
closestHostWithCapacity: nil,
9597
}
98+
9699
bc.hostMap = NewHostMap(cfg, bc.addHost)
97-
bc.discoveryClient = NewDiscoveryClient(cfg, tailscale, bc.hostMap)
100+
bc.discoveryClient = NewDiscoveryClient(cfg, tailscale, bc.hostMap, metadata)
98101

99102
// Start searching for nearby blobcache hosts
100103
go bc.discoveryClient.StartInBackground(bc.ctx)

pkg/config.default.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ maxCachePct: 60
88
grpcMessageSizeBytes: 1000000000
99
grpcDialTimeoutS: 1
1010
discoveryIntervalS: 5
11+
discoveryMode: metadata
1112
blobfs:
1213
enabled: false
1314
mountPoint: /tmp/test
1415
maxBackgroundTasks: 512
1516
maxReadAheadKB: 128
1617
tailscale:
18+
waitForAuth: false
1719
controlUrl:
1820
user:
1921
authKey:
@@ -25,4 +27,4 @@ tailscale:
2527
metadata:
2628
redisAddr:
2729
redisPasswd:
28-
redisTLSEnabled: false
30+
redisTLSEnabled: false

pkg/discovery.go

Lines changed: 89 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@ type DiscoveryClient struct {
1818
tailscale *Tailscale
1919
cfg BlobCacheConfig
2020
hostMap *HostMap
21+
metadata *BlobCacheMetadata
2122
mu sync.Mutex
23+
tsClient *tailscale.LocalClient
2224
}
2325

24-
func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap) *DiscoveryClient {
26+
func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap, metadata *BlobCacheMetadata) *DiscoveryClient {
2527
return &DiscoveryClient{
2628
cfg: cfg,
2729
tailscale: tailscale,
2830
hostMap: hostMap,
31+
metadata: metadata,
32+
tsClient: nil,
2933
}
3034
}
3135

@@ -37,17 +41,26 @@ func (d *DiscoveryClient) updateHostMap(newHosts []*BlobCacheHost) {
3741

3842
// Used by blobcache servers to discover their closest peers
3943
func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
40-
server, err := d.tailscale.GetOrCreateServer()
41-
if err != nil {
42-
return err
44+
// Default to metadata discovery if no mode is specified
45+
if d.cfg.DiscoveryMode == "" {
46+
d.cfg.DiscoveryMode = string(DiscoveryModeMetadata)
4347
}
4448

45-
client, err := server.LocalClient()
46-
if err != nil {
47-
return err
49+
if d.cfg.DiscoveryMode == string(DiscoveryModeTailscale) {
50+
server, err := d.tailscale.GetOrCreateServer()
51+
if err != nil {
52+
return err
53+
}
54+
55+
client, err := server.LocalClient()
56+
if err != nil {
57+
return err
58+
}
59+
60+
d.tsClient = client
4861
}
4962

50-
hosts, err := d.FindNearbyHosts(ctx, client)
63+
hosts, err := d.FindNearbyHosts(ctx)
5164
if err == nil {
5265
d.updateHostMap(hosts)
5366
}
@@ -56,7 +69,7 @@ func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
5669
for {
5770
select {
5871
case <-ticker.C:
59-
hosts, err := d.FindNearbyHosts(ctx, client)
72+
hosts, err := d.FindNearbyHosts(ctx)
6073
if err != nil {
6174
continue
6275
}
@@ -68,8 +81,8 @@ func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
6881
}
6982
}
7083

71-
func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale.LocalClient) ([]*BlobCacheHost, error) {
72-
status, err := client.Status(ctx)
84+
func (d *DiscoveryClient) discoverHostsViaTailscale(ctx context.Context) ([]*BlobCacheHost, error) {
85+
status, err := d.tsClient.Status(ctx)
7386
if err != nil {
7487
return nil, err
7588
}
@@ -87,6 +100,8 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
87100
wg.Add(1)
88101

89102
go func(hostname string) {
103+
Logger.Debugf("Discovered host: %s", hostname)
104+
90105
defer wg.Done()
91106

92107
hostname = hostname[:len(hostname)-1] // Strip the last period
@@ -105,7 +120,6 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
105120
d.mu.Lock()
106121
defer d.mu.Unlock()
107122
hosts = append(hosts, host)
108-
109123
}(peer.DNSName)
110124
}
111125
}
@@ -114,6 +128,69 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
114128
return hosts, nil
115129
}
116130

131+
func (d *DiscoveryClient) discoverHostsViaMetadata(ctx context.Context) ([]*BlobCacheHost, error) {
132+
hosts, err := d.metadata.GetAvailableHosts(ctx)
133+
if err != nil {
134+
return nil, err
135+
}
136+
137+
var wg sync.WaitGroup
138+
filteredHosts := []*BlobCacheHost{}
139+
mu := sync.Mutex{}
140+
141+
for _, host := range hosts {
142+
if host.PrivateAddr != "" {
143+
addr := fmt.Sprintf("%s:%d", host.PrivateAddr, d.cfg.Port)
144+
145+
// Don't try to get the state on peers we're already aware of
146+
if d.hostMap.Get(addr) != nil {
147+
continue
148+
}
149+
150+
wg.Add(1)
151+
go func(addr string) {
152+
defer wg.Done()
153+
154+
hostState, err := d.GetHostState(ctx, addr)
155+
if err != nil {
156+
return
157+
}
158+
159+
mu.Lock()
160+
filteredHosts = append(filteredHosts, hostState)
161+
mu.Unlock()
162+
163+
Logger.Debugf("Added host with private address to map: %s", hostState.PrivateAddr)
164+
}(addr)
165+
}
166+
}
167+
168+
wg.Wait()
169+
return filteredHosts, nil
170+
}
171+
172+
func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context) ([]*BlobCacheHost, error) {
173+
var hosts []*BlobCacheHost
174+
var err error
175+
176+
switch d.cfg.DiscoveryMode {
177+
case string(DiscoveryModeTailscale):
178+
hosts, err = d.discoverHostsViaTailscale(ctx)
179+
if err != nil {
180+
return nil, err
181+
}
182+
case string(DiscoveryModeMetadata):
183+
hosts, err = d.discoverHostsViaMetadata(ctx)
184+
if err != nil {
185+
return nil, err
186+
}
187+
default:
188+
return nil, fmt.Errorf("invalid discovery mode: %s", d.cfg.DiscoveryMode)
189+
}
190+
191+
return hosts, nil
192+
}
193+
117194
// checkService attempts to connect to the gRPC service and verifies its availability
118195
func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error) {
119196
host := BlobCacheHost{

pkg/metadata.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package blobcache
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
57
"fmt"
68
"os"
79
"path/filepath"
@@ -285,6 +287,76 @@ func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([
285287
return entries, nil
286288
}
287289

290+
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context) ([]*BlobCacheHost, error) {
291+
hostAddrs, err := m.rdb.SMembers(ctx, MetadataKeys.MetadataHostIndex()).Result()
292+
if err != nil {
293+
return nil, err
294+
}
295+
296+
hosts := []*BlobCacheHost{}
297+
for _, addr := range hostAddrs {
298+
hostBytes, err := m.rdb.Get(ctx, MetadataKeys.MetadataHostKeepAlive(addr)).Bytes()
299+
if err != nil {
300+
301+
// If the keepalive key doesn't exist, remove the host index key
302+
if err == redis.Nil {
303+
m.RemoveHostFromIndex(ctx, &BlobCacheHost{Addr: addr})
304+
}
305+
306+
continue
307+
}
308+
309+
host := &BlobCacheHost{}
310+
if err = json.Unmarshal(hostBytes, host); err != nil {
311+
continue
312+
}
313+
314+
hosts = append(hosts, host)
315+
}
316+
317+
if len(hosts) == 0 {
318+
return nil, errors.New("no available hosts")
319+
}
320+
321+
return hosts, nil
322+
}
323+
324+
func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, host *BlobCacheHost) error {
325+
err := m.rdb.SAdd(ctx, MetadataKeys.MetadataHostIndex(), host.Addr).Err()
326+
if err != nil {
327+
return err
328+
}
329+
330+
return m.SetHostKeepAlive(ctx, host)
331+
}
332+
333+
func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, host *BlobCacheHost) error {
334+
return m.rdb.SRem(ctx, MetadataKeys.MetadataHostIndex(), host.Addr).Err()
335+
}
336+
337+
func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, host *BlobCacheHost) error {
338+
hostBytes, err := json.Marshal(host)
339+
if err != nil {
340+
return err
341+
}
342+
343+
return m.rdb.Set(ctx, MetadataKeys.MetadataHostKeepAlive(host.Addr), hostBytes, time.Duration(defaultHostKeepAliveTimeoutS)*time.Second).Err()
344+
}
345+
346+
func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context) ([]*BlobCacheHost, error) {
347+
hostAddrs, err := m.rdb.SMembers(ctx, MetadataKeys.MetadataHostIndex()).Result()
348+
if err != nil {
349+
return nil, err
350+
}
351+
352+
hosts := []*BlobCacheHost{}
353+
for _, addr := range hostAddrs {
354+
hosts = append(hosts, &BlobCacheHost{Addr: addr})
355+
}
356+
357+
return hosts, nil
358+
}
359+
288360
func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error {
289361
err := m.rdb.SAdd(ctx, MetadataKeys.MetadataFsNodeChildren(pid), id).Err()
290362
if err != nil {
@@ -300,12 +372,14 @@ func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) er
300372
// Metadata key storage format
301373
var (
302374
metadataPrefix string = "blobcache"
375+
metadataHostIndex string = "blobcache:host_index"
303376
metadataEntry string = "blobcache:entry:%s"
304377
metadataClientLock string = "blobcache:client_lock:%s:%s"
305378
metadataLocation string = "blobcache:location:%s"
306379
metadataRef string = "blobcache:ref:%s"
307380
metadataFsNode string = "blobcache:fs:node:%s"
308381
metadataFsNodeChildren string = "blobcache:fs:node:%s:children"
382+
metadataHostKeepAlive string = "blobcache:host:keepalive:%s"
309383
)
310384

311385
// Metadata keys
@@ -317,6 +391,14 @@ func (k *metadataKeys) MetadataEntry(hash string) string {
317391
return fmt.Sprintf(metadataEntry, hash)
318392
}
319393

394+
func (k *metadataKeys) MetadataHostIndex() string {
395+
return metadataHostIndex
396+
}
397+
398+
func (k *metadataKeys) MetadataHostKeepAlive(addr string) string {
399+
return fmt.Sprintf(metadataHostKeepAlive, addr)
400+
}
401+
320402
func (k *metadataKeys) MetadataLocation(hash string) string {
321403
return fmt.Sprintf(metadataLocation, hash)
322404
}

0 commit comments

Comments
 (0)