Add storage sqlserver #591
Conversation
Added some initial comments, will continue checking
```csharp
internal abstract class BaseSqlRepository : IDisposable
{
    public Dictionary<long, HashSet<ManagedStreamPage>> ManagedPages { get; private set; }
```
I don't fully understand why this needs to be a hash set.
Can't it keep only the latest version information?
Yes, we could probably remove that. It's a remnant from the previous solution, and I was a bit unsure whether we wanted to keep track of previous versions here. But considering how delete etc. removes old rows, we can just keep the latest one.
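A minimal sketch of what keeping only the latest version could look like. The type name and `Version`/`Key` properties are assumptions based on the snippet above, not the actual implementation:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simplified record; the real ManagedStreamPage likely carries more fields.
internal sealed record ManagedStreamPage(long Key, long Version);

// Instead of Dictionary<long, HashSet<ManagedStreamPage>>, track one entry per key.
internal sealed class ManagedPageIndex
{
    private readonly Dictionary<long, ManagedStreamPage> _latest = new();

    public void Upsert(ManagedStreamPage page)
    {
        // Keep only the newest version; older versions are dropped, matching
        // how delete removes old rows.
        if (!_latest.TryGetValue(page.Key, out var existing) || existing.Version < page.Version)
        {
            _latest[page.Key] = page;
        }
    }

    public bool TryGetLatest(long key, out ManagedStreamPage? page) =>
        _latest.TryGetValue(key, out page);
}
```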
```csharp
public virtual void AddStreamPage(long key, byte[] value)
{
    var page = new StreamPage(ToPageKey(key, Stream.Metadata.CurrentVersion), Stream.Metadata.StreamKey, key, value, Stream.Metadata.CurrentVersion);
    UnpersistedPages.Add(page);
```
I wonder whether a check is required here for when UnpersistedPages reaches a certain size, say 32 MB of data, forcing a save at that point.
Say an operator loads in 1 billion+ rows but the stream is configured to use a low amount of memory; this could push the stream over that memory limit.
In the "session"/streamPageRepository, the unpersisted pages get written when the WritePagesBulkLimit setting is reached. So the functionality should be similar, except that it writes based on the number of items rather than size. We could maybe use size for the setting instead?
For the "managed pages" dictionary, should we have a similar rule, i.e. clear some items after a certain point, or do we feel it can be kept "forever"?
I did not add it to the base method, since the "storage" repository does not have the functionality of writing after a certain number of items. The reasoning was that only a low number of pages will be added through that repository.
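A sketch of the size-based alternative discussed above. `UnpersistedPageBuffer` and `FlushThresholdBytes` are hypothetical names for illustration, not existing settings like WritePagesBulkLimit:

```csharp
using System;
using System.Collections.Generic;

// Buffers unpersisted page payloads and signals when a flush should be forced
// based on accumulated bytes rather than item count.
internal sealed class UnpersistedPageBuffer
{
    private readonly List<byte[]> _pages = new();
    private long _bufferedBytes;

    // Hypothetical setting; e.g. 32 MB.
    public long FlushThresholdBytes { get; init; } = 32L * 1024 * 1024;

    // Returns true when the caller should write the buffered pages now,
    // bounding memory even when an operator loads a very large number of rows.
    public bool Add(byte[] value)
    {
        _pages.Add(value);
        _bufferedBytes += value.Length;
        return _bufferedBytes >= FlushThresholdBytes;
    }

    public IReadOnlyList<byte[]> Drain()
    {
        var drained = _pages.ToArray();
        _pages.Clear();
        _bufferedBytes = 0;
        return drained;
    }
}
```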
```csharp
await session.Write(pageId, Encoding.UTF8.GetBytes(pageId.ToString()));
await session.Commit();
await session.Delete(pageId);
var storedPage = await session.Read(pageId);
```
storedPage should be null here, since it was deleted, but it should not be null if it was restored from a previous checkpoint.
Removed this test case, since it tested a case that shouldn't exist.
```csharp
for (int i = 1; i <= numberOfPages; i++)
{
    var page = await session.Read(i);
    Assert.NotEmpty(page);
```
Maybe add an assert for the content as well, to make sure the data returned is correct.
Added. I also added two explicit tests (one for storage and one for session) asserting that written data is returned on read.
94e2476
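The content assertion suggested above could look roughly like this. This is a fragment, assuming the `session` and xUnit-style `Assert` from the test snippet, and assuming page `i` was written with its index as a UTF-8 string:

```csharp
for (int i = 1; i <= numberOfPages; i++)
{
    var page = await session.Read(i);
    Assert.NotNull(page);
    // Verify not just presence but that the returned bytes match what was written.
    Assert.Equal(Encoding.UTF8.GetBytes(i.ToString()), page);
}
```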
```csharp
await session.Commit();

await storage.CheckpointAsync([], false); // new version
await session.Write(pageId, Encoding.UTF8.GetBytes("1"));
```
Maybe change "1" to "2" and make sure it is reverted back to "1" in this test.
This assert has been added in 7062770.
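As a sketch, the suggested revert check could be shaped like this. It is a fragment against the `session`/`storage` API from the snippets above; the recovery step in the middle depends on the test harness:

```csharp
await session.Write(pageId, Encoding.UTF8.GetBytes("1"));
await session.Commit();
await storage.CheckpointAsync([], false); // "1" is part of the checkpoint

await session.Write(pageId, Encoding.UTF8.GetBytes("2")); // not checkpointed

// ... restore the stream from the previous checkpoint here ...

var restored = await session.Read(pageId);
Assert.Equal("1", Encoding.UTF8.GetString(restored)); // reverted back to "1"
```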
…ed pages outside the bg task
…e last successful version of the stream
…is removed newly inserted row versions (since the stream is not updated yet)
…ize removed unsuccessful rows
…re occurs. Also changed the managed pages to only hold a single page version
…d only delete the lowest version from the parent select
…then old versions
```csharp
await WaitForBackgroundTasks();
await SaveStreamPagesAsync();
// todo: do we need to delete old versions etc. here as well (or only on checkpoint)?
```
We only delete after the checkpoint or at the same time as a checkpoint (in a transaction)
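A sketch of deleting old versions in the same transaction as the checkpoint write, as described above. The table and column names (`StreamPages`, `StreamKey`, `Version`) and the surrounding variables (`connectionString`, `streamKey`, `checkpointVersion`) are assumptions for illustration, not the actual schema:

```csharp
using Microsoft.Data.SqlClient;

// Delete superseded page versions atomically with the checkpoint, so a
// failure rolls back both the checkpoint write and the cleanup together.
await using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
await using var transaction = connection.BeginTransaction();

// ... write checkpoint metadata using the same transaction ...

await using var deleteCommand = connection.CreateCommand();
deleteCommand.Transaction = transaction;
deleteCommand.CommandText =
    "DELETE FROM StreamPages WHERE StreamKey = @streamKey AND Version < @checkpointVersion";
deleteCommand.Parameters.AddWithValue("@streamKey", streamKey);
deleteCommand.Parameters.AddWithValue("@checkpointVersion", checkpointVersion);
await deleteCommand.ExecuteNonQueryAsync();

await transaction.CommitAsync();
```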
Benchmark

| Benchmark suite | Current: a738ff6 | Previous: 7db1083 | Ratio |
|---|---|---|---|
| FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.InnerJoin | 525948410 ns (± 12232438.308807356) | 588638700 ns (± 9231369.002121696) | 0.89 |
| FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.LeftJoin | 628923520 ns (± 30603702.457230017) | 632393477.7777778 ns (± 23156158.76418074) | 0.99 |
| FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ProjectionAndNormalization | 194127860 ns (± 19741954.10541835) | 171268600 ns (± 6644227.006958748) | 1.13 |
| FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.SumAggregation | 222199900 ns (± 8188662.915953441) | 190822830 ns (± 6415026.852121856) | 1.16 |
| FlowtideDotNet.Benchmarks.Stream.StreamBenchmark.ListAggWithMapAggregation | 2141062200 ns (± 132086665.61217383) | 2656394200 ns (± 136074715.58524996) | 0.81 |
This comment was automatically generated by workflow using github-action-benchmark.
…onexisting), updated and added tests
f0df34c to f9edd55
…called for session that calls write
This looks good! Thank you for the pull request!
No description provided.