Skip to content

[ENH]: set up rendezvous hashing for collection -> garbage collector node mapping #4113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ k8s_resource(
'memberlists.chroma.cluster:CustomResourceDefinition',
'query-service-memberlist:MemberList',
'compaction-service-memberlist:MemberList',
'garbage-collection-service-memberlist:MemberList',

'sysdb-serviceaccount:serviceaccount',
'sysdb-serviceaccount-rolebinding:RoleBinding',
Expand Down Expand Up @@ -245,6 +246,6 @@ k8s_resource('jaeger', resource_deps=['k8s_setup'], labels=["observability"])
k8s_resource('grafana', resource_deps=['k8s_setup'], labels=["observability"])
k8s_resource('prometheus', resource_deps=['k8s_setup'], labels=["observability"])
k8s_resource('otel-collector', resource_deps=['k8s_setup'], labels=["observability"])
k8s_resource('garbage-collector', resource_deps=['k8s_setup'], labels=["chroma"])
k8s_resource('garbage-collector', resource_deps=['k8s_setup', 'minio-deployment'], labels=["chroma"])
# Local S3
k8s_resource('minio-deployment', resource_deps=['k8s_setup'], labels=["debug"], port_forwards=['9000:9000', '9005:9005'])
4 changes: 4 additions & 0 deletions go/cmd/coordinator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func init() {
Cmd.Flags().StringVar(&conf.CompactionServiceMemberlistName, "compaction-memberlist-name", "compaction-service-memberlist", "Compaction memberlist name")
Cmd.Flags().StringVar(&conf.CompactionServicePodLabel, "compaction-pod-label", "compaction-service", "Compaction pod label")

// Garbage collection service Memberlist
Cmd.Flags().StringVar(&conf.GarbageCollectionServiceMemberlistName, "garbage-collection-memberlist-name", "garbage-collection-service-memberlist", "Garbage collection memberlist name")
Cmd.Flags().StringVar(&conf.GarbageCollectionServicePodLabel, "garbage-collection-pod-label", "garbage-collection-service", "Garbage collection pod label")

// S3 config
Cmd.Flags().BoolVar(&conf.MetaStoreConfig.CreateBucketIfNotExists, "create-bucket-if-not-exists", false, "Create bucket if not exists")
Cmd.Flags().StringVar(&conf.MetaStoreConfig.BucketName, "bucket-name", "chroma-storage", "Bucket name")
Expand Down
13 changes: 13 additions & 0 deletions go/pkg/sysdb/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type Config struct {
CompactionServiceMemberlistName string
CompactionServicePodLabel string

// Garbage collection service memberlist config
GarbageCollectionServiceMemberlistName string
GarbageCollectionServicePodLabel string

// Config for soft deletes.
SoftDeleteEnabled bool
SoftDeleteCleanupInterval time.Duration
Expand Down Expand Up @@ -130,6 +134,9 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider) (*Serve
return nil, err
}

// Create memberlist manager for garbage collection service
garbageCollectionMemberlistManager, err := createMemberlistManager(namespace, config.GarbageCollectionServiceMemberlistName, config.GarbageCollectionServicePodLabel, config.WatchInterval, config.ReconcileInterval, config.ReconcileCount)

// Start the memberlist manager for query service
err = queryMemberlistManager.Start()
if err != nil {
Expand All @@ -141,6 +148,12 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider) (*Serve
return nil, err
}

// Start the memberlist manager for garbage collection service
err = garbageCollectionMemberlistManager.Start()
if err != nil {
return nil, err
}

log.Info("Starting soft delete cleaner", zap.Duration("cleanup_interval", s.softDeleteCleaner.cleanupInterval), zap.Duration("max_age", s.softDeleteCleaner.maxAge), zap.Uint("limit_per_check", s.softDeleteCleaner.limitPerCheck))
s.softDeleteCleaner.Start()

Expand Down
2 changes: 1 addition & 1 deletion k8s/distributed-chroma/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ apiVersion: v2
name: distributed-chroma
description: A helm chart for distributed Chroma
type: application
version: 0.1.35
version: 0.1.36
appVersion: "0.4.24"
keywords:
- chroma
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
apiVersion: chroma.cluster/v1
kind: MemberList
metadata:
name: garbage-collection-service-memberlist
namespace: {{ .Values.namespace }}
spec:
members:

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: garbage-collection-service-memberlist-readerwriter
rules:
- apiGroups:
- chroma.cluster
resources:
- memberlists
verbs:
- get
- list
- watch
# TODO: FIX THIS LEAKY PERMISSION
Copy link
Contributor

@Sicheng-Pan Sicheng-Pan Apr 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this planned for future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest, I'm not sure exactly what this is referencing
it was from the memberlist template that I copied this from

- create
- update
- patch
- delete

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: sysdb-garbage-collection-service-memberlist-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: garbage-collection-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: sysdb-serviceaccount
namespace: {{ .Values.namespace }}

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: garbage-collection-service-garbage-collection-service-memberlist-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: garbage-collection-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: garbage-collector-serviceaccount
namespace: {{ .Values.namespace }}

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: garbage-collection-service-memberlist-readerwriter-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: garbage-collection-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: default
namespace: {{ .Values.namespace }}
7 changes: 6 additions & 1 deletion k8s/distributed-chroma/templates/garbage-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
{{ end }}

apiVersion: apps/v1
kind: Deployment
kind: StatefulSet
metadata:
name: garbage-collector
namespace: {{ .Values.namespace }}
Expand All @@ -27,6 +27,7 @@ spec:
metadata:
labels:
app: garbage-collector
member-type: garbage-collection-service
spec:
serviceAccountName: garbage-collector-serviceaccount
volumes:
Expand Down Expand Up @@ -58,6 +59,10 @@ spec:
- name: CONFIG_PATH
value: /config/config.yaml
{{ end }}
- name: CHROMA_GC_MY_MEMBER_ID
valueFrom:
fieldRef:
fieldPath: metadata.name
{{ range .Values.garbageCollector.env }}
- name: {{ .name }}
{{ .value | nindent 12 }}
Expand Down
22 changes: 11 additions & 11 deletions k8s/distributed-chroma/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,11 @@ garbageCollector:
configuration: |
service_name: "garbage-collector"
otel_endpoint: "http://otel-collector:4317"
relative_cutoff_time_seconds: 43200 # GC all versions created at time < now() - relative_cutoff_time_seconds (12 hours)
relative_cutoff_time_seconds: 60 # GC all versions created at time < now() - relative_cutoff_time_seconds (1 minute)
max_collections_to_gc: 1000
gc_interval_mins: 120
gc_interval_mins: 1
disallow_collections: []
default_mode: "delete"
sysdb_config:
host: "sysdb.chroma"
port: 50051
Expand All @@ -156,12 +157,11 @@ garbageCollector:
storage_config:
s3:
bucket: "chroma-storage"
# object_store:
# bucket:
# name: "chroma-storage"
# type: "minio"
# credentials: "Minio"
# connect_timeout_ms: 5000
# request_timeout_ms: 30000
# upload_part_size_bytes: 536870912
# download_part_size_bytes: 8388608
assignment_policy:
rendezvous_hashing:
hasher: Murmur3
memberlist_provider:
custom_resource:
kube_namespace: "chroma"
memberlist_name: "garbage-collection-service-memberlist"
queue_size: 100
1 change: 1 addition & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ chroma-blockstore = { workspace = true }
chroma-cache = { workspace = true }
chroma-index = { workspace = true }
chroma-segment = { workspace = true }
chroma-memberlist = { workspace = true }
chroma-tracing = { workspace = true }

[dev-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@ storage_config:
request_timeout_ms: 30000 # 30 seconds
upload_part_size_bytes: 536870912 # 512MiB
download_part_size_bytes: 8388608 # 8MiB
assignment_policy:
rendezvous_hashing:
hasher: Murmur3
my_member_id: "garbage-collector-0"
memberlist_provider:
custom_resource:
kube_namespace: "chroma"
memberlist_name: "garbage-collection-service-memberlist"
queue_size: 100
10 changes: 7 additions & 3 deletions rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub(super) struct GarbageCollectorConfig {
pub(super) default_mode: CleanupMode,
#[serde(default)]
pub(super) tenant_mode_overrides: Option<HashMap<String, CleanupMode>>,
pub(super) assignment_policy: chroma_config::assignment::config::AssignmentPolicyConfig,
pub(super) memberlist_provider: chroma_memberlist::config::MemberlistProviderConfig,
pub my_member_id: String,
}

impl GarbageCollectorConfig {
Expand All @@ -45,9 +48,10 @@ impl GarbageCollectorConfig {
pub(super) fn load_from_path(path: &str) -> Self {
// Unfortunately, figment doesn't support environment variables with underscores, so we have to map and replace them: a double underscore (`__`) in a variable name becomes the `.` nesting separator.
// Excluding our own environment variables, which are prefixed with CHROMA_.
let mut f = figment::Figment::from(
Env::prefixed("CHROMA_GC_").map(|k| k.as_str().replace("__", ".").into()),
);
let mut f = figment::Figment::from(Env::prefixed("CHROMA_GC_").map(|k| match k {
k if k == "my_member_id" => k.into(),
k => k.as_str().replace("__", ".").into(),
}));
if std::path::Path::new(path).exists() {
f = figment::Figment::from(Yaml::file(path)).merge(f);
}
Expand Down
Loading
Loading