Skip to content

Metadata discovery #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli
}

// Block until tailscale is authenticated (or the timeout is reached)
err = tailscale.WaitForAuth(ctx, time.Second*30)
if err != nil {
return nil, err
if cfg.Tailscale.WaitForAuth {
err = tailscale.WaitForAuth(ctx, time.Second*30)
if err != nil {
return nil, err
}
}

metadata, err := NewBlobCacheMetadata(cfg.Metadata)
Expand All @@ -93,8 +95,9 @@ func NewBlobCacheClient(ctx context.Context, cfg BlobCacheConfig) (*BlobCacheCli
metadata: metadata,
closestHostWithCapacity: nil,
}

bc.hostMap = NewHostMap(cfg, bc.addHost)
bc.discoveryClient = NewDiscoveryClient(cfg, tailscale, bc.hostMap)
bc.discoveryClient = NewDiscoveryClient(cfg, tailscale, bc.hostMap, metadata)

// Start searching for nearby blobcache hosts
go bc.discoveryClient.StartInBackground(bc.ctx)
Expand Down Expand Up @@ -457,6 +460,25 @@ func (c *BlobCacheClient) HostsAvailable() bool {
return c.hostMap.Members().Cardinality() > 0
}

// WaitForHosts blocks until at least one host is present in the client's host
// map, re-checking once per second.
//
// It returns nil as soon as HostsAvailable reports true. If timeout elapses
// first, or the client's own context is cancelled, the corresponding context
// error (context.DeadlineExceeded or context.Canceled) is returned.
func (c *BlobCacheClient) WaitForHosts(timeout time.Duration) error {
	// Derive from the client context so that shutting the client down also
	// releases any caller blocked here, instead of waiting out the timeout.
	ctx, cancel := context.WithTimeout(c.ctx, timeout)
	defer cancel()

	Logger.Infof("Waiting for hosts to be available...")

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		if c.HostsAvailable() {
			return nil
		}

		// Wait for the next poll tick, but wake up immediately on
		// timeout/cancellation rather than sleeping through it.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func (c *BlobCacheClient) GetState() error {
ctx, cancel := context.WithTimeout(c.ctx, getContentRequestTimeout)
defer cancel()
Expand Down
4 changes: 3 additions & 1 deletion pkg/config.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ maxCachePct: 60
grpcMessageSizeBytes: 1000000000
grpcDialTimeoutS: 1
discoveryIntervalS: 5
discoveryMode: metadata
blobfs:
enabled: false
mountPoint: /tmp/test
maxBackgroundTasks: 512
maxReadAheadKB: 128
tailscale:
waitForAuth: false
controlUrl:
user:
authKey:
Expand All @@ -25,4 +27,4 @@ tailscale:
metadata:
redisAddr:
redisPasswd:
redisTLSEnabled: false
redisTLSEnabled: false
101 changes: 89 additions & 12 deletions pkg/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ type DiscoveryClient struct {
tailscale *Tailscale
cfg BlobCacheConfig
hostMap *HostMap
metadata *BlobCacheMetadata
mu sync.Mutex
tsClient *tailscale.LocalClient
}

func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap) *DiscoveryClient {
// NewDiscoveryClient builds a DiscoveryClient from the given configuration,
// tailscale handle, host map and metadata store. The tailscale local client
// (tsClient) is left unset here.
func NewDiscoveryClient(cfg BlobCacheConfig, tailscale *Tailscale, hostMap *HostMap, metadata *BlobCacheMetadata) *DiscoveryClient {
	client := &DiscoveryClient{}
	client.cfg = cfg
	client.tailscale = tailscale
	client.hostMap = hostMap
	client.metadata = metadata
	return client
}

Expand All @@ -37,17 +41,26 @@ func (d *DiscoveryClient) updateHostMap(newHosts []*BlobCacheHost) {

// Used by blobcache servers to discover their closest peers
func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
server, err := d.tailscale.GetOrCreateServer()
if err != nil {
return err
// Default to metadata discovery if no mode is specified
if d.cfg.DiscoveryMode == "" {
d.cfg.DiscoveryMode = string(DiscoveryModeMetadata)
}

client, err := server.LocalClient()
if err != nil {
return err
if d.cfg.DiscoveryMode == string(DiscoveryModeTailscale) {
server, err := d.tailscale.GetOrCreateServer()
if err != nil {
return err
}

client, err := server.LocalClient()
if err != nil {
return err
}

d.tsClient = client
}

hosts, err := d.FindNearbyHosts(ctx, client)
hosts, err := d.FindNearbyHosts(ctx)
if err == nil {
d.updateHostMap(hosts)
}
Expand All @@ -56,7 +69,7 @@ func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
for {
select {
case <-ticker.C:
hosts, err := d.FindNearbyHosts(ctx, client)
hosts, err := d.FindNearbyHosts(ctx)
if err != nil {
continue
}
Expand All @@ -68,8 +81,8 @@ func (d *DiscoveryClient) StartInBackground(ctx context.Context) error {
}
}

func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale.LocalClient) ([]*BlobCacheHost, error) {
status, err := client.Status(ctx)
func (d *DiscoveryClient) discoverHostsViaTailscale(ctx context.Context) ([]*BlobCacheHost, error) {
status, err := d.tsClient.Status(ctx)
if err != nil {
return nil, err
}
Expand All @@ -87,6 +100,8 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
wg.Add(1)

go func(hostname string) {
Logger.Debugf("Discovered host: %s", hostname)

defer wg.Done()

hostname = hostname[:len(hostname)-1] // Strip the last period
Expand All @@ -105,7 +120,6 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
d.mu.Lock()
defer d.mu.Unlock()
hosts = append(hosts, host)

}(peer.DNSName)
}
}
Expand All @@ -114,6 +128,69 @@ func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context, client *tailscale
return hosts, nil
}

// discoverHostsViaMetadata lists the hosts advertised in the metadata store and
// concurrently fetches the state of every one that has a private address and
// is not already present in the host map.
//
// Hosts whose state cannot be fetched are silently dropped. The returned slice
// is non-nil even when empty; an error is returned only when the metadata
// lookup itself fails.
func (d *DiscoveryClient) discoverHostsViaMetadata(ctx context.Context) ([]*BlobCacheHost, error) {
	candidates, err := d.metadata.GetAvailableHosts(ctx)
	if err != nil {
		return nil, err
	}

	var (
		pending    sync.WaitGroup
		resultsMu  sync.Mutex
		discovered = []*BlobCacheHost{}
	)

	// probe fetches the state of a single address and records it on success.
	probe := func(target string) {
		defer pending.Done()

		state, probeErr := d.GetHostState(ctx, target)
		if probeErr != nil {
			return
		}

		resultsMu.Lock()
		discovered = append(discovered, state)
		resultsMu.Unlock()

		Logger.Debugf("Added host with private address to map: %s", state.PrivateAddr)
	}

	for _, candidate := range candidates {
		if candidate.PrivateAddr == "" {
			continue
		}

		target := fmt.Sprintf("%s:%d", candidate.PrivateAddr, d.cfg.Port)

		// Don't try to get the state on peers we're already aware of
		if d.hostMap.Get(target) != nil {
			continue
		}

		pending.Add(1)
		go probe(target)
	}

	pending.Wait()
	return discovered, nil
}

// FindNearbyHosts discovers blobcache hosts using the strategy selected by
// cfg.DiscoveryMode (tailscale or metadata). Any other mode yields an
// "invalid discovery mode" error. On failure the returned slice is nil.
func (d *DiscoveryClient) FindNearbyHosts(ctx context.Context) ([]*BlobCacheHost, error) {
	var discover func(context.Context) ([]*BlobCacheHost, error)

	switch mode := d.cfg.DiscoveryMode; mode {
	case string(DiscoveryModeTailscale):
		discover = d.discoverHostsViaTailscale
	case string(DiscoveryModeMetadata):
		discover = d.discoverHostsViaMetadata
	default:
		return nil, fmt.Errorf("invalid discovery mode: %s", mode)
	}

	found, err := discover(ctx)
	if err != nil {
		return nil, err
	}

	return found, nil
}

// GetHostState attempts to connect to the gRPC service and verifies its availability
func (d *DiscoveryClient) GetHostState(ctx context.Context, addr string) (*BlobCacheHost, error) {
host := BlobCacheHost{
Expand Down
82 changes: 82 additions & 0 deletions pkg/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package blobcache

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -285,6 +287,76 @@ func (m *BlobCacheMetadata) GetFsNodeChildren(ctx context.Context, id string) ([
return entries, nil
}

// GetAvailableHosts returns every host listed in the host index whose
// keepalive record can still be read and decoded.
//
// For each address in the index set, the keepalive key is fetched and its
// JSON payload unmarshalled into a BlobCacheHost. Addresses whose keepalive
// key no longer exists are removed from the index. Addresses that fail for any
// other reason (transport error, malformed JSON) are skipped for this call.
//
// An error is returned if the index itself cannot be read, or if no host
// survives the filtering ("no available hosts").
func (m *BlobCacheMetadata) GetAvailableHosts(ctx context.Context) ([]*BlobCacheHost, error) {
	hostAddrs, err := m.rdb.SMembers(ctx, MetadataKeys.MetadataHostIndex()).Result()
	if err != nil {
		return nil, err
	}

	hosts := make([]*BlobCacheHost, 0, len(hostAddrs))
	for _, addr := range hostAddrs {
		hostBytes, err := m.rdb.Get(ctx, MetadataKeys.MetadataHostKeepAlive(addr)).Bytes()
		if err != nil {
			// If the keepalive key doesn't exist, remove the host index key.
			// errors.Is also matches redis.Nil when it has been wrapped.
			if errors.Is(err, redis.Nil) {
				// Best-effort cleanup: if the removal fails, the stale entry
				// stays in the index and is retried on the next call.
				_ = m.RemoveHostFromIndex(ctx, &BlobCacheHost{Addr: addr})
			}

			continue
		}

		host := &BlobCacheHost{}
		if err = json.Unmarshal(hostBytes, host); err != nil {
			// Skip records that cannot be decoded rather than failing the
			// whole lookup.
			continue
		}

		hosts = append(hosts, host)
	}

	if len(hosts) == 0 {
		return nil, errors.New("no available hosts")
	}

	return hosts, nil
}

// AddHostToIndex adds the host's address to the host index set and then
// writes its keepalive record. If the index write fails, the keepalive record
// is not written and that error is returned.
func (m *BlobCacheMetadata) AddHostToIndex(ctx context.Context, host *BlobCacheHost) error {
	indexKey := MetadataKeys.MetadataHostIndex()
	if addErr := m.rdb.SAdd(ctx, indexKey, host.Addr).Err(); addErr != nil {
		return addErr
	}

	return m.SetHostKeepAlive(ctx, host)
}

// RemoveHostFromIndex removes the host's address from the host index set.
// It does not touch the host's keepalive key.
func (m *BlobCacheMetadata) RemoveHostFromIndex(ctx context.Context, host *BlobCacheHost) error {
	indexKey := MetadataKeys.MetadataHostIndex()
	removal := m.rdb.SRem(ctx, indexKey, host.Addr)
	return removal.Err()
}

// SetHostKeepAlive stores the JSON-encoded host under its keepalive key with
// an expiry of defaultHostKeepAliveTimeoutS seconds. It returns the
// marshalling error or the store's write error, if any.
func (m *BlobCacheMetadata) SetHostKeepAlive(ctx context.Context, host *BlobCacheHost) error {
	payload, err := json.Marshal(host)
	if err != nil {
		return err
	}

	keepAliveKey := MetadataKeys.MetadataHostKeepAlive(host.Addr)
	ttl := time.Duration(defaultHostKeepAliveTimeoutS) * time.Second

	return m.rdb.Set(ctx, keepAliveKey, payload, ttl).Err()
}

// GetHostIndex returns one BlobCacheHost per address in the host index set,
// with only the Addr field populated. Keepalive records are not consulted.
// The returned slice is non-nil even when the index is empty.
func (m *BlobCacheMetadata) GetHostIndex(ctx context.Context) ([]*BlobCacheHost, error) {
	addrs, err := m.rdb.SMembers(ctx, MetadataKeys.MetadataHostIndex()).Result()
	if err != nil {
		return nil, err
	}

	indexed := make([]*BlobCacheHost, len(addrs))
	for i := range addrs {
		indexed[i] = &BlobCacheHost{Addr: addrs[i]}
	}

	return indexed, nil
}

func (m *BlobCacheMetadata) AddFsNodeChild(ctx context.Context, pid, id string) error {
err := m.rdb.SAdd(ctx, MetadataKeys.MetadataFsNodeChildren(pid), id).Err()
if err != nil {
Expand All @@ -300,12 +372,14 @@ func (m *BlobCacheMetadata) RemoveFsNodeChild(ctx context.Context, id string) er
// Metadata key storage format.
//
// Each value is either a literal Redis key or a fmt format string that the
// metadataKeys methods expand into one.
var (
metadataPrefix string = "blobcache"
// metadataHostIndex is the key of the set holding the addresses of known hosts.
metadataHostIndex string = "blobcache:host_index"
// metadataEntry is formatted with a content hash.
metadataEntry string = "blobcache:entry:%s"
metadataClientLock string = "blobcache:client_lock:%s:%s"
// metadataLocation is formatted with a content hash.
metadataLocation string = "blobcache:location:%s"
metadataRef string = "blobcache:ref:%s"
metadataFsNode string = "blobcache:fs:node:%s"
// metadataFsNodeChildren is formatted with the parent node id; child ids are added to it as set members.
metadataFsNodeChildren string = "blobcache:fs:node:%s:children"
// metadataHostKeepAlive is formatted with a host address; the value is the JSON-encoded host, stored with an expiry.
metadataHostKeepAlive string = "blobcache:host:keepalive:%s"
)

// Metadata keys
Expand All @@ -317,6 +391,14 @@ func (k *metadataKeys) MetadataEntry(hash string) string {
return fmt.Sprintf(metadataEntry, hash)
}

// MetadataHostIndex returns the fixed key of the host index set.
func (k *metadataKeys) MetadataHostIndex() string {
	key := metadataHostIndex
	return key
}

// MetadataHostKeepAlive returns the keepalive key for the host at addr.
func (k *metadataKeys) MetadataHostKeepAlive(addr string) string {
	key := fmt.Sprintf(metadataHostKeepAlive, addr)
	return key
}

// MetadataLocation returns the location key for the given content hash.
func (k *metadataKeys) MetadataLocation(hash string) string {
	key := fmt.Sprintf(metadataLocation, hash)
	return key
}
Expand Down
Loading