19
19
20
20
import de .siegmar .fastcsv .reader .CommentStrategy ;
21
21
import de .siegmar .fastcsv .reader .CsvReader ;
22
- import de .siegmar .fastcsv .reader .CsvRow ;
22
+ import de .siegmar .fastcsv .reader .CsvRecord ;
23
+ import org .apache .commons .csv .CSVFormat ;
24
+ import org .apache .nifi .logging .ComponentLog ;
25
+ import org .apache .nifi .serialization .MalformedRecordException ;
26
+ import org .apache .nifi .serialization .record .DataType ;
27
+ import org .apache .nifi .serialization .record .MapRecord ;
28
+ import org .apache .nifi .serialization .record .Record ;
29
+ import org .apache .nifi .serialization .record .RecordField ;
30
+ import org .apache .nifi .serialization .record .RecordFieldType ;
31
+ import org .apache .nifi .serialization .record .RecordSchema ;
32
+
23
33
import java .io .IOException ;
24
34
import java .io .InputStream ;
25
35
import java .io .InputStreamReader ;
33
43
import java .util .Optional ;
34
44
import java .util .SortedMap ;
35
45
import java .util .TreeMap ;
36
- import org .apache .commons .csv .CSVFormat ;
37
- import org .apache .nifi .logging .ComponentLog ;
38
- import org .apache .nifi .serialization .MalformedRecordException ;
39
- import org .apache .nifi .serialization .record .DataType ;
40
- import org .apache .nifi .serialization .record .MapRecord ;
41
- import org .apache .nifi .serialization .record .Record ;
42
- import org .apache .nifi .serialization .record .RecordField ;
43
- import org .apache .nifi .serialization .record .RecordFieldType ;
44
- import org .apache .nifi .serialization .record .RecordSchema ;
45
46
46
47
public class FastCSVRecordReader extends AbstractCSVRecordReader {
47
- private final CsvReader csvReader ;
48
- private final Iterator <CsvRow > csvRowIterator ;
49
48
50
- private List <RecordField > recordFields ;
49
+ private final CsvReader <CsvRecord > csvReader ;
50
+ private final Iterator <CsvRecord > csvRecordIterator ;
51
51
52
+ private List <RecordField > recordFields ;
52
53
private Map <String , Integer > headerMap ;
53
54
54
55
private final boolean ignoreHeader ;
55
56
private final boolean trimDoubleQuote ;
56
57
private final CSVFormat csvFormat ;
57
58
58
- public FastCSVRecordReader (final InputStream in , final ComponentLog logger , final RecordSchema schema , final CSVFormat csvFormat , final boolean hasHeader , final boolean ignoreHeader ,
59
- final String dateFormat , final String timeFormat , final String timestampFormat , final String encoding , final boolean trimDoubleQuote ) throws IOException {
59
+ public FastCSVRecordReader (final InputStream in ,
60
+ final ComponentLog logger ,
61
+ final RecordSchema schema ,
62
+ final CSVFormat csvFormat ,
63
+ final boolean hasHeader ,
64
+ final boolean ignoreHeader ,
65
+ final String dateFormat ,
66
+ final String timeFormat ,
67
+ final String timestampFormat ,
68
+ final String encoding ,
69
+ final boolean trimDoubleQuote ) throws IOException {
60
70
super (logger , schema , hasHeader , ignoreHeader , dateFormat , timeFormat , timestampFormat , trimDoubleQuote );
61
71
this .ignoreHeader = ignoreHeader ;
62
72
this .trimDoubleQuote = trimDoubleQuote ;
@@ -66,8 +76,8 @@ public FastCSVRecordReader(final InputStream in, final ComponentLog logger, fina
66
76
.fieldSeparator (csvFormat .getDelimiterString ().charAt (0 ))
67
77
.quoteCharacter (csvFormat .getQuoteCharacter ())
68
78
.commentStrategy (CommentStrategy .SKIP )
69
- .skipEmptyRows (csvFormat .getIgnoreEmptyLines ())
70
- .errorOnDifferentFieldCount (! csvFormat .getAllowMissingColumnNames ());
79
+ .skipEmptyLines (csvFormat .getIgnoreEmptyLines ())
80
+ .ignoreDifferentFieldCount ( csvFormat .getAllowMissingColumnNames ());
71
81
72
82
if (csvFormat .getCommentMarker () != null ) {
73
83
builder .commentCharacter (csvFormat .getCommentMarker ());
@@ -82,23 +92,26 @@ public FastCSVRecordReader(final InputStream in, final ComponentLog logger, fina
82
92
}
83
93
}
84
94
85
- csvReader = builder .build (new InputStreamReader (in , encoding ));
86
- csvRowIterator = csvReader .iterator ();
95
+ this . csvReader = builder .ofCsvRecord (new InputStreamReader (in , encoding ));
96
+ this . csvRecordIterator = csvReader .iterator ();
87
97
}
88
98
89
99
@ Override
90
- public Record nextRecord (final boolean coerceTypes , final boolean dropUnknownFields ) throws IOException , MalformedRecordException {
100
+ public Record nextRecord (final boolean coerceTypes , final boolean dropUnknownFields )
101
+ throws IOException , MalformedRecordException {
91
102
92
103
try {
93
104
final RecordSchema schema = getSchema ();
94
-
95
105
final List <RecordField > recordFields = getRecordFields ();
96
106
final int numFieldNames = recordFields .size ();
97
- if (!csvRowIterator .hasNext ()) {
107
+
108
+ if (!csvRecordIterator .hasNext ()) {
98
109
return null ;
99
110
}
100
- final CsvRow csvRecord = csvRowIterator .next ();
111
+
112
+ final CsvRecord csvRecord = csvRecordIterator .next ();
101
113
final Map <String , Object > values = new LinkedHashMap <>(recordFields .size () * 2 );
114
+
102
115
for (int i = 0 ; i < csvRecord .getFieldCount (); i ++) {
103
116
String rawValue = csvRecord .getField (i );
104
117
if (csvFormat .getTrim ()) {
@@ -108,28 +121,20 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie
108
121
rawValue = trim (rawValue );
109
122
}
110
123
111
- final String rawFieldName ;
112
- final DataType dataType ;
113
124
if (i >= numFieldNames ) {
114
125
if (!dropUnknownFields ) {
115
126
values .put ("unknown_field_index_" + i , rawValue );
116
127
}
117
128
continue ;
118
- } else {
119
- final RecordField recordField = recordFields .get (i );
120
- rawFieldName = recordField .getFieldName ();
121
- dataType = recordField .getDataType ();
122
129
}
123
130
124
- final Object value ;
125
- if (coerceTypes ) {
126
- value = convert (rawValue , dataType , rawFieldName );
127
- } else {
128
- // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
129
- // dictate a field type. As a result, we will use the schema that we have to attempt to convert
130
- // the value into the desired type if it's a simple type.
131
- value = convertSimpleIfPossible (rawValue , dataType , rawFieldName );
132
- }
131
+ final RecordField recordField = recordFields .get (i );
132
+ final String rawFieldName = recordField .getFieldName ();
133
+ final DataType dataType = recordField .getDataType ();
134
+
135
+ final Object value = coerceTypes
136
+ ? convert (rawValue , dataType , rawFieldName )
137
+ : convertSimpleIfPossible (rawValue , dataType , rawFieldName );
133
138
134
139
values .putIfAbsent (rawFieldName , value );
135
140
}
@@ -140,10 +145,9 @@ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFie
140
145
}
141
146
}
142
147
143
-
144
148
private List <RecordField > getRecordFields () {
145
- if (this . recordFields != null ) {
146
- return this . recordFields ;
149
+ if (recordFields != null ) {
150
+ return recordFields ;
147
151
}
148
152
149
153
if (ignoreHeader ) {
@@ -152,39 +156,33 @@ private List<RecordField> getRecordFields() {
152
156
+ "have the same number of fields, as this is not conformant to RFC-4180" );
153
157
}
154
158
155
- // When getting the field names from the first record, it has to be read in
156
- if (!csvRowIterator .hasNext ()) {
159
+ if (!csvRecordIterator .hasNext ()) {
157
160
return Collections .emptyList ();
158
161
}
159
- CsvRow headerRow = csvRowIterator .next ();
162
+
163
+ // read header row
164
+ CsvRecord headerRecord = csvRecordIterator .next ();
160
165
headerMap = new HashMap <>();
161
- for (int i = 0 ; i < headerRow .getFieldCount (); i ++) {
162
- String rawValue = headerRow .getField (i );
166
+ for (int i = 0 ; i < headerRecord .getFieldCount (); i ++) {
167
+ String rawValue = headerRecord .getField (i );
163
168
if (csvFormat .getTrim ()) {
164
169
rawValue = rawValue .trim ();
165
170
}
166
- if (this . trimDoubleQuote ) {
171
+ if (trimDoubleQuote ) {
167
172
rawValue = trim (rawValue );
168
173
}
169
174
headerMap .put (rawValue , i );
170
175
}
171
176
172
-
173
- // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
174
- final SortedMap <Integer , String > sortedMap = new TreeMap <>();
175
- for (final Map .Entry <String , Integer > entry : headerMap .entrySet ()) {
177
+ SortedMap <Integer , String > sortedMap = new TreeMap <>();
178
+ for (Map .Entry <String , Integer > entry : headerMap .entrySet ()) {
176
179
sortedMap .put (entry .getValue (), entry .getKey ());
177
180
}
178
181
179
- final List <RecordField > fields = new ArrayList <>();
180
- final List <String > rawFieldNames = new ArrayList <>(sortedMap .values ());
181
- for (final String rawFieldName : rawFieldNames ) {
182
- final Optional <RecordField > option = schema .getField (rawFieldName );
183
- if (option .isPresent ()) {
184
- fields .add (option .get ());
185
- } else {
186
- fields .add (new RecordField (rawFieldName , RecordFieldType .STRING .getDataType ()));
187
- }
182
+ List <RecordField > fields = new ArrayList <>();
183
+ for (String rawFieldName : new ArrayList <>(sortedMap .values ())) {
184
+ Optional <RecordField > optField = getSchema ().getField (rawFieldName );
185
+ fields .add (optField .orElseGet (() -> new RecordField (rawFieldName , RecordFieldType .STRING .getDataType ())));
188
186
}
189
187
190
188
this .recordFields = fields ;
0 commit comments