|
8 | 8 |
|
9 | 9 | package org.opensearch.index.codec.composite.composite99;
|
10 | 10 |
|
| 11 | +import org.apache.logging.log4j.LogManager; |
| 12 | +import org.apache.logging.log4j.Logger; |
| 13 | +import org.apache.lucene.codecs.CodecUtil; |
11 | 14 | import org.apache.lucene.codecs.DocValuesProducer;
|
12 | 15 | import org.apache.lucene.index.BinaryDocValues;
|
| 16 | +import org.apache.lucene.index.CorruptIndexException; |
| 17 | +import org.apache.lucene.index.DocValues; |
13 | 18 | import org.apache.lucene.index.FieldInfo;
|
| 19 | +import org.apache.lucene.index.FieldInfos; |
| 20 | +import org.apache.lucene.index.IndexFileNames; |
14 | 21 | import org.apache.lucene.index.NumericDocValues;
|
15 | 22 | import org.apache.lucene.index.SegmentReadState;
|
16 | 23 | import org.apache.lucene.index.SortedDocValues;
|
17 | 24 | import org.apache.lucene.index.SortedNumericDocValues;
|
18 | 25 | import org.apache.lucene.index.SortedSetDocValues;
|
| 26 | +import org.apache.lucene.store.ChecksumIndexInput; |
| 27 | +import org.apache.lucene.store.IndexInput; |
19 | 28 | import org.opensearch.common.annotation.ExperimentalApi;
|
| 29 | +import org.opensearch.common.util.io.IOUtils; |
20 | 30 | import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
|
21 | 31 | import org.opensearch.index.codec.composite.CompositeIndexReader;
|
22 |
| -import org.opensearch.index.codec.composite.CompositeIndexValues; |
| 32 | +import org.opensearch.index.codec.composite.LuceneDocValuesProducerFactory; |
| 33 | +import org.opensearch.index.compositeindex.CompositeIndexMetadata; |
| 34 | +import org.opensearch.index.compositeindex.datacube.Metric; |
| 35 | +import org.opensearch.index.compositeindex.datacube.MetricStat; |
| 36 | +import org.opensearch.index.compositeindex.datacube.startree.fileformats.meta.StarTreeMetadata; |
| 37 | +import org.opensearch.index.compositeindex.datacube.startree.index.CompositeIndexValues; |
| 38 | +import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; |
| 39 | +import org.opensearch.index.mapper.CompositeMappedFieldType; |
23 | 40 |
|
24 | 41 | import java.io.IOException;
|
25 | 42 | import java.util.ArrayList;
|
| 43 | +import java.util.LinkedHashMap; |
26 | 44 | import java.util.List;
|
| 45 | +import java.util.Map; |
| 46 | + |
| 47 | +import static org.opensearch.index.compositeindex.CompositeIndexConstants.COMPOSITE_FIELD_MARKER; |
| 48 | +import static org.opensearch.index.compositeindex.datacube.startree.fileformats.StarTreeWriter.VERSION_CURRENT; |
| 49 | +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeDimensionsDocValues; |
| 50 | +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues; |
| 51 | +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.getFieldInfoList; |
27 | 52 |
|
28 | 53 | /**
|
29 | 54 | * Reader for star tree index and star tree doc values from the segments
|
|
32 | 57 | */
|
33 | 58 | @ExperimentalApi
|
34 | 59 | public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
|
35 |
| - private DocValuesProducer delegate; |
// Class logger; the constructor uses it to report an invalid magic marker or version.
| 60 | + private static final Logger logger = LogManager.getLogger(Composite99DocValuesReader.class); |
| 61 | + |
// Producer handed to the constructor; checkIntegrity() and close() forward to it first.
| 62 | + private final DocValuesProducer delegate; |
// Input over the composite data file; per-field slices are cut from it and checkIntegrity() checksums it.
| 63 | + private IndexInput dataIn; |
// Checksum input over the composite meta file; read sequentially in the constructor.
| 64 | + private ChecksumIndexInput metaIn; |
// Composite field name -> slice of dataIn holding that field's data (insertion-ordered).
| 65 | + private final Map<String, IndexInput> compositeIndexInputMap = new LinkedHashMap<>(); |
// Composite field name -> metadata parsed from the meta file (insertion-ordered).
| 66 | + private final Map<String, CompositeIndexMetadata> compositeIndexMetadataMap = new LinkedHashMap<>(); |
// Fully qualified doc-values field names (dimensions and metrics) collected while reading the meta file.
| 67 | + private final List<String> fields; |
// Producer created through LuceneDocValuesProducerFactory in the constructor; passed to StarTreeValues.
| 68 | + private DocValuesProducer compositeDocValuesProducer; |
// One entry per composite field found in the meta file; returned by getCompositeIndexFields().
| 69 | + private final List<CompositeIndexFieldInfo> compositeFieldInfos = new ArrayList<>(); |
// Read state rebuilt in the constructor with FieldInfos derived from `fields`.
| 70 | + private SegmentReadState readState; |
36 | 71 |
|
37 |
| - public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException { |
| 72 | + public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState readState) throws IOException { |
38 | 73 | this.delegate = producer;
|
39 |
| - // TODO : read star tree files |
| 74 | + this.fields = new ArrayList<>(); |
| 75 | + |
| 76 | + String metaFileName = IndexFileNames.segmentFileName( |
| 77 | + readState.segmentInfo.name, |
| 78 | + readState.segmentSuffix, |
| 79 | + Composite99DocValuesFormat.META_EXTENSION |
| 80 | + ); |
| 81 | + |
| 82 | + String dataFileName = IndexFileNames.segmentFileName( |
| 83 | + readState.segmentInfo.name, |
| 84 | + readState.segmentSuffix, |
| 85 | + Composite99DocValuesFormat.DATA_EXTENSION |
| 86 | + ); |
| 87 | + |
| 88 | + boolean success = false; |
| 89 | + try { |
| 90 | + |
| 91 | + // initialize meta input |
| 92 | + dataIn = readState.directory.openInput(dataFileName, readState.context); |
| 93 | + CodecUtil.checkIndexHeader( |
| 94 | + dataIn, |
| 95 | + Composite99DocValuesFormat.DATA_CODEC_NAME, |
| 96 | + Composite99DocValuesFormat.VERSION_START, |
| 97 | + Composite99DocValuesFormat.VERSION_CURRENT, |
| 98 | + readState.segmentInfo.getId(), |
| 99 | + readState.segmentSuffix |
| 100 | + ); |
| 101 | + |
| 102 | + // initialize data input |
| 103 | + metaIn = readState.directory.openChecksumInput(metaFileName, readState.context); |
| 104 | + Throwable priorE = null; |
| 105 | + try { |
| 106 | + CodecUtil.checkIndexHeader( |
| 107 | + metaIn, |
| 108 | + Composite99DocValuesFormat.META_CODEC_NAME, |
| 109 | + Composite99DocValuesFormat.VERSION_START, |
| 110 | + Composite99DocValuesFormat.VERSION_CURRENT, |
| 111 | + readState.segmentInfo.getId(), |
| 112 | + readState.segmentSuffix |
| 113 | + ); |
| 114 | + |
| 115 | + while (true) { |
| 116 | + |
| 117 | + // validate magic marker |
| 118 | + long magicMarker = metaIn.readLong(); |
| 119 | + if (magicMarker == -1) { |
| 120 | + break; |
| 121 | + } else if (magicMarker < 0) { |
| 122 | + throw new CorruptIndexException("Unknown token encountered: " + magicMarker, metaIn); |
| 123 | + } else if (COMPOSITE_FIELD_MARKER != magicMarker) { |
| 124 | + logger.error("Invalid composite field magic marker"); |
| 125 | + throw new IOException("Invalid composite field magic marker"); |
| 126 | + } |
| 127 | + |
| 128 | + int version = metaIn.readVInt(); |
| 129 | + if (VERSION_CURRENT != version) { |
| 130 | + logger.error("Invalid composite field version"); |
| 131 | + throw new IOException("Invalid composite field version"); |
| 132 | + } |
| 133 | + |
| 134 | + // construct composite index metadata |
| 135 | + String compositeFieldName = metaIn.readString(); |
| 136 | + CompositeMappedFieldType.CompositeFieldType compositeFieldType = CompositeMappedFieldType.CompositeFieldType.fromName( |
| 137 | + metaIn.readString() |
| 138 | + ); |
| 139 | + |
| 140 | + switch (compositeFieldType) { |
| 141 | + case STAR_TREE: |
| 142 | + StarTreeMetadata starTreeMetadata = new StarTreeMetadata( |
| 143 | + metaIn, |
| 144 | + compositeFieldName, |
| 145 | + compositeFieldType, |
| 146 | + version |
| 147 | + ); |
| 148 | + compositeFieldInfos.add(new CompositeIndexFieldInfo(compositeFieldName, compositeFieldType)); |
| 149 | + |
| 150 | + IndexInput starTreeIndexInput = dataIn.slice( |
| 151 | + "star-tree data slice for respective star-tree fields", |
| 152 | + starTreeMetadata.getDataStartFilePointer(), |
| 153 | + starTreeMetadata.getDataLength() |
| 154 | + ); |
| 155 | + compositeIndexInputMap.put(compositeFieldName, starTreeIndexInput); |
| 156 | + compositeIndexMetadataMap.put(compositeFieldName, starTreeMetadata); |
| 157 | + |
| 158 | + List<String> dimensionFields = starTreeMetadata.getDimensionFields(); |
| 159 | + |
| 160 | + // generating star tree unique fields (fully qualified name for dimension and metrics) |
| 161 | + for (String dimensions : dimensionFields) { |
| 162 | + fields.add(fullyQualifiedFieldNameForStarTreeDimensionsDocValues(compositeFieldName, dimensions)); |
| 163 | + } |
| 164 | + |
| 165 | + // adding metric fields |
| 166 | + for (Metric metric : starTreeMetadata.getMetrics()) { |
| 167 | + for (MetricStat metricStat : metric.getMetrics()) { |
| 168 | + fields.add( |
| 169 | + fullyQualifiedFieldNameForStarTreeMetricsDocValues( |
| 170 | + compositeFieldName, |
| 171 | + metric.getField(), |
| 172 | + metricStat.getTypeName() |
| 173 | + ) |
| 174 | + ); |
| 175 | + |
| 176 | + } |
| 177 | + } |
| 178 | + |
| 179 | + break; |
| 180 | + default: |
| 181 | + throw new CorruptIndexException("Invalid composite field type found in the file", dataIn); |
| 182 | + } |
| 183 | + } |
| 184 | + |
| 185 | + // populates the dummy list of field infos to fetch doc id set iterators for respective fields. |
| 186 | + // the dummy field info is used to fetch the doc id set iterators for respective fields based on field name |
| 187 | + FieldInfos fieldInfos = new FieldInfos(getFieldInfoList(fields)); |
| 188 | + this.readState = new SegmentReadState(readState.directory, readState.segmentInfo, fieldInfos, readState.context); |
| 189 | + |
| 190 | + // initialize star-tree doc values producer |
| 191 | + |
| 192 | + compositeDocValuesProducer = LuceneDocValuesProducerFactory.getDocValuesProducerForCompositeCodec( |
| 193 | + Composite99Codec.COMPOSITE_INDEX_CODEC_NAME, |
| 194 | + this.readState, |
| 195 | + Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC, |
| 196 | + Composite99DocValuesFormat.DATA_DOC_VALUES_EXTENSION, |
| 197 | + Composite99DocValuesFormat.META_DOC_VALUES_CODEC, |
| 198 | + Composite99DocValuesFormat.META_DOC_VALUES_EXTENSION |
| 199 | + ); |
| 200 | + |
| 201 | + } catch (Throwable t) { |
| 202 | + priorE = t; |
| 203 | + } finally { |
| 204 | + CodecUtil.checkFooter(metaIn, priorE); |
| 205 | + } |
| 206 | + success = true; |
| 207 | + } finally { |
| 208 | + if (success == false) { |
| 209 | + IOUtils.closeWhileHandlingException(this); |
| 210 | + } |
| 211 | + } |
40 | 212 | }
|
41 | 213 |
|
42 | 214 | @Override
|
@@ -67,24 +239,63 @@ public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
|
67 | 239 | @Override
|
68 | 240 | public void checkIntegrity() throws IOException {
|
69 | 241 | delegate.checkIntegrity();
|
70 |
| - // Todo : check integrity of composite index related [star tree] files |
| 242 | + CodecUtil.checksumEntireFile(dataIn); |
71 | 243 | }
|
72 | 244 |
|
73 | 245 | @Override
|
74 | 246 | public void close() throws IOException {
|
75 | 247 | delegate.close();
|
76 |
| - // Todo: close composite index related files [star tree] files |
| 248 | + boolean success = false; |
| 249 | + try { |
| 250 | + IOUtils.close(metaIn, dataIn); |
| 251 | + IOUtils.close(compositeDocValuesProducer); |
| 252 | + success = true; |
| 253 | + } finally { |
| 254 | + if (!success) { |
| 255 | + IOUtils.closeWhileHandlingException(metaIn, dataIn); |
| 256 | + } |
| 257 | + compositeIndexInputMap.clear(); |
| 258 | + compositeIndexMetadataMap.clear(); |
| 259 | + fields.clear(); |
| 260 | + metaIn = null; |
| 261 | + dataIn = null; |
| 262 | + } |
77 | 263 | }
|
78 | 264 |
|
79 | 265 | @Override
|
80 | 266 | public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
|
81 |
| - // todo : read from file formats and get the field names. |
82 |
| - return new ArrayList<>(); |
| 267 | + return compositeFieldInfos; |
83 | 268 | }
|
84 | 269 |
|
85 | 270 | @Override
|
86 | 271 | public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
|
87 |
| - // TODO : read compositeIndexValues [starTreeValues] from star tree files |
88 |
| - throw new UnsupportedOperationException(); |
| 272 | + |
| 273 | + switch (compositeIndexFieldInfo.getType()) { |
| 274 | + case STAR_TREE: |
| 275 | + return new StarTreeValues( |
| 276 | + compositeIndexMetadataMap.get(compositeIndexFieldInfo.getField()), |
| 277 | + compositeIndexInputMap.get(compositeIndexFieldInfo.getField()), |
| 278 | + compositeDocValuesProducer, |
| 279 | + this.readState |
| 280 | + ); |
| 281 | + |
| 282 | + default: |
| 283 | + throw new CorruptIndexException("Unsupported composite index field type: ", compositeIndexFieldInfo.getType().getName()); |
| 284 | + } |
| 285 | + |
89 | 286 | }
|
| 287 | + |
| 288 | + /** |
| 289 | + * Returns the sorted numeric doc values for the given sorted numeric field. |
| 290 | + * If the sorted numeric field is null, it returns an empty doc id set iterator. |
| 291 | + * <p> |
| 292 | + * Sorted numeric field can be null for cases where the segment doesn't hold a particular value. |
| 293 | + * |
| 294 | + * @param sortedNumeric the sorted numeric doc values for a field |
| 295 | + * @return empty sorted numeric values if the field is not present, else sortedNumeric |
| 296 | + */ |
| 297 | + public static SortedNumericDocValues getSortedNumericDocValues(SortedNumericDocValues sortedNumeric) { |
| 298 | + return sortedNumeric == null ? DocValues.emptySortedNumeric() : sortedNumeric; |
| 299 | + } |
| 300 | + |
90 | 301 | }
|
0 commit comments