Skip to content

Commit 8031bb7

Browse files
authored
[CHORE] Enable rust log service, set timeouts, change tilt. (#4338)
## Description of changes - Change timeouts in tilt to 60s - Change deadline for proptest to 90s. - Reenable use_alt_host_for_everything in tilt. - Template with the right file consistently. ## Test plan - [X] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes N/A
1 parent e174d32 commit 8031bb7

File tree

13 files changed

+54
-23
lines changed

13 files changed

+54
-23
lines changed

Tiltfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ k8s_yaml(
166166
# We manually call helm template so we can call set-file
167167
k8s_yaml(
168168
local(
169-
'helm template --set-file rustFrontendService.configuration=rust/frontend/sample_configs/distributed.yaml,rustLogService.configuration=rust/worker/tilt_config.yaml,compaction_service.configuration=rust/worker/tilt_config.yaml,query_service.configuration=rust/worker/tilt_config.yaml --values k8s/distributed-chroma/values.yaml,k8s/distributed-chroma/values.dev.yaml k8s/distributed-chroma'
169+
'helm template --set-file rustFrontendService.configuration=rust/frontend/sample_configs/tilt_config.yaml,rustLogService.configuration=rust/worker/tilt_config.yaml,compaction_service.configuration=rust/worker/tilt_config.yaml,query_service.configuration=rust/worker/tilt_config.yaml --values k8s/distributed-chroma/values.yaml,k8s/distributed-chroma/values.dev.yaml k8s/distributed-chroma'
170170
),
171171
)
172172
watch_file('rust/frontend/sample_configs/distributed.yaml')

chromadb/test/conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656

5757
hypothesis.settings.register_profile(
5858
"base",
59-
deadline=45000,
59+
deadline=90000,
6060
suppress_health_check=[
6161
hypothesis.HealthCheck.data_too_large,
6262
hypothesis.HealthCheck.large_base_example,

chromadb/test/property/invariants.py

+2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ def count(collection: Collection, record_set: RecordSet) -> None:
108108
"""The given collection count is equal to the number of embeddings"""
109109
count = collection.count()
110110
normalized_record_set = wrap_all(record_set)
111+
if count != len(normalized_record_set["ids"]):
112+
print('count mismatch:', count, '=!', len(normalized_record_set["ids"]))
111113
assert count == len(normalized_record_set["ids"])
112114

113115

chromadb/test/property/test_embeddings.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
traces: DefaultDict[str, int] = defaultdict(lambda: 0)
4848

4949

50+
VERSION_INCREASE_WAIT_TIME = 300
51+
52+
5053
def trace(key: str) -> None:
5154
global traces
5255
traces[key] += 1
@@ -375,7 +378,8 @@ def wait_for_compaction(self) -> None:
375378
current_version,
376379
)
377380
new_version = wait_for_version_increase(
378-
self.client, self.collection.name, current_version, additional_time=240
381+
self.client, self.collection.name, current_version,
382+
additional_time=VERSION_INCREASE_WAIT_TIME
379383
)
380384
# Everything got compacted.
381385
self.log_operation_count = 0
@@ -1420,7 +1424,7 @@ def test_no_op_compaction(client: ClientAPI) -> None:
14201424
coll.delete(ids=[str(i) for i in range(batch, batch + 100)])
14211425
if not NOT_CLUSTER_ONLY:
14221426
wait_for_version_increase(
1423-
client, coll.name, get_collection_version(client, coll.name), 240
1427+
client, coll.name, get_collection_version(client, coll.name), VERSION_INCREASE_WAIT_TIME
14241428
)
14251429

14261430

@@ -1439,7 +1443,7 @@ def test_add_then_purge(client: ClientAPI) -> None:
14391443
)
14401444
if not NOT_CLUSTER_ONLY:
14411445
wait_for_version_increase(
1442-
client, coll.name, get_collection_version(client, coll.name), 240
1446+
client, coll.name, get_collection_version(client, coll.name), VERSION_INCREASE_WAIT_TIME
14431447
)
14441448

14451449
# Purge records and wait for compaction
@@ -1449,7 +1453,7 @@ def test_add_then_purge(client: ClientAPI) -> None:
14491453
coll.delete(ids=record_ids)
14501454
if not NOT_CLUSTER_ONLY:
14511455
wait_for_version_increase(
1452-
client, coll.name, get_collection_version(client, coll.name), 240
1456+
client, coll.name, get_collection_version(client, coll.name), VERSION_INCREASE_WAIT_TIME
14531457
)
14541458

14551459
# There should be no records left
@@ -1472,7 +1476,7 @@ def test_encompassing_delete(client: ClientAPI) -> None:
14721476

14731477
if not NOT_CLUSTER_ONLY:
14741478
wait_for_version_increase(
1475-
client, col.name, get_collection_version(client, col.name), 240
1479+
client, col.name, get_collection_version(client, col.name), VERSION_INCREASE_WAIT_TIME
14761480
)
14771481

14781482
# Add and then delete and then add 16
@@ -1486,7 +1490,7 @@ def test_encompassing_delete(client: ClientAPI) -> None:
14861490

14871491
if not NOT_CLUSTER_ONLY:
14881492
wait_for_version_increase(
1489-
client, col.name, get_collection_version(client, col.name), 240
1493+
client, col.name, get_collection_version(client, col.name), VERSION_INCREASE_WAIT_TIME
14901494
)
14911495

14921496
# Ensure we can get all

rust/frontend/sample_configs/tilt_config.yaml

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ log:
2323
host: "logservice.chroma"
2424
port: 50051
2525
connect_timeout_ms: 5000
26-
request_timeout_ms: 5000
26+
request_timeout_ms: 60000 # 1 minute
2727
alt_host: "rust-log-service.chroma"
28-
#use_alt_host_for_everything: true
28+
use_alt_host_for_everything: true
2929

3030
executor:
3131
distributed:
3232
connections_per_node: 5
3333
replication_factor: 2
3434
connect_timeout_ms: 5000
35-
request_timeout_ms: 5000
35+
request_timeout_ms: 60000 # 1 minute
3636
assignment:
3737
rendezvous_hashing:
3838
hasher: Murmur3

rust/frontend/src/config.rs

+5
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ impl FrontendServerConfig {
160160
}
161161

162162
pub fn load_from_path(path: &str) -> Self {
163+
// SAFETY(rescrv): If we cannot read the config, we panic anyway.
164+
eprintln!(
165+
"==========\n{}\n==========\n",
166+
std::fs::read_to_string(path).unwrap()
167+
);
163168
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them.
164169
// Excluding our own environment variables, which are prefixed with CHROMA_.
165170
let mut f = figment::Figment::from(

rust/garbage_collector/src/helper.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl ChromaGrpcClients {
2424
let sysdb_channel = Channel::from_static("http://localhost:50051")
2525
.connect()
2626
.await?;
27-
let logservice_channel = Channel::from_static("http://localhost:50052")
27+
let logservice_channel = Channel::from_static("http://localhost:50054")
2828
.connect()
2929
.await?;
3030
let queryservice_channel = Channel::from_static("http://localhost:50053")

rust/log-service/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ impl LogService for LogServer {
709709
);
710710
let limits = Limits {
711711
max_files: Some(pull_logs.batch_size as u64 + 1),
712-
max_bytes: Some(pull_logs.batch_size as u64 * 100_000),
712+
max_bytes: None,
713713
};
714714
let fragments = match log_reader
715715
.scan(

rust/wal3/src/reader.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,23 @@ impl LogReader {
131131
}
132132
fragments.retain(|f| f.limit > from);
133133
fragments.sort_by_key(|f| f.start.offset());
134-
fragments.truncate(limits.max_files.unwrap_or(u64::MAX) as usize);
134+
if let Some(max_files) = limits.max_files {
135+
if fragments.len() as u64 > max_files {
136+
tracing::info!("truncating to {} files from {}", max_files, fragments.len());
137+
fragments.truncate(max_files as usize);
138+
}
139+
}
135140
while fragments.len() > 1
136141
&& fragments
137142
.iter()
138143
.map(|f| f.num_bytes)
139144
.fold(0, u64::saturating_add)
140145
> limits.max_bytes.unwrap_or(u64::MAX)
141146
{
147+
tracing::info!(
148+
"truncating to {} files because bytes restrictions",
149+
fragments.len() - 1
150+
);
142151
fragments.pop();
143152
}
144153
Ok(fragments)

rust/worker/src/config.rs

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ impl RootConfig {
6161
pub fn load_from_path(path: &str) -> Self {
6262
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them.
6363
// Excluding our own environment variables, which are prefixed with CHROMA_.
64+
eprintln!("loading config from {path}");
65+
eprintln!(
66+
"{}",
67+
std::fs::read_to_string(path).unwrap_or("<ERROR>".to_string())
68+
);
6469
let mut f = figment::Figment::from(Env::prefixed("CHROMA_").map(|k| match k {
6570
k if k == "my_member_id" => k.into(),
6671
k => k.as_str().replace("__", ".").into(),

rust/worker/src/execution/operators/count_records.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
8181
Err(e) => {
8282
match *e {
8383
RecordSegmentReaderCreationError::UninitializedSegment => {
84-
tracing::info!("[CountQueryOrchestrator] Record segment is uninitialized");
84+
tracing::info!("[CountQueryOrchestrator] Record segment is uninitialized; using {} records from log", input.log_records.len());
8585
// This means there no compaction has occured.
8686
// So we can just traverse the log records
8787
// and count the number of records.

rust/worker/src/lib.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,14 @@ const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH";
2222
pub async fn query_service_entrypoint() {
2323
// Check if the config path is set in the env var
2424
let config = match std::env::var(CONFIG_PATH_ENV_VAR) {
25-
Ok(config_path) => config::RootConfig::load_from_path(&config_path),
26-
Err(_) => config::RootConfig::load(),
25+
Ok(config_path) => {
26+
eprintln!("loading from {config_path}");
27+
config::RootConfig::load_from_path(&config_path)
28+
}
29+
Err(err) => {
30+
eprintln!("loading from default path because {err}");
31+
config::RootConfig::load()
32+
}
2733
};
2834

2935
let config = config.query_service;

rust/worker/tilt_config.yaml

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ query_service:
4040
host: "logservice.chroma"
4141
port: 50051
4242
connect_timeout_ms: 5000
43-
request_timeout_ms: 5000
43+
request_timeout_ms: 60000 # 1 minute
4444
alt_host: "rust-log-service.chroma"
45-
#use_alt_host_for_everything: true
45+
use_alt_host_for_everything: true
4646
dispatcher:
4747
num_worker_threads: 4
4848
dispatcher_queue_size: 100
@@ -78,7 +78,7 @@ query_service:
7878
weighted_lru:
7979
capacity: 8589934592 # 8GB
8080
permitted_parallelism: 180
81-
fetch_log_batch_size: 4000
81+
fetch_log_batch_size: 1000
8282

8383
compaction_service:
8484
service_name: "compaction-service"
@@ -117,9 +117,9 @@ compaction_service:
117117
host: "logservice.chroma"
118118
port: 50051
119119
connect_timeout_ms: 5000
120-
request_timeout_ms: 5000
120+
request_timeout_ms: 60000 # 1 minute
121121
alt_host: "rust-log-service.chroma"
122-
#use_alt_host_for_everything: true
122+
use_alt_host_for_everything: true
123123
dispatcher:
124124
num_worker_threads: 4
125125
dispatcher_queue_size: 100
@@ -134,7 +134,7 @@ compaction_service:
134134
max_compaction_size: 10000
135135
max_partition_size: 5000
136136
disabled_collections: [] # uuids to disable compaction for
137-
fetch_log_batch_size: 4000
137+
fetch_log_batch_size: 1000
138138
blockfile_provider:
139139
arrow:
140140
block_manager_config:

0 commit comments

Comments
 (0)