|
37 | 37 | import org.apache.nifi.processor.Relationship;
|
38 | 38 | import org.apache.nifi.processor.exception.ProcessException;
|
39 | 39 | import org.apache.nifi.processor.util.StandardValidators;
|
| 40 | +import org.apache.nifi.processors.box.utils.BoxDate; |
40 | 41 | import org.apache.nifi.serialization.RecordReader;
|
41 | 42 | import org.apache.nifi.serialization.RecordReaderFactory;
|
42 | 43 | import org.apache.nifi.serialization.record.Record;
|
| 44 | +import org.apache.nifi.serialization.record.RecordField; |
| 45 | +import org.apache.nifi.serialization.record.RecordFieldType; |
43 | 46 |
|
44 | 47 | import java.io.InputStream;
|
| 48 | +import java.time.LocalDate; |
45 | 49 | import java.util.ArrayList;
|
| 50 | +import java.util.Arrays; |
46 | 51 | import java.util.List;
|
47 | 52 | import java.util.Map;
|
| 53 | +import java.util.Objects; |
48 | 54 | import java.util.Set;
|
49 | 55 |
|
50 | 56 | import static java.lang.String.valueOf;
|
@@ -223,20 +229,58 @@ private void processRecord(Record record, Metadata metadata, List<String> errors
|
223 | 229 | return;
|
224 | 230 | }
|
225 | 231 |
|
226 |
| - List<String> fieldNames = record.getSchema().getFieldNames(); |
| 232 | + final List<RecordField> fields = record.getSchema().getFields(); |
227 | 233 |
|
228 |
| - if (fieldNames.isEmpty()) { |
| 234 | + if (fields.isEmpty()) { |
229 | 235 | errors.add("Record has no fields");
|
230 | 236 | return;
|
231 | 237 | }
|
232 | 238 |
|
233 |
| - for (String fieldName : fieldNames) { |
234 |
| - Object valueObj = record.getValue(fieldName); |
235 |
| - String value = valueObj != null ? valueObj.toString() : null; |
236 |
| - metadata.add("/" + fieldName, value); |
| 239 | + for (final RecordField field : fields) { |
| 240 | + addValueToMetadata(metadata, record, field); |
237 | 241 | }
|
238 | 242 | }
|
239 | 243 |
|
| 244 | + private void addValueToMetadata(final Metadata metadata, final Record record, final RecordField field) { |
| 245 | + if (record.getValue(field) == null) { |
| 246 | + return; |
| 247 | + } |
| 248 | + |
| 249 | + final RecordFieldType fieldType = field.getDataType().getFieldType(); |
| 250 | + final String fieldName = field.getFieldName(); |
| 251 | + final String path = "/" + fieldName; |
| 252 | + |
| 253 | + if (isNumber(fieldType)) { |
| 254 | + metadata.add(path, record.getAsDouble(fieldName)); |
| 255 | + } else if (isDate(fieldType)) { |
| 256 | + final LocalDate date = record.getAsLocalDate(fieldName, null); |
| 257 | + metadata.add(path, BoxDate.of(date).format()); |
| 258 | + } else if (isArray(fieldType)) { |
| 259 | + final List<String> values = Arrays.stream(record.getAsArray(fieldName)) |
| 260 | + .filter(Objects::nonNull) |
| 261 | + .map(Object::toString) |
| 262 | + .toList(); |
| 263 | + |
| 264 | + metadata.add(path, values); |
| 265 | + } else { |
| 266 | + metadata.add(path, record.getAsString(fieldName)); |
| 267 | + } |
| 268 | + } |
| 269 | + |
| 270 | + private boolean isNumber(final RecordFieldType fieldType) { |
| 271 | + final boolean isInteger = RecordFieldType.BIGINT.equals(fieldType) || RecordFieldType.BIGINT.isWiderThan(fieldType); |
| 272 | + final boolean isFloat = RecordFieldType.DECIMAL.equals(fieldType) || RecordFieldType.DECIMAL.isWiderThan(fieldType); |
| 273 | + return isInteger || isFloat; |
| 274 | + } |
| 275 | + |
| 276 | + private boolean isDate(final RecordFieldType fieldType) { |
| 277 | + return RecordFieldType.DATE.equals(fieldType); |
| 278 | + } |
| 279 | + |
| 280 | + private boolean isArray(final RecordFieldType fieldType) { |
| 281 | + return RecordFieldType.ARRAY.equals(fieldType); |
| 282 | + } |
| 283 | + |
240 | 284 | /**
|
241 | 285 | * Returns a BoxFile object for the given file ID.
|
242 | 286 | *
|
|
0 commit comments