-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[ENH] Add a scout-logs function to find the max log position. #4232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Reviewer ChecklistPlease leverage this checklist to ensure your code review is thorough before approving Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
|
381ad43
to
97af226
Compare
go/pkg/log/store/queries/queries.sql
Outdated
@@ -45,6 +45,13 @@ SELECT CAST(COALESCE(MIN(r.offset), 0) as bigint) AS min_offset, CAST(COALESCE(M | |||
FROM record_log r | |||
WHERE r.collection_id = $1; | |||
|
|||
-- name: GetBoundsForCollection :one | |||
SELECT | |||
COALESCE(record_compaction_offset_position, 0) AS record_compaction_offset_position, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm, COALESCE
will ensure that if the collection does not exist in the table then it returns 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make it return the uncoalesced row instead. I thought "one" would enforce it.
go/pkg/log/repository/log.go
Outdated
var tx pgx.Tx | ||
tx, err = r.conn.BeginTx(ctx, pgx.TxOptions{}) | ||
if err != nil { | ||
trace_log.Error("Error in begin transaction for garbage collection", zap.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: error msg is wrong. This is not inside garbage_collection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a nit. Fixed.
go/pkg/log/repository/log.go
Outdated
} | ||
rollback = false; | ||
tx.Commit(ctx) | ||
start = bounds.RecordEnumerationOffsetPosition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I am misunderstanding something, shouldn't start be bounds.RecordsCompactionOffsetPosition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
go/pkg/log/repository/log.go
Outdated
return | ||
} | ||
var totalUncompactedDepth int64 | ||
totalUncompactedDepth, err = queriesWithTx.GetTotalUncompactedRecordsCount(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this call? Isn't start = bounds.RecordsCompactionOffsetPosition
and limit=bounds.RecordEnumerationOffsetPosition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can try that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried that and it fails predictably on a number of tests.
This PR makes a currently-not-used scout-logs function. It adds it to the go service, the rust log service over grpc, and the rust log client over grpc and in-memory.
705bdbf
to
97b6fdb
Compare
go/pkg/log/server/server.go
Outdated
var collectionID types.UniqueID | ||
collectionID, err = types.ToUniqueID(&req.CollectionId) | ||
if err != nil { | ||
// TODO HANDLE ERROR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: don't understand the todo
idl/chromadb/proto/logservice.proto
Outdated
@@ -14,6 +14,15 @@ message PushLogsResponse { | |||
int32 record_count = 1; | |||
} | |||
|
|||
message ScoutLogsRequest { | |||
string collection_id = 1; | |||
int64 start_from_offset = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start_from_offset
does not seem to be used anywhere
let start = start as i64; | ||
let sema = Arc::clone(&sema); | ||
async move { | ||
let _permit = sema.acquire().await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's a few places where we have this pattern. Might be useful to have a util abstraction for this. Don't have to do it in this PR though
This PR makes a currently-not-used scout-logs function. It adds it to
the go service, the rust log service over grpc, and the rust log client
over grpc and in-memory.