|
7 | 7 | */
|
8 | 8 | package org.opensearch.index.compositeindex.datacube.startree.aggregators;
|
9 | 9 |
|
| 10 | +import org.opensearch.index.compositeindex.datacube.startree.utils.CompensatedSumType; |
10 | 11 | import org.opensearch.index.mapper.FieldValueConverter;
|
11 | 12 | import org.opensearch.index.mapper.NumberFieldMapper;
|
12 | 13 | import org.opensearch.search.aggregations.metrics.CompensatedSum;
|
|
21 | 22 | *
|
22 | 23 | * @opensearch.experimental
|
23 | 24 | */
|
24 |
| -class SumValueAggregator implements ValueAggregator<Double> { |
| 25 | +class SumValueAggregator implements ValueAggregator<CompensatedSum> { |
25 | 26 |
|
26 | 27 | private final FieldValueConverter fieldValueConverter;
|
| 28 | + private final CompensatedSumType compensatedSumConverter; |
27 | 29 | private static final FieldValueConverter VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.DOUBLE;
|
28 | 30 |
|
29 |
| - private CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
30 |
| - |
31 | 31 | public SumValueAggregator(FieldValueConverter fieldValueConverter) {
|
32 | 32 | this.fieldValueConverter = fieldValueConverter;
|
| 33 | + this.compensatedSumConverter = new CompensatedSumType(); |
33 | 34 | }
|
34 | 35 |
|
35 | 36 | @Override
|
36 | 37 | public FieldValueConverter getAggregatedValueType() {
|
37 |
| - return VALUE_AGGREGATOR_TYPE; |
| 38 | + return compensatedSumConverter; |
38 | 39 | }
|
39 | 40 |
|
40 | 41 | @Override
|
41 |
| - public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) { |
42 |
| - kahanSummation.reset(0, 0); |
| 42 | + public CompensatedSum getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) { |
| 43 | + CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
43 | 44 | // add takes care of the sum and compensation internally
|
44 | 45 | if (segmentDocValue != null) {
|
45 |
| - kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue)); |
| 46 | + kahanSummation.reset(fieldValueConverter.toDoubleValue(segmentDocValue), 0); |
46 | 47 | } else {
|
47 |
| - kahanSummation.add(getIdentityMetricValue()); |
| 48 | + kahanSummation.reset(getIdentityMetricDoubleValue(), 0); |
48 | 49 | }
|
49 |
| - return kahanSummation.value(); |
| 50 | + return kahanSummation; |
50 | 51 | }
|
51 | 52 |
|
52 | 53 | // we have overridden this method because the reset with sum and compensation helps us keep
|
53 | 54 | // track of precision and avoids a potential loss in accuracy of sums.
|
54 | 55 | @Override
|
55 |
| - public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue) { |
56 |
| - assert value == null || kahanSummation.value() == value; |
| 56 | + public CompensatedSum mergeAggregatedValueAndSegmentValue(CompensatedSum value, Long segmentDocValue) { |
| 57 | + CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
| 58 | + if (value != null) { |
| 59 | + kahanSummation.reset(value.value(), value.delta()); |
| 60 | + } |
57 | 61 | // add takes care of the sum and compensation internally
|
58 | 62 | if (segmentDocValue != null) {
|
59 | 63 | kahanSummation.add(fieldValueConverter.toDoubleValue(segmentDocValue));
|
60 | 64 | } else {
|
61 |
| - kahanSummation.add(getIdentityMetricValue()); |
| 65 | + kahanSummation.add(getIdentityMetricDoubleValue()); |
62 | 66 | }
|
63 |
| - return kahanSummation.value(); |
| 67 | + return kahanSummation; |
64 | 68 | }
|
65 | 69 |
|
66 | 70 | @Override
|
67 |
| - public Double mergeAggregatedValues(Double value, Double aggregatedValue) { |
68 |
| - assert aggregatedValue == null || kahanSummation.value() == aggregatedValue; |
69 |
| - // add takes care of the sum and compensation internally |
| 71 | + public CompensatedSum mergeAggregatedValues(CompensatedSum value, CompensatedSum aggregatedValue) { |
| 72 | + CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
| 73 | + if (aggregatedValue != null) { |
| 74 | + kahanSummation.reset(aggregatedValue.value(), aggregatedValue.delta()); |
| 75 | + } |
70 | 76 | if (value != null) {
|
71 |
| - kahanSummation.add(value); |
| 77 | + kahanSummation.add(value.value(), value.delta()); |
72 | 78 | } else {
|
73 |
| - kahanSummation.add(getIdentityMetricValue()); |
| 79 | + kahanSummation.add(getIdentityMetricDoubleValue()); |
74 | 80 | }
|
75 |
| - return kahanSummation.value(); |
| 81 | + return kahanSummation; |
76 | 82 | }
|
77 | 83 |
|
78 | 84 | @Override
|
79 |
| - public Double getInitialAggregatedValue(Double value) { |
80 |
| - kahanSummation.reset(0, 0); |
| 85 | + public CompensatedSum getInitialAggregatedValue(CompensatedSum value) { |
| 86 | + CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
81 | 87 | // add takes care of the sum and compensation internally
|
82 |
| - if (value != null) { |
83 |
| - kahanSummation.add(value); |
| 88 | + if (value == null) { |
| 89 | + kahanSummation.reset(getIdentityMetricDoubleValue(), 0); |
84 | 90 | } else {
|
85 |
| - kahanSummation.add(getIdentityMetricValue()); |
| 91 | + kahanSummation.reset(value.value(), value.delta()); |
86 | 92 | }
|
87 |
| - return kahanSummation.value(); |
| 93 | + return kahanSummation; |
88 | 94 | }
|
89 | 95 |
|
90 | 96 | @Override
|
91 |
| - public Double toAggregatedValueType(Long value) { |
| 97 | + public CompensatedSum toAggregatedValueType(Long value) { |
| 98 | + CompensatedSum kahanSummation = new CompensatedSum(0, 0); |
92 | 99 | try {
|
93 | 100 | if (value == null) {
|
94 |
| - return getIdentityMetricValue(); |
| 101 | + kahanSummation.reset(getIdentityMetricDoubleValue(), 0); |
| 102 | + return kahanSummation; |
95 | 103 | }
|
96 |
| - return VALUE_AGGREGATOR_TYPE.toDoubleValue(value); |
| 104 | + kahanSummation.reset(VALUE_AGGREGATOR_TYPE.toDoubleValue(value), 0); |
| 105 | + return kahanSummation; |
97 | 106 | } catch (Exception e) {
|
98 | 107 | throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
|
99 | 108 | }
|
100 | 109 | }
|
101 | 110 |
|
| 111 | + /** |
| 112 | + * Since getIdentityMetricValue is called for every null document, and it creates a new object, |
| 113 | + * in this class, calling getIdentityMetricDoubleValue to avoid initializing an object |
| 114 | + */ |
| 115 | + private double getIdentityMetricDoubleValue() { |
| 116 | + return 0.0; |
| 117 | + } |
| 118 | + |
| 119 | + /** |
| 120 | + * Since getIdentityMetricValue is called for every null document, and it creates a new object, |
| 121 | + * in this class, calling getIdentityMetricDoubleValue to avoid initializing an object |
| 122 | + */ |
102 | 123 | @Override
|
103 |
| - public Double getIdentityMetricValue() { |
| 124 | + public CompensatedSum getIdentityMetricValue() { |
104 | 125 | // in present aggregations, if the metric behind sum is missing, we treat it as 0
|
105 |
| - return 0D; |
| 126 | + return new CompensatedSum(0, 0); |
106 | 127 | }
|
107 | 128 | }
|
0 commit comments