Skip to content

instrument with victoria metrics pusher #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/beam-cloud/blobcache-v2
go 1.22.10

require (
github.com/VictoriaMetrics/metrics v1.37.0
github.com/aws/aws-sdk-go-v2 v1.30.1
github.com/aws/aws-sdk-go-v2/config v1.27.24
github.com/aws/aws-sdk-go-v2/credentials v1.17.24
Expand Down Expand Up @@ -50,6 +51,8 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
)

require (
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/VictoriaMetrics/metrics v1.37.0 h1:u5Yr+HFofQyn7kgmmkufgkX0nEA6G1oEyK2eaKsVaUM=
github.com/VictoriaMetrics/metrics v1.37.0/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8=
github.com/aws/aws-sdk-go-v2 v1.30.1 h1:4y/5Dvfrhd1MxRDD77SrfsDaj8kUkkljU7XE83NPV+o=
github.com/aws/aws-sdk-go-v2 v1.30.1/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
Expand Down Expand Up @@ -142,6 +144,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
Expand Down
51 changes: 51 additions & 0 deletions pkg/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package blobcache

import (
"context"
"encoding/base64"
"fmt"
"time"

"github.com/VictoriaMetrics/metrics"
)

// BlobcacheMetrics groups the VictoriaMetrics histograms used to record
// disk and memory usage samples. initMetrics creates and returns a
// populated value.
type BlobcacheMetrics struct {
	// DiskCacheUsageMB is registered as `blobcache_disk_cache_usage_mb`.
	DiskCacheUsageMB *metrics.Histogram
	// DiskCacheUsagePct is registered as `blobcache_disk_cache_usage_pct`.
	DiskCacheUsagePct *metrics.Histogram
	// MemCacheUsageMB is registered as `blobcache_mem_cache_usage_mb`.
	MemCacheUsageMB *metrics.Histogram
	// MemCacheUsagePct is registered as `blobcache_mem_cache_usage_pct`.
	MemCacheUsagePct *metrics.Histogram
}

// initMetrics starts the VictoriaMetrics push loop and registers the cache
// usage histograms.
//
// Metrics (including process metrics) are pushed to config.URL every
// config.PushIntervalS seconds. Each request carries an HTTP Basic
// Authorization header built from config.Username and config.Password, and
// every pushed series gets a `host` label set to currentHost.HostId.
//
// A failure to start the push loop is logged rather than returned: the
// histograms are still registered and handed back, so callers can update
// them unconditionally.
func initMetrics(ctx context.Context, config BlobCacheMetricsConfig, currentHost *BlobCacheHost) BlobcacheMetrics {
	credentials := base64.StdEncoding.EncodeToString([]byte(config.Username + ":" + config.Password))

	opts := &metrics.PushOptions{
		Headers: []string{
			"Authorization: Basic " + credentials,
		},
		// ExtraLabels must be in `label="value"` form. An unquoted value
		// fails validation inside InitPushWithOptions, which would leave
		// the pusher disabled, so quote (and escape) the host id.
		ExtraLabels: fmt.Sprintf("host=%q", currentHost.HostId),
	}

	interval := time.Duration(config.PushIntervalS) * time.Second
	const pushProcessMetrics = true

	if err := metrics.InitPushWithOptions(ctx, config.URL, interval, pushProcessMetrics, opts); err != nil {
		Logger.Errorf("Failed to initialize metrics: %v", err)
	}

	return BlobcacheMetrics{
		DiskCacheUsageMB:  metrics.NewHistogram(`blobcache_disk_cache_usage_mb`),
		DiskCacheUsagePct: metrics.NewHistogram(`blobcache_disk_cache_usage_pct`),
		MemCacheUsageMB:   metrics.NewHistogram(`blobcache_mem_cache_usage_mb`),
		MemCacheUsagePct:  metrics.NewHistogram(`blobcache_mem_cache_usage_pct`),
	}
}
119 changes: 78 additions & 41 deletions pkg/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ContentAddressableStorage struct {
diskCacheDir string
diskCachedUsageExceeded bool
mu sync.Mutex
metrics BlobcacheMetrics
}

func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHost, locality string, coordinator CoordinatorClient, config BlobCacheConfig) (*ContentAddressableStorage, error) {
Expand All @@ -48,15 +49,16 @@ func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHos
locality: locality,
diskCacheDir: config.Server.DiskCacheDir,
mu: sync.Mutex{},
metrics: initMetrics(ctx, config.Metrics, currentHost),
}

Logger.Infof("Disk cache directory located at: '%s'", cas.diskCacheDir)

availableMemoryMb := getAvailableMemoryMb()
maxCacheSizeMb := (availableMemoryMb * cas.serverConfig.MaxCachePct) / 100
_, totalMemoryMb := getMemoryMb()
maxCacheSizeMb := (totalMemoryMb * cas.serverConfig.MaxCachePct) / 100
maxCost := maxCacheSizeMb * 1e6

Logger.Infof("Total available memory: %dMB", availableMemoryMb)
Logger.Infof("Total available memory: %dMB", totalMemoryMb)
Logger.Infof("Max cache size: %dMB", maxCacheSizeMb)
Logger.Infof("Max cost: %d", maxCost)

Expand All @@ -75,22 +77,13 @@ func NewContentAddressableStorage(ctx context.Context, currentHost *BlobCacheHos
return nil, err
}

cas.cache = cache
cas.maxCacheSizeMb = maxCacheSizeMb

go cas.monitorDiskCacheUsage()

cas.cache = cache
cas.maxCacheSizeMb = maxCacheSizeMb
return cas, nil
}

// getAvailableMemoryMb reports host memory in MB (bytes / 1024 / 1024,
// truncated).
//
// Despite the name, the value is taken from the Total field of the memory
// statistics, not Available. If the statistics cannot be read the process
// is terminated via log.Fatalf.
func getAvailableMemoryMb() int64 {
	const bytesPerMb = 1024 * 1024

	stats, err := mem.VirtualMemory()
	if err != nil {
		log.Fatalf("Unable to retrieve host memory info: %v", err)
	}

	totalMb := stats.Total / bytesPerMb
	return int64(totalMb)
}

type cacheValue struct {
Hash string
Content []byte
Expand Down Expand Up @@ -286,39 +279,42 @@ func (cas *ContentAddressableStorage) Cleanup() {
cas.cache.Close()
}

func min(a, b int64) int64 {
if a < b {
return a
func (cas *ContentAddressableStorage) GetDiskCacheMetrics() (int64, int64, float64, error) {
var (
diskUsageMb int64
totalDiskSpaceMb int64
usagePercentage float64
err error
)

// Get current disk usage
diskUsageMb, err = getDiskUsageMb(cas.diskCacheDir)
if err != nil {
return 0, 0, 0, err
}
return b
}

func (cas *ContentAddressableStorage) monitorDiskCacheUsage() {
ticker := time.NewTicker(diskCacheUsageCheckInterval)
defer ticker.Stop()
// Get total disk capacity
totalDiskSpaceMb, err = getTotalDiskSpaceMb(cas.diskCacheDir)
if err != nil {
return 0, 0, 0, err
}

for {
select {
case <-cas.ctx.Done():
return
case <-ticker.C:
usage, err := getDiskUsageMb(cas.diskCacheDir)
if err == nil {
totalDiskSpace, err := getTotalDiskSpaceMb(cas.diskCacheDir)
if err == nil {
usagePct := float64(usage) / float64(totalDiskSpace)

Logger.Infof("Disk cache usage: %dMB / %dMB (%.2f%%)", usage, totalDiskSpace, usagePct*100)

cas.mu.Lock()
cas.diskCachedUsageExceeded = usagePct > cas.serverConfig.DiskCacheMaxUsagePct
cas.mu.Unlock()
}
}
}
// Calculate usage percentage
if totalDiskSpaceMb > 0 {
usagePercentage = (float64(diskUsageMb) / float64(totalDiskSpaceMb)) * 100
} else {
usagePercentage = 0
}

return diskUsageMb, totalDiskSpaceMb, usagePercentage, nil
}

// min returns the smaller of a and b.
func min(a, b int64) int64 {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
func getDiskUsageMb(path string) (int64, error) {
var totalUsage int64 = 0
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
Expand All @@ -344,3 +340,44 @@ func getTotalDiskSpaceMb(path string) (int64, error) {
}
return int64(stat.Blocks) * int64(stat.Bsize) / (1024 * 1024), nil
}

// getMemoryMb returns the host's available memory followed by its total
// memory, both in MB (bytes / 1024 / 1024, truncated).
//
// If the memory statistics cannot be read the process is terminated via
// log.Fatalf.
func getMemoryMb() (int64, int64) {
	const bytesPerMb = 1024 * 1024

	stats, err := mem.VirtualMemory()
	if err != nil {
		log.Fatalf("Unable to retrieve host memory info: %v", err)
	}

	availableMb := int64(stats.Available / bytesPerMb)
	totalMb := int64(stats.Total / bytesPerMb)
	return availableMb, totalMb
}

// monitorDiskCacheUsage samples disk and memory usage once per
// diskCacheUsageCheckInterval until cas.ctx is cancelled.
//
// On every tick it:
//   - records used memory and disk usage (MB and percent) in cas.metrics,
//   - logs both readings,
//   - updates cas.diskCachedUsageExceeded under cas.mu.
//
// If the disk metrics cannot be read the tick is skipped after logging the
// error. The function blocks, so it is meant to run in its own goroutine.
func (cas *ContentAddressableStorage) monitorDiskCacheUsage() {
	ticker := time.NewTicker(diskCacheUsageCheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-cas.ctx.Done():
			return
		case <-ticker.C:
			currentUsage, totalDiskSpace, usagePercentage, err := cas.GetDiskCacheMetrics()
			if err != nil {
				Logger.Errorf("Failed to fetch disk cache metrics: %v", err)
				continue
			}

			availableMemoryMb, totalMemoryMb := getMemoryMb()
			usedMemoryMb := totalMemoryMb - availableMemoryMb

			// Guard the ratio so a zero total yields 0% instead of NaN/Inf.
			var usedMemoryPct float64
			if totalMemoryMb > 0 {
				usedMemoryPct = float64(usedMemoryMb) / float64(totalMemoryMb) * 100
			}

			cas.metrics.MemCacheUsageMB.Update(float64(usedMemoryMb))
			cas.metrics.MemCacheUsagePct.Update(usedMemoryPct)
			cas.metrics.DiskCacheUsageMB.Update(float64(currentUsage))
			cas.metrics.DiskCacheUsagePct.Update(usagePercentage)

			// Log *used* memory so the line agrees with its "Usage" label
			// and with the values pushed to the histograms above.
			Logger.Infof("Memory Cache Usage: %dMB / %dMB (%.2f%%)", usedMemoryMb, totalMemoryMb, usedMemoryPct)
			Logger.Infof("Disk Cache Usage: %dMB / %dMB (%.2f%%)", currentUsage, totalDiskSpace, usagePercentage)

			// GetDiskCacheMetrics reports a 0-100 percentage; convert it back
			// to a 0-1 fraction before comparing against the threshold.
			// NOTE(review): this assumes DiskCacheMaxUsagePct is configured as
			// a fraction (e.g. 0.95), which is how the previous
			// implementation of this check treated it — confirm against the
			// config defaults.
			usageFraction := usagePercentage / 100

			cas.mu.Lock()
			cas.diskCachedUsageExceeded = usageFraction > cas.serverConfig.DiskCacheMaxUsagePct
			cas.mu.Unlock()
		}
	}
}
14 changes: 11 additions & 3 deletions pkg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@ const (
)

// BlobCacheConfig is the top-level configuration structure. Each field
// holds one section, addressed by the name given in its `key`/`json` tag.
type BlobCacheConfig struct {
	// Server is the "server" section.
	Server BlobCacheServerConfig `key:"server" json:"server"`
	// Client is the "client" section.
	Client BlobCacheClientConfig `key:"client" json:"client"`
	// Global is the "global" section.
	Global BlobCacheGlobalConfig `key:"global" json:"global"`
	// Metrics is the "metrics" section (metrics push settings).
	Metrics BlobCacheMetricsConfig `key:"metrics" json:"metrics"`
}

// BlobCacheMetricsConfig holds the settings consumed by initMetrics when it
// starts the metrics push loop.
type BlobCacheMetricsConfig struct {
	// PushIntervalS is the push period in seconds; initMetrics multiplies
	// it by time.Second.
	PushIntervalS int `key:"pushIntervalS" json:"push_interval_s"`
	// URL is the endpoint the metrics are pushed to.
	URL string `key:"url" json:"url"`
	// Username is the user part of the HTTP Basic credentials sent with
	// each push.
	Username string `key:"username" json:"username"`
	// Password is the password part of the HTTP Basic credentials sent with
	// each push.
	Password string `key:"password" json:"password"`
}

type BlobCacheGlobalConfig struct {
Expand Down