Skip to content

Commit b5249dc

Browse files
committed
Add support for u128 datatype
1 parent 3fa90e7 commit b5249dc

24 files changed

+310
-5
lines changed

columnar/src/columnar/column_type.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub enum ColumnType {
2020
Bool = 5u8,
2121
IpAddr = 6u8,
2222
DateTime = 7u8,
23+
U128 = 8u8,
2324
}
2425

2526
impl fmt::Display for ColumnType {
@@ -33,6 +34,7 @@ impl fmt::Display for ColumnType {
3334
ColumnType::Bool => "bool",
3435
ColumnType::IpAddr => "ip",
3536
ColumnType::DateTime => "datetime",
37+
ColumnType::U128 => "u128",
3638
};
3739
write!(f, "{short_str}")
3840
}
@@ -83,7 +85,8 @@ impl ColumnType {
8385
| ColumnType::Str
8486
| ColumnType::Bool
8587
| ColumnType::IpAddr
86-
| ColumnType::DateTime => None,
88+
| ColumnType::DateTime
89+
| ColumnType::U128 => None,
8790
}
8891
}
8992
}

columnar/src/columnar/merge/mod.rs

+34-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub(crate) enum ColumnTypeCategory {
3838
Bool,
3939
IpAddr,
4040
DateTime,
41+
U128,
4142
}
4243

4344
impl From<ColumnType> for ColumnTypeCategory {
@@ -51,6 +52,7 @@ impl From<ColumnType> for ColumnTypeCategory {
5152
ColumnType::Bool => ColumnTypeCategory::Bool,
5253
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
5354
ColumnType::DateTime => ColumnTypeCategory::DateTime,
55+
ColumnType::U128 => ColumnTypeCategory::U128,
5456
}
5557
}
5658
}
@@ -123,7 +125,10 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
123125
DynamicColumn::U64(column) => Some(column.to_u64_monotonic()),
124126
DynamicColumn::F64(column) => Some(column.to_u64_monotonic()),
125127
DynamicColumn::DateTime(column) => Some(column.to_u64_monotonic()),
126-
DynamicColumn::IpAddr(_) | DynamicColumn::Bytes(_) | DynamicColumn::Str(_) => None,
128+
DynamicColumn::IpAddr(_)
129+
| DynamicColumn::Bytes(_)
130+
| DynamicColumn::Str(_)
131+
| DynamicColumn::U128(_) => None,
127132
}
128133
}
129134

@@ -193,6 +198,33 @@ fn merge_column(
193198

194199
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
195200
}
201+
ColumnType::U128 => {
202+
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
203+
let mut column_values: Vec<Option<Arc<dyn ColumnValues<u128>>>> =
204+
Vec::with_capacity(columns_to_merge.len());
205+
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
206+
if let Some(DynamicColumn::U128(Column { index: idx, values })) = dynamic_column_opt
207+
{
208+
column_indexes.push(idx);
209+
column_values.push(Some(values));
210+
} else {
211+
column_indexes.push(ColumnIndex::Empty {
212+
num_docs: num_docs_per_column[i],
213+
});
214+
column_values.push(None);
215+
}
216+
}
217+
218+
let merged_column_index =
219+
crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
220+
let merge_column_values = MergedColumnValues {
221+
column_indexes: &column_indexes[..],
222+
column_values: &column_values,
223+
merge_row_order,
224+
};
225+
226+
serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
227+
}
196228
ColumnType::Bytes | ColumnType::Str => {
197229
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
198230
let mut bytes_columns: Vec<Option<BytesColumn>> =
@@ -464,6 +496,7 @@ fn min_max_if_numerical(column: &DynamicColumn) -> Option<(NumericalValue, Numer
464496
DynamicColumn::F64(column) => Some((column.min_value().into(), column.max_value().into())),
465497
DynamicColumn::Bool(_)
466498
| DynamicColumn::IpAddr(_)
499+
| DynamicColumn::U128(_)
467500
| DynamicColumn::DateTime(_)
468501
| DynamicColumn::Bytes(_)
469502
| DynamicColumn::Str(_) => None,

columnar/src/columnar/writer/column_operation.rs

+12
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,18 @@ impl SymbolValue for Ipv6Addr {
155155
}
156156
}
157157

158+
/// `SymbolValue` implementation for raw `u128` values, encoded as 16
/// big-endian bytes.
impl SymbolValue for u128 {
159+
/// Copies the 16 big-endian bytes of `self` into `buffer[0..16]` and
/// returns `16`, matching the number of bytes written.
///
/// # Panics
/// Panics if `buffer` is shorter than 16 bytes (slice indexing).
fn serialize(self, buffer: &mut [u8]) -> u8 {
160+
buffer[0..16].copy_from_slice(&self.to_be_bytes());
161+
16
162+
}
163+
164+
/// Rebuilds the value from the first 16 bytes of `bytes`, read as
/// big-endian (the inverse of `serialize` above).
///
/// # Panics
/// Panics if `bytes` holds fewer than 16 bytes (slice indexing).
fn deserialize(bytes: &[u8]) -> Self {
165+
// The slice is exactly 16 bytes long, so the array conversion cannot fail.
let octets: [u8; 16] = bytes[0..16].try_into().unwrap();
166+
u128::from_be_bytes(octets)
167+
}
168+
}
169+
158170
#[derive(Default)]
159171
struct MiniBuffer {
160172
pub bytes: [u8; 17],

columnar/src/columnar/writer/mod.rs

+56
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ struct SpareBuffers {
3131
value_index_builders: PreallocatedIndexBuilders,
3232
u64_values: Vec<u64>,
3333
ip_addr_values: Vec<Ipv6Addr>,
34+
u128_values: Vec<u128>,
3435
}
3536

3637
/// Makes it possible to create a new columnar.
@@ -52,6 +53,7 @@ pub struct ColumnarWriter {
5253
datetime_field_hash_map: ArenaHashMap,
5354
bool_field_hash_map: ArenaHashMap,
5455
ip_addr_field_hash_map: ArenaHashMap,
56+
u128_field_hash_map: ArenaHashMap,
5557
bytes_field_hash_map: ArenaHashMap,
5658
str_field_hash_map: ArenaHashMap,
5759
arena: MemoryArena,
@@ -145,6 +147,10 @@ impl ColumnarWriter {
145147
column_name.as_bytes(),
146148
|column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
147149
),
150+
ColumnType::U128 => self.u128_field_hash_map.mutate_or_create(
151+
column_name.as_bytes(),
152+
|column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
153+
),
148154
}
149155
}
150156

@@ -177,6 +183,18 @@ impl ColumnarWriter {
177183
);
178184
}
179185

186+
/// Records the `u128` value for row `doc` in the column named `column_name`.
///
/// The column writer is looked up in `u128_field_hash_map` by the bytes of
/// `column_name`; if no writer exists yet, a default one is used.
pub fn record_u128(&mut self, doc: RowId, column_name: &str, u128: u128) {
187+
// Borrow the two fields separately so the closure can use the arena
// while the hash map is mutably borrowed.
let (hash_map, arena) = (&mut self.u128_field_hash_map, &mut self.arena);
188+
hash_map.mutate_or_create(
189+
column_name.as_bytes(),
190+
|column_opt: Option<ColumnWriter>| {
191+
// `None` means no writer was found for this key: start from the default.
let mut column: ColumnWriter = column_opt.unwrap_or_default();
192+
column.record(doc, u128, arena);
193+
// NOTE(review): presumably the returned writer is stored back by
// `mutate_or_create` — confirm against `ArenaHashMap`.
column
194+
},
195+
);
196+
}
197+
180198
pub fn record_bool(&mut self, doc: RowId, column_name: &str, val: bool) {
181199
let (hash_map, arena) = (&mut self.bool_field_hash_map, &mut self.arena);
182200
hash_map.mutate_or_create(
@@ -323,6 +341,20 @@ impl ColumnarWriter {
323341
)?;
324342
column_serializer.finalize()?;
325343
}
344+
ColumnType::U128 => {
345+
let column_writer: ColumnWriter = self.u128_field_hash_map.read(addr);
346+
let cardinality = column_writer.get_cardinality(num_docs);
347+
let mut column_serializer =
348+
serializer.start_serialize_column(column_name, ColumnType::U128);
349+
serialize_u128_column(
350+
cardinality,
351+
num_docs,
352+
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
353+
buffers,
354+
&mut column_serializer,
355+
)?;
356+
column_serializer.finalize()?;
357+
}
326358
ColumnType::Bytes | ColumnType::Str => {
327359
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
328360
if column_type == ColumnType::Bytes {
@@ -536,6 +568,30 @@ fn serialize_ip_addr_column(
536568
Ok(())
537569
}
538570

571+
/// Serializes a `u128` column by forwarding to
/// `send_to_serialize_column_mappable_to_u128`.
///
/// Takes the `value_index_builders` and `u128_values` scratch fields out of
/// `buffers` and passes them, together with the column operations,
/// `cardinality`, `num_docs` and the writer `wrt`, to the generic helper.
///
/// # Errors
/// Propagates any `io::Error` returned by the helper.
fn serialize_u128_column(
572+
cardinality: Cardinality,
573+
num_docs: RowId,
574+
column_operations_it: impl Iterator<Item = ColumnOperation<u128>>,
575+
buffers: &mut SpareBuffers,
576+
wrt: &mut impl io::Write,
577+
) -> io::Result<()> {
578+
// Only the two buffers needed for u128 values are borrowed; the other
// fields of `SpareBuffers` are ignored via `..`.
let SpareBuffers {
579+
value_index_builders,
580+
u128_values,
581+
..
582+
} = buffers;
583+
send_to_serialize_column_mappable_to_u128(
584+
column_operations_it,
585+
cardinality,
586+
num_docs,
587+
value_index_builders,
588+
u128_values,
589+
wrt,
590+
)?;
591+
Ok(())
592+
}
593+
594+
539595
fn send_to_serialize_column_mappable_to_u128<
540596
T: Copy + Ord + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU128 + PartialOrd,
541597
>(

columnar/src/dynamic_column.rs

+16
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub enum DynamicColumn {
1616
I64(Column<i64>),
1717
U64(Column<u64>),
1818
F64(Column<f64>),
19+
U128(Column<u128>),
1920
IpAddr(Column<Ipv6Addr>),
2021
DateTime(Column<DateTime>),
2122
Bytes(BytesColumn),
@@ -30,6 +31,7 @@ impl fmt::Debug for DynamicColumn {
3031
DynamicColumn::I64(col) => write!(f, " {col:?}")?,
3132
DynamicColumn::U64(col) => write!(f, " {col:?}")?,
3233
DynamicColumn::F64(col) => write!(f, "{col:?}")?,
34+
DynamicColumn::U128(col) => write!(f, "{col:?}")?,
3335
DynamicColumn::IpAddr(col) => write!(f, "{col:?}")?,
3436
DynamicColumn::DateTime(col) => write!(f, "{col:?}")?,
3537
DynamicColumn::Bytes(col) => write!(f, "{col:?}")?,
@@ -46,6 +48,7 @@ impl DynamicColumn {
4648
DynamicColumn::I64(c) => &c.index,
4749
DynamicColumn::U64(c) => &c.index,
4850
DynamicColumn::F64(c) => &c.index,
51+
DynamicColumn::U128(c) => &c.index,
4952
DynamicColumn::IpAddr(c) => &c.index,
5053
DynamicColumn::DateTime(c) => &c.index,
5154
DynamicColumn::Bytes(c) => &c.ords().index,
@@ -63,6 +66,7 @@ impl DynamicColumn {
6366
DynamicColumn::I64(c) => c.values.num_vals(),
6467
DynamicColumn::U64(c) => c.values.num_vals(),
6568
DynamicColumn::F64(c) => c.values.num_vals(),
69+
DynamicColumn::U128(c) => c.values.num_vals(),
6670
DynamicColumn::IpAddr(c) => c.values.num_vals(),
6771
DynamicColumn::DateTime(c) => c.values.num_vals(),
6872
DynamicColumn::Bytes(c) => c.ords().values.num_vals(),
@@ -76,6 +80,7 @@ impl DynamicColumn {
7680
DynamicColumn::I64(_) => ColumnType::I64,
7781
DynamicColumn::U64(_) => ColumnType::U64,
7882
DynamicColumn::F64(_) => ColumnType::F64,
83+
DynamicColumn::U128(_) => ColumnType::U128,
7984
DynamicColumn::IpAddr(_) => ColumnType::IpAddr,
8085
DynamicColumn::DateTime(_) => ColumnType::DateTime,
8186
DynamicColumn::Bytes(_) => ColumnType::Bytes,
@@ -227,6 +232,7 @@ static_dynamic_conversions!(Column<DateTime>, DateTime);
227232
static_dynamic_conversions!(StrColumn, Str);
228233
static_dynamic_conversions!(BytesColumn, Bytes);
229234
static_dynamic_conversions!(Column<Ipv6Addr>, IpAddr);
235+
static_dynamic_conversions!(Column<u128>, U128);
230236

231237
#[derive(Clone, Debug)]
232238
pub struct DynamicColumnHandle {
@@ -272,6 +278,13 @@ impl DynamicColumnHandle {
272278
)?;
273279
Ok(Some(column))
274280
}
281+
ColumnType::U128 => {
282+
let column = crate::column::open_column_u128_as_compact_u64(
283+
column_bytes,
284+
self.format_version,
285+
)?;
286+
Ok(Some(column))
287+
}
275288
ColumnType::Bool
276289
| ColumnType::I64
277290
| ColumnType::U64
@@ -301,6 +314,9 @@ impl DynamicColumnHandle {
301314
ColumnType::F64 => {
302315
crate::column::open_column_u64::<f64>(column_bytes, self.format_version)?.into()
303316
}
317+
ColumnType::U128 => {
318+
crate::column::open_column_u128::<u128>(column_bytes, self.format_version)?.into()
319+
}
304320
ColumnType::Bool => {
305321
crate::column::open_column_u64::<bool>(column_bytes, self.format_version)?.into()
306322
}

columnar/src/tests.rs

+17
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ enum ColumnValue {
252252
Bytes(&'static [u8]),
253253
Numerical(NumericalValue),
254254
IpAddr(Ipv6Addr),
255+
U128(u128),
255256
Bool(bool),
256257
DateTime(DateTime),
257258
}
@@ -269,6 +270,7 @@ impl ColumnValue {
269270
ColumnValue::Bytes(_) => ColumnTypeCategory::Bytes,
270271
ColumnValue::Numerical(_) => ColumnTypeCategory::Numerical,
271272
ColumnValue::IpAddr(_) => ColumnTypeCategory::IpAddr,
273+
ColumnValue::U128(_) => ColumnTypeCategory::U128,
272274
ColumnValue::Bool(_) => ColumnTypeCategory::Bool,
273275
ColumnValue::DateTime(_) => ColumnTypeCategory::DateTime,
274276
}
@@ -303,6 +305,7 @@ fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
303305
0,
304306
ip_addr_byte
305307
))),
308+
1 => any::<u128>().prop_map(|val| ColumnValue::U128(val)),
306309
1 => any::<bool>().prop_map(ColumnValue::Bool),
307310
1 => (679_723_993i64..1_679_723_995i64)
308311
.prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
@@ -353,6 +356,9 @@ fn build_columnar_with_mapping(docs: &[Vec<(&'static str, ColumnValue)>]) -> Col
353356
ColumnValue::IpAddr(ip_addr) => {
354357
columnar_writer.record_ip_addr(doc_id as u32, column_name, ip_addr);
355358
}
359+
ColumnValue::U128(u128) => {
360+
columnar_writer.record_u128(doc_id as u32, column_name, u128);
361+
}
356362
ColumnValue::Bool(bool_val) => {
357363
columnar_writer.record_bool(doc_id as u32, column_name, bool_val);
358364
}
@@ -506,6 +512,15 @@ impl AssertEqualToColumnValue for Ipv6Addr {
506512
}
507513
}
508514

515+
/// Test helper: compares a `u128` read back from a column with the
/// `ColumnValue` that was originally recorded.
impl AssertEqualToColumnValue for u128 {
516+
/// Asserts that `column_value` is `ColumnValue::U128` and holds a value
/// equal to `self`.
///
/// # Panics
/// Panics if `column_value` is any other variant, or if the values differ.
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
517+
let ColumnValue::U128(val) = column_value else {
518+
panic!()
519+
};
520+
assert_eq!(self, val);
521+
}
522+
}
523+
509524
impl<T: Coerce + PartialEq + Debug + Into<NumericalValue>> AssertEqualToColumnValue for T {
510525
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
511526
let ColumnValue::Numerical(num) = column_value else {
@@ -617,6 +632,8 @@ proptest! {
617632
assert_column_values(col, expected_col_values),
618633
DynamicColumn::IpAddr(col) =>
619634
assert_column_values(col, expected_col_values),
635+
DynamicColumn::U128(col) =>
636+
assert_column_values(col, expected_col_values),
620637
DynamicColumn::DateTime(col) =>
621638
assert_column_values(col, expected_col_values),
622639
DynamicColumn::Bytes(col) =>

src/aggregation/metric/top_hits.rs

+7
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@ impl TopHitsAggregationReq {
299299
.values_for_doc(doc_id)
300300
.map(FastFieldValue::IpAddr)
301301
.collect::<Vec<_>>(),
302+
DynamicColumn::U128(accessor) => accessor
303+
.values_for_doc(doc_id)
304+
.map(FastFieldValue::U128)
305+
.collect::<Vec<_>>(),
302306
DynamicColumn::DateTime(accessor) => accessor
303307
.values_for_doc(doc_id)
304308
.map(FastFieldValue::Date)
@@ -334,6 +338,8 @@ pub enum FastFieldValue {
334338
IpAddr(Ipv6Addr),
335339
/// A list of values.
336340
Array(Vec<Self>),
341+
/// U128
342+
U128(u128),
337343
}
338344

339345
impl From<FastFieldValue> for OwnedValue {
@@ -350,6 +356,7 @@ impl From<FastFieldValue> for OwnedValue {
350356
FastFieldValue::Array(a) => {
351357
OwnedValue::Array(a.into_iter().map(OwnedValue::from).collect())
352358
}
359+
FastFieldValue::U128(u128) => OwnedValue::U128(u128),
353360
}
354361
}
355362
}

src/core/json_utils.rs

+3
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
207207
ReferenceValueLeaf::IpAddr(_) => {
208208
unimplemented!("IP address support in dynamic fields is not yet implemented")
209209
}
210+
ReferenceValueLeaf::U128(_) => {
211+
unimplemented!("U128 support in dynamic fields is not yet implemented")
212+
}
210213
},
211214
ReferenceValue::Array(elements) => {
212215
for val in elements {

0 commit comments

Comments
 (0)