Skip to content

Commit 582c75c

Browse files
UlimoCopilot
andauthored
Add queue structure to state management for operators (#743)
* Add queue structure to state management for operators This allows doing FIFO operations on data that is too large for RAM. * Add pop method * Update src/FlowtideDotNet.Storage/DataStructures/PrimitiveListKeyContainerSerializer.cs Co-authored-by: Copilot <[email protected]> --------- Co-authored-by: Copilot <[email protected]>
1 parent ef904c8 commit 582c75c

12 files changed

+1059
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License")
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
using FlowtideDotNet.Storage.Memory;
14+
using FlowtideDotNet.Storage.Tree;
15+
using System;
16+
using System.Collections.Generic;
17+
using System.Linq;
18+
using System.Text;
19+
using System.Threading.Tasks;
20+
21+
namespace FlowtideDotNet.Storage.DataStructures
22+
{
23+
internal class PrimitiveListKeyContainer<K> : IKeyContainer<K>
24+
where K : unmanaged
25+
{
26+
internal PrimitiveList<K> _list;
27+
28+
internal PrimitiveListKeyContainer(IMemoryAllocator memoryAllocator)
29+
{
30+
_list = new PrimitiveList<K>(memoryAllocator);
31+
}
32+
33+
internal PrimitiveListKeyContainer(PrimitiveList<K> list)
34+
{
35+
_list = list;
36+
}
37+
38+
39+
public int Count => _list.Count;
40+
41+
public void Add(K key)
42+
{
43+
_list.Add(key);
44+
}
45+
46+
public void AddRangeFrom(IKeyContainer<K> container, int start, int count)
47+
{
48+
if (container is PrimitiveListKeyContainer<K> other)
49+
{
50+
_list.AddRangeFrom(other._list, start, count);
51+
}
52+
else
53+
{
54+
throw new NotImplementedException();
55+
}
56+
}
57+
58+
public int BinarySearch(K key, IComparer<K> comparer)
59+
{
60+
int lo = 0;
61+
int hi = _list.Count - 1;
62+
63+
while (lo <= hi)
64+
{
65+
int i = lo + ((hi - lo) >> 1);
66+
67+
int c = comparer.Compare(_list.Get(i), key);
68+
if (c == 0)
69+
{
70+
return lo;
71+
}
72+
else if (c < 0)
73+
{
74+
lo = i + 1;
75+
}
76+
else
77+
{
78+
hi = i - 1;
79+
}
80+
}
81+
return ~lo;
82+
}
83+
84+
public void Dispose()
85+
{
86+
_list.Dispose();
87+
}
88+
89+
public K Get(in int index)
90+
{
91+
return _list[index];
92+
}
93+
94+
public int GetByteSize()
95+
{
96+
return _list.SlicedMemory.Length;
97+
}
98+
99+
public unsafe int GetByteSize(int start, int end)
100+
{
101+
return (end - start + 1) * sizeof(K);
102+
}
103+
104+
public void Insert(int index, K key)
105+
{
106+
_list.InsertAt(index, key);
107+
}
108+
109+
public void Insert_Internal(int index, K key)
110+
{
111+
_list.InsertAt(index, key);
112+
}
113+
114+
public void RemoveAt(int index)
115+
{
116+
_list.RemoveAt(index);
117+
}
118+
119+
public void RemoveRange(int start, int count)
120+
{
121+
_list.RemoveRange(start, count);
122+
}
123+
124+
public void Update(int index, K key)
125+
{
126+
_list.Update(index, key);
127+
}
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License")
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
using FlowtideDotNet.Storage.Memory;
14+
using FlowtideDotNet.Storage.Tree;
15+
using System;
16+
using System.Buffers;
17+
using System.Buffers.Binary;
18+
using System.Collections.Generic;
19+
using System.Linq;
20+
using System.Text;
21+
using System.Threading.Tasks;
22+
23+
namespace FlowtideDotNet.Storage.DataStructures
24+
{
25+
internal class PrimitiveListKeyContainerSerializer<K> : IBPlusTreeKeySerializer<K, PrimitiveListKeyContainer<K>>
26+
where K : unmanaged
27+
{
28+
private readonly IMemoryAllocator _memoryAllocator;
29+
30+
public PrimitiveListKeyContainerSerializer(IMemoryAllocator memoryAllocator)
31+
{
32+
this._memoryAllocator = memoryAllocator;
33+
}
34+
35+
public Task CheckpointAsync(IBPlusTreeSerializerCheckpointContext context)
36+
{
37+
return Task.CompletedTask;
38+
}
39+
40+
public PrimitiveListKeyContainer<K> CreateEmpty()
41+
{
42+
return new PrimitiveListKeyContainer<K>(_memoryAllocator);
43+
}
44+
45+
public unsafe PrimitiveListKeyContainer<K> Deserialize(ref SequenceReader<byte> reader)
46+
{
47+
if (!reader.TryReadLittleEndian(out int count))
48+
{
49+
throw new InvalidOperationException("Failed to read count");
50+
}
51+
var nativeMemory = _memoryAllocator.Allocate(count, 64);
52+
53+
if (!reader.TryCopyTo(nativeMemory.Memory.Span.Slice(0, count)))
54+
{
55+
throw new InvalidOperationException("Failed to read bytes");
56+
}
57+
reader.Advance(count);
58+
return new PrimitiveListKeyContainer<K>(new PrimitiveList<K>(nativeMemory, count / sizeof(K), _memoryAllocator));
59+
}
60+
61+
public Task InitializeAsync(IBPlusTreeSerializerInitializeContext context)
62+
{
63+
return Task.CompletedTask;
64+
}
65+
66+
public void Serialize(in IBufferWriter<byte> writer, in PrimitiveListKeyContainer<K> values)
67+
{
68+
var mem = values._list.SlicedMemory;
69+
var headerSpan = writer.GetSpan(4);
70+
BinaryPrimitives.WriteInt32LittleEndian(headerSpan, mem.Length);
71+
writer.Advance(4);
72+
writer.Write(mem.Span);
73+
}
74+
}
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License")
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
using FlowtideDotNet.Storage.Memory;
14+
using FlowtideDotNet.Storage.Tree;
15+
using System;
16+
using System.Collections.Generic;
17+
using System.Linq;
18+
using System.Text;
19+
using System.Threading.Tasks;
20+
21+
namespace FlowtideDotNet.Storage.Queue
22+
{
23+
public class FlowtideQueueOptions<V, TValueContainer>
24+
where TValueContainer : IValueContainer<V>
25+
{
26+
/// <summary>
27+
/// Serializer for values
28+
/// </summary>
29+
public required IBplusTreeValueSerializer<V, TValueContainer> ValueSerializer { get; set; }
30+
31+
public required IMemoryAllocator MemoryAllocator { get; set; }
32+
33+
public int? PageSizeBytes { get; set; }
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License")
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
using FlowtideDotNet.Storage.Tree;
14+
using System;
15+
using System.Collections.Generic;
16+
using System.Linq;
17+
using System.Text;
18+
using System.Threading.Tasks;
19+
20+
namespace FlowtideDotNet.Storage.Queue
21+
{
22+
public interface IFlowtideQueue<V, TValueContainer>
23+
where TValueContainer : IValueContainer<V>
24+
{
25+
ValueTask Enqueue(in V value);
26+
27+
ValueTask<V> Dequeue();
28+
29+
ValueTask<V> Pop();
30+
31+
ValueTask Commit();
32+
33+
ValueTask Clear();
34+
35+
long Count { get; }
36+
}
37+
}

0 commit comments

Comments
 (0)