Skip to content

Commit 00b69dd

Browse files
committed
Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response
Signed-off-by: Andriy Redko <[email protected]>
1 parent 18ac060 commit 00b69dd

File tree

3 files changed

+89
-6
lines changed

3 files changed

+89
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
7171
### Fixed
7272
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
7373
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
74+
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))
7475

7576
### Security
7677

client/rest/src/main/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumer.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,25 +86,25 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx
8686
return;
8787
}
8888

89+
int len = src.limit();
90+
if (len < 0) {
91+
len = 4096;
92+
}
93+
8994
ByteArrayBuffer buffer = bufferRef.get();
9095
if (buffer == null) {
91-
buffer = new ByteArrayBuffer(bufferLimitBytes);
96+
buffer = new ByteArrayBuffer(len);
9297
if (bufferRef.compareAndSet(null, buffer) == false) {
9398
buffer = bufferRef.get();
9499
}
95100
}
96101

97-
int len = src.limit();
98102
if (buffer.length() + len > bufferLimitBytes) {
99103
throw new ContentTooLongException(
100104
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
101105
);
102106
}
103107

104-
if (len < 0) {
105-
len = 4096;
106-
}
107-
108108
if (src.hasArray()) {
109109
buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
110110
} else {
@@ -136,4 +136,12 @@ public void releaseResources() {
136136
buffer = null;
137137
}
138138
}
139+
140+
/**
141+
* Gets current byte buffer instance
142+
* @return byte buffer instance
143+
*/
144+
ByteArrayBuffer getBuffer() {
145+
return bufferRef.get();
146+
}
139147
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.client.nio;
10+
11+
import com.carrotsearch.randomizedtesting.RandomizedTest;
12+
13+
import org.apache.hc.core5.http.ContentTooLongException;
14+
import org.junit.After;
15+
import org.junit.Before;
16+
import org.junit.Test;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
21+
import static org.hamcrest.CoreMatchers.equalTo;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.junit.Assert.assertThrows;
24+
25+
public class HeapBufferedAsyncEntityConsumerTest extends RandomizedTest {
26+
private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */;
27+
private HeapBufferedAsyncEntityConsumer consumer;
28+
29+
@Before
30+
public void setUp() {
31+
consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT);
32+
}
33+
34+
@After
35+
public void tearDown() {
36+
consumer.releaseResources();
37+
}
38+
39+
@Test
40+
public void consumerAllocatesBufferLimit() throws IOException {
41+
consumer.consume(randomByteBufferOfLength(1000).flip());
42+
assertThat(consumer.getBuffer().capacity(), equalTo(1000));
43+
}
44+
45+
@Test
46+
public void consumerAllocatesEmptyBuffer() throws IOException {
47+
consumer.consume(ByteBuffer.allocate(0).flip());
48+
assertThat(consumer.getBuffer().capacity(), equalTo(0));
49+
}
50+
51+
@Test
52+
public void consumerExpandsBufferLimits() throws IOException {
53+
consumer.consume(randomByteBufferOfLength(1000).flip());
54+
consumer.consume(randomByteBufferOfLength(2000).flip());
55+
consumer.consume(randomByteBufferOfLength(3000).flip());
56+
assertThat(consumer.getBuffer().capacity(), equalTo(6000));
57+
}
58+
59+
@Test
60+
public void consumerAllocateLimit() throws IOException {
61+
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
62+
assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT));
63+
}
64+
65+
@Test
66+
public void consumerFailsToOverAllocateLimit() throws IOException {
67+
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
68+
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip()));
69+
}
70+
71+
private ByteBuffer randomByteBufferOfLength(int length) {
72+
return ByteBuffer.allocate(length).put(randomBytesOfLength(length));
73+
}
74+
}

0 commit comments

Comments
 (0)