Skip to content

Commit 1026b07

Browse files
authored
deprecate and relocate connector related endpoints in functions REST API (#4087)
* deprecate and relocate connector related endpoints in functions REST API * cleaning up * cleaning up
1 parent f81a711 commit 1026b07

File tree

11 files changed

+77
-15
lines changed

11 files changed

+77
-15
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,10 @@ public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
400400
@ApiResponse(code = 408, message = "Request timeout")
401401
})
402402
@Path("/connectors")
403+
@Deprecated
404+
/**
405+
* Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
406+
*/
403407
public List<ConnectorDefinition> getConnectorsList() throws IOException {
404408
return functions.getListOfConnectors();
405409
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Worker.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import lombok.extern.slf4j.Slf4j;
2525
import org.apache.pulsar.broker.admin.AdminResource;
2626
import org.apache.pulsar.common.functions.WorkerInfo;
27+
import org.apache.pulsar.common.io.ConnectorDefinition;
2728
import org.apache.pulsar.functions.worker.WorkerService;
2829
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
2930

3031
import javax.ws.rs.GET;
3132
import javax.ws.rs.Path;
3233
import javax.ws.rs.Produces;
3334
import javax.ws.rs.core.MediaType;
35+
import java.io.IOException;
3436
import java.util.Collection;
3537
import java.util.List;
3638
import java.util.Map;
@@ -96,4 +98,20 @@ public WorkerInfo getClusterLeader() {
9698
public Map<String, Collection<String>> getAssignments() {
9799
return worker.getAssignments();
98100
}
101+
102+
@GET
103+
@ApiOperation(
104+
value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
105+
response = List.class
106+
)
107+
@ApiResponses(value = {
108+
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
109+
@ApiResponse(code = 400, message = "Invalid request"),
110+
@ApiResponse(code = 408, message = "Request timeout")
111+
})
112+
@Path("/connectors")
113+
@Produces(MediaType.APPLICATION_JSON)
114+
public List<ConnectorDefinition> getConnectorsList() throws IOException {
115+
return worker.getListOfConnectors();
116+
}
99117
}

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,30 +379,39 @@ FunctionStats getFunctionStats(String tenant, String namespace, String function)
379379
void downloadFunction(String destinationFile, String path) throws PulsarAdminException;
380380

381381
/**
382+
* Deprecated in favor of getting sources and sinks for their own APIs
383+
*
382384
* Fetches a list of supported Pulsar IO connectors currently running in cluster mode
383385
*
384386
* @throws PulsarAdminException
385387
* Unexpected error
386388
*
387389
*/
390+
@Deprecated
388391
List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException;
389392

390393
/**
394+
* Deprecated in favor of getting sources and sinks for their own APIs
395+
*
391396
* Fetches a list of supported Pulsar IO sources currently running in cluster mode
392397
*
393398
* @throws PulsarAdminException
394399
* Unexpected error
395400
*
396401
*/
402+
@Deprecated
397403
Set<String> getSources() throws PulsarAdminException;
398404

399405
/**
406+
* Deprecated in favor of getting sources and sinks for their own APIs
407+
*
400408
* Fetches a list of supported Pulsar IO sinks currently running in cluster mode
401409
*
402410
* @throws PulsarAdminException
403411
* Unexpected error
404412
*
405413
*/
414+
@Deprecated
406415
Set<String> getSinks() throws PulsarAdminException;
407416

408417
/**

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/RestUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import lombok.AccessLevel;
2525
import lombok.NoArgsConstructor;
2626

27+
import javax.ws.rs.core.Response;
28+
2729
@NoArgsConstructor(access = AccessLevel.PRIVATE)
2830
public final class RestUtils {
2931

@@ -36,4 +38,8 @@ public static String createMessage(String message) {
3638
return createBaseMessage(message).toString();
3739
}
3840

41+
public static void throwUnavailableException() {
42+
throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
43+
"Function worker service is not done initializing. " + "Please try again in a little while.");
44+
}
3945
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@
114114
import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
115115
import static org.apache.pulsar.functions.utils.ComponentType.SINK;
116116
import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
117-
import static org.apache.pulsar.functions.utils.FunctionCommon.extractFileFromPkgURL;
118117
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
119118
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
120119
import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
120+
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
121121

122122
@Slf4j
123123
public abstract class ComponentImpl {
@@ -1675,11 +1675,6 @@ private void validateTriggerRequestParams(final String tenant,
16751675
}
16761676
}
16771677

1678-
protected static void throwUnavailableException() {
1679-
throw new RestException(Status.SERVICE_UNAVAILABLE,
1680-
"Function worker service is not done initializing. " + "Please try again in a little while.");
1681-
}
1682-
16831678
private void throwStateStoreUnvailableResponse() {
16841679
throw new RestException(Status.SERVICE_UNAVAILABLE,
16851680
"State storage client is not done initializing. " + "Please try again in a little while.");

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.List;
4242
import java.util.function.Supplier;
4343

44+
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
45+
4446
@Slf4j
4547
public class SinkImpl extends ComponentImpl {
4648

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.List;
4242
import java.util.function.Supplier;
4343

44+
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
45+
4446
@Slf4j
4547
public class SourceImpl extends ComponentImpl {
4648
private class GetSourceStatus extends GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.pulsar.common.functions.WorkerInfo;
23+
import org.apache.pulsar.common.io.ConnectorDefinition;
2324
import org.apache.pulsar.common.policies.data.ErrorData;
2425
import org.apache.pulsar.common.policies.data.FunctionStats;
2526
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
@@ -45,6 +46,7 @@
4546
import java.util.function.Supplier;
4647

4748
import static com.google.common.base.Preconditions.checkNotNull;
49+
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
4850

4951
@Slf4j
5052
public class WorkerImpl {
@@ -77,17 +79,16 @@ private boolean isWorkerServiceAvailable() {
7779

7880
public List<WorkerInfo> getCluster() {
7981
if (!isWorkerServiceAvailable()) {
80-
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
82+
throwUnavailableException();
8183
}
8284
List<WorkerInfo> workers = worker().getMembershipManager().getCurrentMembership();
8385
return workers;
8486
}
8587

8688
public WorkerInfo getClusterLeader() {
8789
if (!isWorkerServiceAvailable()) {
88-
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
90+
throwUnavailableException();
8991
}
90-
9192
MembershipManager membershipManager = worker().getMembershipManager();
9293
WorkerInfo leader = membershipManager.getLeader();
9394

@@ -99,9 +100,8 @@ public WorkerInfo getClusterLeader() {
99100
}
100101

101102
public Map<String, Collection<String>> getAssignments() {
102-
103103
if (!isWorkerServiceAvailable()) {
104-
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
104+
throwUnavailableException();
105105
}
106106

107107
FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -128,9 +128,7 @@ public List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics(final Strin
128128

129129
private List<org.apache.pulsar.common.stats.Metrics> getWorkerMetrics() {
130130
if (!isWorkerServiceAvailable()) {
131-
throw new WebApplicationException(
132-
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
133-
.entity(new ErrorData("Function worker service is not available")).build());
131+
throwUnavailableException();
134132
}
135133
return worker().getMetricsGenerator().generate();
136134
}
@@ -146,7 +144,7 @@ public List<WorkerFunctionInstanceStats> getFunctionsMetrics(String clientRole)
146144

147145
private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOException {
148146
if (!isWorkerServiceAvailable()) {
149-
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
147+
throwUnavailableException();
150148
}
151149

152150
WorkerService workerService = worker();
@@ -184,4 +182,12 @@ private List<WorkerFunctionInstanceStats> getFunctionsMetrics() throws IOExcepti
184182
}
185183
return metricsList;
186184
}
185+
186+
public List<ConnectorDefinition> getListOfConnectors() {
187+
if (!isWorkerServiceAvailable()) {
188+
throwUnavailableException();
189+
}
190+
191+
return this.worker().getConnectorsManager().getConnectors();
192+
}
187193
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,10 @@ public Response downloadFunction(final @QueryParam("path") String path) {
311311
@ApiResponse(code = 408, message = "Request timeout")
312312
})
313313
@Path("/connectors")
314+
/**
315+
* Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
316+
*/
317+
@Deprecated
314318
public List<ConnectorDefinition> getConnectorsList() throws IOException {
315319
return functions.getListOfConnectors();
316320
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/WorkerApiV2Resource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import lombok.extern.slf4j.Slf4j;
2626
import org.apache.pulsar.broker.web.AuthenticationFilter;
2727
import org.apache.pulsar.common.functions.WorkerInfo;
28+
import org.apache.pulsar.common.io.ConnectorDefinition;
2829
import org.apache.pulsar.functions.worker.WorkerService;
2930
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;
3031

@@ -36,6 +37,7 @@
3637
import javax.ws.rs.Produces;
3738
import javax.ws.rs.core.Context;
3839
import javax.ws.rs.core.MediaType;
40+
import java.io.IOException;
3941
import java.util.Collection;
4042
import java.util.List;
4143
import java.util.Map;
@@ -120,4 +122,14 @@ public WorkerInfo getClusterLeader() {
120122
public Map<String, Collection<String>> getAssignments() {
121123
return worker.getAssignments();
122124
}
125+
126+
@ApiResponses(value = {
127+
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
128+
@ApiResponse(code = 400, message = "Invalid request"),
129+
@ApiResponse(code = 408, message = "Request timeout")
130+
})
131+
@Path("/connectors")
132+
public List<ConnectorDefinition> getConnectorsList() throws IOException {
133+
return worker.getListOfConnectors();
134+
}
123135
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,10 @@ public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
306306

307307
@GET
308308
@Path("/connectors")
309+
/**
310+
* Deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
311+
*/
312+
@Deprecated
309313
public List<ConnectorDefinition> getConnectorsList() throws IOException {
310314
return functions.getListOfConnectors();
311315
}

0 commit comments

Comments
 (0)