|
1 | 1 | /*
|
2 |
| - * Copyright 2002-2018 the original author or authors. |
| 2 | + * Copyright 2002-2019 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
25 | 25 | import java.util.Map;
|
26 | 26 | import java.util.concurrent.ConcurrentHashMap;
|
27 | 27 | import java.util.concurrent.ConcurrentMap;
|
28 |
| -import java.util.stream.Collectors; |
29 | 28 |
|
30 | 29 | import org.reactivestreams.Publisher;
|
31 | 30 | import reactor.core.publisher.Flux;
|
@@ -88,111 +87,114 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
|
88 | 87 | }
|
89 | 88 |
|
90 | 89 | @Override
|
91 |
| - public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType, |
| 90 | + public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType, |
92 | 91 | @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
93 | 92 |
|
94 | 93 | List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
|
95 | 94 |
|
96 |
| - Flux<DataBuffer> inputFlux = Flux.from(inputStream) |
97 |
| - .flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes)) |
98 |
| - .bufferUntil(StringDecoder::isEndFrame) |
| 95 | + Flux<DataBuffer> inputFlux = Flux.from(input) |
| 96 | + .flatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes)) |
| 97 | + .bufferUntil(buffer -> buffer == END_FRAME) |
99 | 98 | .map(StringDecoder::joinUntilEndFrame)
|
100 | 99 | .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
101 | 100 |
|
102 | 101 | return super.decode(inputFlux, elementType, mimeType, hints);
|
103 | 102 | }
|
104 | 103 |
|
105 | 104 | private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
|
106 |
| - return this.delimitersCache.computeIfAbsent(getCharset(mimeType), |
107 |
| - charset -> this.delimiters.stream() |
108 |
| - .map(s -> s.getBytes(charset)) |
109 |
| - .collect(Collectors.toList())); |
| 105 | + return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { |
| 106 | + List<byte[]> list = new ArrayList<>(); |
| 107 | + for (String delimiter : this.delimiters) { |
| 108 | + byte[] bytes = delimiter.getBytes(charset); |
| 109 | + list.add(bytes); |
| 110 | + } |
| 111 | + return list; |
| 112 | + }); |
110 | 113 | }
|
111 | 114 |
|
112 | 115 | /**
|
113 | 116 | * Split the given data buffer on delimiter boundaries.
|
114 | 117 | * The returned Flux contains an {@link #END_FRAME} buffer after each delimiter.
|
115 | 118 | */
|
116 |
| - private List<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) { |
| 119 | + private List<DataBuffer> splitOnDelimiter(DataBuffer buffer, List<byte[]> delimiterBytes) { |
117 | 120 | List<DataBuffer> frames = new ArrayList<>();
|
118 |
| - do { |
119 |
| - int length = Integer.MAX_VALUE; |
120 |
| - byte[] matchingDelimiter = null; |
121 |
| - for (byte[] delimiter : delimiterBytes) { |
122 |
| - int index = indexOf(dataBuffer, delimiter); |
123 |
| - if (index >= 0 && index < length) { |
124 |
| - length = index; |
125 |
| - matchingDelimiter = delimiter; |
| 121 | + try { |
| 122 | + do { |
| 123 | + int length = Integer.MAX_VALUE; |
| 124 | + byte[] matchingDelimiter = null; |
| 125 | + for (byte[] delimiter : delimiterBytes) { |
| 126 | + int index = indexOf(buffer, delimiter); |
| 127 | + if (index >= 0 && index < length) { |
| 128 | + length = index; |
| 129 | + matchingDelimiter = delimiter; |
| 130 | + } |
126 | 131 | }
|
127 |
| - } |
128 |
| - DataBuffer frame; |
129 |
| - int readPosition = dataBuffer.readPosition(); |
130 |
| - if (matchingDelimiter != null) { |
131 |
| - if (this.stripDelimiter) { |
132 |
| - frame = dataBuffer.slice(readPosition, length); |
| 132 | + DataBuffer frame; |
| 133 | + int readPosition = buffer.readPosition(); |
| 134 | + if (matchingDelimiter != null) { |
| 135 | + frame = this.stripDelimiter ? |
| 136 | + buffer.slice(readPosition, length) : |
| 137 | + buffer.slice(readPosition, length + matchingDelimiter.length); |
| 138 | + buffer.readPosition(readPosition + length + matchingDelimiter.length); |
| 139 | + frames.add(DataBufferUtils.retain(frame)); |
| 140 | + frames.add(END_FRAME); |
133 | 141 | }
|
134 | 142 | else {
|
135 |
| - frame = dataBuffer.slice(readPosition, length + matchingDelimiter.length); |
| 143 | + frame = buffer.slice(readPosition, buffer.readableByteCount()); |
| 144 | + buffer.readPosition(readPosition + buffer.readableByteCount()); |
| 145 | + frames.add(DataBufferUtils.retain(frame)); |
136 | 146 | }
|
137 |
| - dataBuffer.readPosition(readPosition + length + matchingDelimiter.length); |
138 |
| - |
139 |
| - frames.add(DataBufferUtils.retain(frame)); |
140 |
| - frames.add(END_FRAME); |
141 | 147 | }
|
142 |
| - else { |
143 |
| - frame = dataBuffer.slice(readPosition, dataBuffer.readableByteCount()); |
144 |
| - dataBuffer.readPosition(readPosition + dataBuffer.readableByteCount()); |
145 |
| - frames.add(DataBufferUtils.retain(frame)); |
| 148 | + while (buffer.readableByteCount() > 0); |
| 149 | + } |
| 150 | + catch (Throwable ex) { |
| 151 | + for (DataBuffer frame : frames) { |
| 152 | + DataBufferUtils.release(frame); |
146 | 153 | }
|
| 154 | + throw ex; |
| 155 | + } |
| 156 | + finally { |
| 157 | + DataBufferUtils.release(buffer); |
147 | 158 | }
|
148 |
| - while (dataBuffer.readableByteCount() > 0); |
149 |
| - |
150 |
| - DataBufferUtils.release(dataBuffer); |
151 | 159 | return frames;
|
152 | 160 | }
|
153 | 161 |
|
154 | 162 | /**
|
155 | 163 | * Find the given delimiter in the given data buffer.
|
156 | 164 | * @return the index of the delimiter, or -1 if not found.
|
157 | 165 | */
|
158 |
| - private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) { |
159 |
| - for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) { |
160 |
| - int dataBufferPos = i; |
| 166 | + private static int indexOf(DataBuffer buffer, byte[] delimiter) { |
| 167 | + for (int i = buffer.readPosition(); i < buffer.writePosition(); i++) { |
| 168 | + int bufferPos = i; |
161 | 169 | int delimiterPos = 0;
|
162 | 170 | while (delimiterPos < delimiter.length) {
|
163 |
| - if (dataBuffer.getByte(dataBufferPos) != delimiter[delimiterPos]) { |
| 171 | + if (buffer.getByte(bufferPos) != delimiter[delimiterPos]) { |
164 | 172 | break;
|
165 | 173 | }
|
166 | 174 | else {
|
167 |
| - dataBufferPos++; |
168 |
| - if (dataBufferPos == dataBuffer.writePosition() && |
169 |
| - delimiterPos != delimiter.length - 1) { |
| 175 | + bufferPos++; |
| 176 | + boolean endOfBuffer = bufferPos == buffer.writePosition(); |
| 177 | + boolean endOfDelimiter = delimiterPos == delimiter.length - 1; |
| 178 | + if (endOfBuffer && !endOfDelimiter) { |
170 | 179 | return -1;
|
171 | 180 | }
|
172 | 181 | }
|
173 | 182 | delimiterPos++;
|
174 | 183 | }
|
175 | 184 | if (delimiterPos == delimiter.length) {
|
176 |
| - return i - dataBuffer.readPosition(); |
| 185 | + return i - buffer.readPosition(); |
177 | 186 | }
|
178 | 187 | }
|
179 | 188 | return -1;
|
180 | 189 | }
|
181 | 190 |
|
182 |
| - /** |
183 |
| - * Check whether the given buffer is {@link #END_FRAME}. |
184 |
| - */ |
185 |
| - private static boolean isEndFrame(DataBuffer dataBuffer) { |
186 |
| - return dataBuffer == END_FRAME; |
187 |
| - } |
188 |
| - |
189 | 191 | /**
|
190 | 192 | * Join the given list of buffers into a single buffer.
|
191 | 193 | */
|
192 | 194 | private static DataBuffer joinUntilEndFrame(List<DataBuffer> dataBuffers) {
|
193 | 195 | if (!dataBuffers.isEmpty()) {
|
194 | 196 | int lastIdx = dataBuffers.size() - 1;
|
195 |
| - if (isEndFrame(dataBuffers.get(lastIdx))) { |
| 197 | + if (dataBuffers.get(lastIdx) == END_FRAME) { |
196 | 198 | dataBuffers.remove(lastIdx);
|
197 | 199 | }
|
198 | 200 | }
|
|
0 commit comments