Skip to content

Commit 89a6f7e

Browse files
authored
Fix http events sink (#3042)
1 parent 686e647 commit 89a6f7e

File tree

3 files changed

+81
-80
lines changed

3 files changed

+81
-80
lines changed

app/src/main/java/io/apicurio/registry/events/http/HttpEventSink.java

+32-34
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@
1515
*/
1616
package io.apicurio.registry.events.http;
1717

18-
import java.util.UUID;
19-
import javax.enterprise.context.ApplicationScoped;
20-
import javax.inject.Inject;
21-
import javax.ws.rs.core.MediaType;
22-
23-
import io.vertx.core.http.HttpMethod;
24-
import io.vertx.core.http.RequestOptions;
25-
import org.slf4j.Logger;
2618
import io.apicurio.registry.events.EventSink;
27-
import io.vertx.core.Vertx;
2819
import io.vertx.core.buffer.Buffer;
2920
import io.vertx.core.eventbus.Message;
30-
import io.vertx.core.http.HttpClient;
31-
import io.vertx.core.http.HttpClientOptions;
21+
import org.slf4j.Logger;
22+
23+
import javax.enterprise.context.ApplicationScoped;
24+
import javax.inject.Inject;
25+
import javax.ws.rs.core.MediaType;
26+
import java.net.URI;
27+
import java.net.http.HttpClient;
28+
import java.net.http.HttpRequest;
29+
import java.net.http.HttpResponse;
30+
import java.util.UUID;
3231

3332
/**
3433
* @author Fabian Martinez
@@ -44,9 +43,6 @@ public class HttpEventSink implements EventSink {
4443
@Inject
4544
HttpSinksConfiguration sinksConfiguration;
4645

47-
@Inject
48-
Vertx vertx;
49-
5046
@Override
5147
public String name() {
5248
return "HTTP Sink";
@@ -73,33 +69,35 @@ public void handle(Message<Buffer> message) {
7369
private void sendEventHttp(String type, HttpSinkConfiguration httpSink, Buffer data) {
7470
try {
7571
log.debug("Sending event to sink " + httpSink.getName());
76-
getHttpClient()
77-
.request(new RequestOptions()
78-
.setMethod(HttpMethod.POST)
79-
.setURI(httpSink.getEndpoint())
80-
.putHeader("ce-id", UUID.randomUUID().toString())
81-
.putHeader("ce-specversion", "1.0")
82-
.putHeader("ce-source", "apicurio-registry")
83-
.putHeader("ce-type", type)
84-
.putHeader("content-type", MediaType.APPLICATION_JSON),
85-
ar -> {
86-
if (ar.succeeded()) {
87-
ar.result()
88-
.exceptionHandler(ex -> log.error("Error sending event to " + httpSink.getEndpoint(), ex))
89-
.end(data);
90-
} else {
91-
log.error("Error sending event to " + httpSink.getEndpoint(), ar.cause());
92-
}
93-
});
72+
73+
final HttpRequest eventRequest = HttpRequest.newBuilder()
74+
.uri(URI.create(httpSink.getEndpoint()))
75+
.version(HttpClient.Version.HTTP_1_1)
76+
.header("ce-id", UUID.randomUUID().toString())
77+
.header("ce-specversion", "1.0")
78+
.header("ce-source", "apicurio-registry")
79+
.header("ce-type", type)
80+
.header("content-type", MediaType.APPLICATION_JSON)
81+
.POST(HttpRequest.BodyPublishers.ofByteArray(data.getBytes()))
82+
.build();
83+
84+
final HttpResponse<String> eventResponse = getHttpClient()
85+
.send(eventRequest, HttpResponse.BodyHandlers.ofString());
86+
87+
if (eventResponse.statusCode() != 200) {
88+
log.warn("Error sending http event: {}", eventResponse.body());
89+
}
90+
91+
9492
} catch (Exception e) {
9593
log.error("Error sending http event", e);
9694
}
9795
}
9896

9997
private synchronized HttpClient getHttpClient() {
10098
if (httpClient == null) {
101-
httpClient = vertx.createHttpClient(new HttpClientOptions()
102-
.setConnectTimeout(15 * 1000));
99+
httpClient = HttpClient.newBuilder()
100+
.build();
103101
}
104102
return httpClient;
105103
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package io.apicurio.registry.events;
22

3+
import io.quarkus.test.junit.QuarkusTestProfile;
4+
35
import java.util.Collections;
46
import java.util.Map;
57

6-
import io.quarkus.test.junit.QuarkusTestProfile;
7-
88
public class HttpEventsProfile implements QuarkusTestProfile {
99

1010
@Override
1111
public Map<String, String> getConfigOverrides() {
12-
return Collections.singletonMap("registry.events.sink.testsink", "http://localhost:8888/");
12+
return Collections.singletonMap("registry.events.sink.testsink", "http://localhost:8976/");
1313
}
1414

1515
}
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,75 @@
11
package io.apicurio.registry.events;
22

3-
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
4-
5-
import java.io.InputStream;
6-
import java.util.Arrays;
7-
import java.util.List;
8-
import java.util.concurrent.CopyOnWriteArrayList;
9-
import java.util.concurrent.TimeUnit;
10-
import java.util.concurrent.TimeoutException;
11-
12-
import org.junit.jupiter.api.Assertions;
13-
import org.junit.jupiter.api.Test;
14-
import org.junit.jupiter.api.Timeout;
15-
import org.junit.jupiter.api.Disabled;
16-
173
import io.apicurio.registry.AbstractResourceTestBase;
184
import io.apicurio.registry.events.dto.RegistryEventType;
195
import io.apicurio.registry.types.ArtifactType;
206
import io.apicurio.registry.utils.IoUtil;
7+
import io.apicurio.registry.utils.tests.ApicurioTestTags;
218
import io.apicurio.registry.utils.tests.TestUtils;
229
import io.quarkus.test.junit.QuarkusTest;
2310
import io.quarkus.test.junit.TestProfile;
2411
import io.vertx.core.Vertx;
2512
import io.vertx.core.http.HttpServer;
2613
import io.vertx.core.http.HttpServerOptions;
14+
import org.junit.jupiter.api.Assertions;
15+
import org.junit.jupiter.api.Tag;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.Timeout;
18+
19+
import java.io.InputStream;
20+
import java.time.Duration;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.TimeoutException;
27+
28+
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
2729

2830
@QuarkusTest
2931
@TestProfile(HttpEventsProfile.class)
30-
@Disabled
32+
@Tag(ApicurioTestTags.SLOW)
3133
public class HttpEventsTest extends AbstractResourceTestBase {
3234

3335
@Test
3436
@Timeout(value = 65, unit = TimeUnit.SECONDS)
3537
public void testHttpEvents() throws TimeoutException {
3638

37-
HttpServer server = null;
38-
try {
39-
List<String> events = new CopyOnWriteArrayList<>();
40-
server = Vertx.vertx().createHttpServer(new HttpServerOptions().setPort(8888))
41-
.requestHandler(req -> {
42-
events.add(req.headers().get("ce-type"));
43-
req.response().setStatusCode(200).end();
44-
}).listen(ar -> {
45-
if (ar.succeeded()) {
39+
CompletableFuture<HttpServer> serverFuture = new CompletableFuture<>();
40+
List<String> events = new CopyOnWriteArrayList<>();
4641

47-
InputStream jsonSchema = getClass().getResourceAsStream("/io/apicurio/registry/util/json-schema.json");
48-
Assertions.assertNotNull(jsonSchema);
49-
String content = IoUtil.toString(jsonSchema);
42+
HttpServer server = Vertx.vertx().createHttpServer(new HttpServerOptions()
43+
.setPort(8976))
44+
.requestHandler(req -> {
45+
events.add(req.headers().get("ce-type"));
46+
req.response().setStatusCode(200).end();
47+
})
48+
.listen(createdServer -> {
49+
if (createdServer.succeeded()) {
50+
serverFuture.complete(createdServer.result());
51+
} else {
52+
serverFuture.completeExceptionally(createdServer.cause());
53+
}
54+
});
5055

51-
String artifactId = TestUtils.generateArtifactId();
56+
TestUtils.waitFor("proxy is ready", Duration.ofSeconds(1).toMillis(), Duration.ofSeconds(30).toMillis(), serverFuture::isDone);
5257

53-
try {
54-
createArtifact(artifactId, ArtifactType.JSON, content);
55-
createArtifactVersion(artifactId, ArtifactType.JSON, content);
56-
} catch ( Exception e ) {
57-
Assertions.fail(e);
58-
}
58+
try {
59+
InputStream jsonSchema = getClass().getResourceAsStream("/io/apicurio/registry/util/json-schema.json");
60+
Assertions.assertNotNull(jsonSchema);
61+
String content = IoUtil.toString(jsonSchema);
5962

60-
} else {
61-
Assertions.fail(ar.cause());
62-
}
63-
});
63+
String artifactId = TestUtils.generateArtifactId();
6464

65-
TestUtils.waitFor("Events to be produced", 200, 60 * 1000, () -> {
66-
return events.size() == 2;
67-
});
65+
try {
66+
createArtifact(artifactId, ArtifactType.JSON, content);
67+
createArtifactVersion(artifactId, ArtifactType.JSON, content);
68+
} catch (Exception e) {
69+
Assertions.fail(e);
70+
}
71+
72+
TestUtils.waitFor("Events to be produced", 200, 60 * 1000, () -> events.size() == 2);
6873

6974
assertLinesMatch(
7075
Arrays.asList(RegistryEventType.ARTIFACT_CREATED.cloudEventType(), RegistryEventType.ARTIFACT_UPDATED.cloudEventType()),
@@ -74,7 +79,5 @@ public void testHttpEvents() throws TimeoutException {
7479
server.close();
7580
}
7681
}
77-
7882
}
79-
8083
}

0 commit comments

Comments
 (0)