Skip to content

Commit 901674a

Browse files
Add Eureka data-source extension (alibaba#1502)
1 parent 13de916 commit 901674a

File tree

7 files changed

+475
-0
lines changed

7 files changed

+475
-0
lines changed

sentinel-extension/pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<module>sentinel-datasource-spring-cloud-config</module>
2323
<module>sentinel-datasource-consul</module>
2424
<module>sentinel-datasource-etcd</module>
25+
<module>sentinel-datasource-eureka</module>
2526
<module>sentinel-annotation-cdi-interceptor</module>
2627
</modules>
2728

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Sentinel DataSource Eureka
2+
3+
Sentinel DataSource Eureka provides integration with [Eureka](https://github.com/Netflix/eureka) so that Eureka
4+
can be the dynamic rule data source of Sentinel.
5+
6+
To use Sentinel DataSource Eureka, you should add the following dependency:
7+
8+
```xml
9+
<dependency>
10+
<groupId>com.alibaba.csp</groupId>
11+
<artifactId>sentinel-datasource-eureka</artifactId>
12+
<version>x.y.z</version>
13+
</dependency>
14+
```
15+
16+
Then you can create an `EurekaDataSource` and register to rule managers.
17+
18+
SDK usage:
19+
20+
```java
21+
EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource("app-id", "instance-id",
22+
Arrays.asList("http://localhost:8761/eureka", "http://localhost:8762/eureka", "http://localhost:8763/eureka"),
23+
"rule-key", new Converter<String, List<FlowRule>>() {
24+
@Override
25+
public List<FlowRule> convert(String o) {
26+
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() {
27+
});
28+
}
29+
});
30+
FlowRuleManager.register2Property(eurekaDataSource.getProperty());
31+
```
32+
33+
Example for Spring Cloud Application:
34+
35+
```java
36+
@Bean
37+
public EurekaDataSource<List<FlowRule>> eurekaDataSource(EurekaInstanceConfig eurekaInstanceConfig, EurekaClientConfig eurekaClientConfig) {
38+
39+
List<String> serviceUrls = EndpointUtils.getServiceUrlsFromConfig(eurekaClientConfig,
40+
eurekaInstanceConfig.getMetadataMap().get("zone"), eurekaClientConfig.shouldPreferSameZoneEureka());
41+
42+
EurekaDataSource<List<FlowRule>> eurekaDataSource = new EurekaDataSource(eurekaInstanceConfig.getAppname(),
43+
eurekaInstanceConfig.getInstanceId(), serviceUrls, "flowrules", new Converter<String, List<FlowRule>>() {
44+
@Override
45+
public List<FlowRule> convert(String o) {
46+
return JSON.parseObject(o, new TypeReference<List<FlowRule>>() {
47+
});
48+
}
49+
});
50+
51+
FlowRuleManager.register2Property(eurekaDataSource.getProperty());
52+
return eurekaDataSource;
53+
}
54+
55+
```
56+
57+
To refresh the rule dynamically,you need to call [Eureka-REST-operations](https://github.com/Netflix/eureka/wiki/Eureka-REST-operations)
58+
to update instance metadata:
59+
60+
```
61+
PUT /eureka/apps/{appID}/{instanceID}/metadata?{ruleKey}={json of the rules}
62+
```
63+
64+
Note: don't forget to encode your json string in the url.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sentinel-extension</artifactId>
7+
<groupId>com.alibaba.csp</groupId>
8+
<version>1.8.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>sentinel-datasource-eureka</artifactId>
13+
14+
<properties>
15+
<spring.cloud.version>2.1.2.RELEASE</spring.cloud.version>
16+
</properties>
17+
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>com.alibaba.csp</groupId>
22+
<artifactId>sentinel-datasource-extension</artifactId>
23+
</dependency>
24+
25+
<dependency>
26+
<groupId>com.alibaba</groupId>
27+
<artifactId>fastjson</artifactId>
28+
</dependency>
29+
30+
<dependency>
31+
<groupId>junit</groupId>
32+
<artifactId>junit</artifactId>
33+
<scope>test</scope>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>org.awaitility</groupId>
38+
<artifactId>awaitility</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
42+
<dependency>
43+
<groupId>org.springframework.boot</groupId>
44+
<artifactId>spring-boot-starter-test</artifactId>
45+
<version>${spring.cloud.version}</version>
46+
<scope>test</scope>
47+
</dependency>
48+
49+
<dependency>
50+
<groupId>org.springframework.cloud</groupId>
51+
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
52+
<version>${spring.cloud.version}</version>
53+
<scope>test</scope>
54+
<exclusions>
55+
<exclusion>
56+
<groupId>com.google.code.gson</groupId>
57+
<artifactId>gson</artifactId>
58+
</exclusion>
59+
</exclusions>
60+
</dependency>
61+
62+
63+
</dependencies>
64+
65+
66+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright 1999-2018 Alibaba Group Holding Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.alibaba.csp.sentinel.datasource.eureka;
17+
18+
import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
19+
import com.alibaba.csp.sentinel.datasource.Converter;
20+
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
21+
import com.alibaba.csp.sentinel.log.RecordLog;
22+
import com.alibaba.csp.sentinel.util.AssertUtil;
23+
import com.alibaba.csp.sentinel.util.StringUtil;
24+
import com.alibaba.fastjson.JSON;
25+
26+
import java.io.*;
27+
import java.net.HttpURLConnection;
28+
import java.net.InetAddress;
29+
import java.net.URL;
30+
import java.util.ArrayList;
31+
import java.util.Collections;
32+
import java.util.List;
33+
34+
/**
35+
* <p>
36+
* A {@link ReadableDataSource} based on Eureka. This class will automatically
37+
* fetches the metadata of the instance every period.
38+
* </p>
39+
* <p>
40+
* Limitations: Default refresh interval is 10s. Because there is synchronization between eureka servers,
41+
* it may take longer to take effect.
42+
* </p>
43+
*
44+
* @author: liyang
45+
* @create: 2020-05-23 12:01
46+
*/
47+
public class EurekaDataSource<T> extends AutoRefreshDataSource<String, T> {
48+
49+
private static final long DEFAULT_REFRESH_MS = 10000;
50+
51+
/**
52+
* connect timeout: 3s
53+
*/
54+
private static final int DEFAULT_CONNECT_TIMEOUT_MS = 3000;
55+
56+
/**
57+
* read timeout: 30s
58+
*/
59+
private static final int DEFAULT_READ_TIMEOUT_MS = 30000;
60+
61+
62+
private int connectTimeoutMills;
63+
64+
65+
private int readTimeoutMills;
66+
67+
/**
68+
* eureka instance appid
69+
*/
70+
private String appId;
71+
/**
72+
* eureka instance id
73+
*/
74+
private String instanceId;
75+
76+
/**
77+
* collect of eureka server urls
78+
*/
79+
private List<String> serviceUrls;
80+
81+
/**
82+
* metadata key of the rule source
83+
*/
84+
private String ruleKey;
85+
86+
87+
public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey,
88+
Converter<String, T> configParser) {
89+
this(appId, instanceId, serviceUrls, ruleKey, configParser, DEFAULT_REFRESH_MS, DEFAULT_CONNECT_TIMEOUT_MS, DEFAULT_READ_TIMEOUT_MS);
90+
}
91+
92+
93+
public EurekaDataSource(String appId, String instanceId, List<String> serviceUrls, String ruleKey,
94+
Converter<String, T> configParser, long refreshMs, int connectTimeoutMills,
95+
int readTimeoutMills) {
96+
super(configParser, refreshMs);
97+
AssertUtil.notNull(appId, "appId can't be null");
98+
AssertUtil.notNull(instanceId, "instanceId can't be null");
99+
AssertUtil.assertNotEmpty(serviceUrls, "serviceUrls can't be empty");
100+
AssertUtil.notNull(ruleKey, "ruleKey can't be null");
101+
AssertUtil.assertState(connectTimeoutMills > 0, "connectTimeoutMills must be greater than 0");
102+
AssertUtil.assertState(readTimeoutMills > 0, "readTimeoutMills must be greater than 0");
103+
104+
this.appId = appId;
105+
this.instanceId = instanceId;
106+
this.serviceUrls = ensureEndWithSlash(serviceUrls);
107+
AssertUtil.assertNotEmpty(this.serviceUrls, "No available service url");
108+
this.ruleKey = ruleKey;
109+
this.connectTimeoutMills = connectTimeoutMills;
110+
this.readTimeoutMills = readTimeoutMills;
111+
}
112+
113+
114+
private List<String> ensureEndWithSlash(List<String> serviceUrls) {
115+
List<String> newServiceUrls = new ArrayList<>();
116+
for (String serviceUrl : serviceUrls) {
117+
if (StringUtil.isBlank(serviceUrl)) {
118+
continue;
119+
}
120+
if (!serviceUrl.endsWith("/")) {
121+
serviceUrl = serviceUrl + "/";
122+
}
123+
newServiceUrls.add(serviceUrl);
124+
}
125+
return newServiceUrls;
126+
}
127+
128+
@Override
129+
public String readSource() throws Exception {
130+
return fetchStringSourceFromEurekaMetadata(this.appId, this.instanceId, this.serviceUrls, ruleKey);
131+
}
132+
133+
134+
private String fetchStringSourceFromEurekaMetadata(String appId, String instanceId, List<String> serviceUrls,
135+
String ruleKey) throws Exception {
136+
List<String> shuffleUrls = new ArrayList<>(serviceUrls.size());
137+
shuffleUrls.addAll(serviceUrls);
138+
Collections.shuffle(shuffleUrls);
139+
for (int i = 0; i < shuffleUrls.size(); i++) {
140+
String serviceUrl = shuffleUrls.get(i) + String.format("apps/%s/%s", appId, instanceId);
141+
HttpURLConnection conn = null;
142+
try {
143+
conn = (HttpURLConnection) new URL(serviceUrl).openConnection();
144+
conn.addRequestProperty("Accept", "application/json;charset=utf-8");
145+
146+
conn.setConnectTimeout(connectTimeoutMills);
147+
conn.setReadTimeout(readTimeoutMills);
148+
conn.setRequestMethod("GET");
149+
conn.setDoOutput(true);
150+
conn.connect();
151+
RecordLog.debug("[EurekaDataSource] Request from eureka server: " + serviceUrl);
152+
if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
153+
String s = toString(conn.getInputStream());
154+
String ruleString = JSON.parseObject(s)
155+
.getJSONObject("instance")
156+
.getJSONObject("metadata")
157+
.getString(ruleKey);
158+
return ruleString;
159+
}
160+
RecordLog.warn("[EurekaDataSource] Warn: retrying on another server if available " +
161+
"due to response code: {}, response message: {}", conn.getResponseCode(), toString(conn.getErrorStream()));
162+
} catch (Exception e) {
163+
try {
164+
if (conn != null) {
165+
RecordLog.warn("[EurekaDataSource] Warn: failed to request " + conn.getURL() + " from "
166+
+ InetAddress.getByName(conn.getURL().getHost()).getHostAddress(), e);
167+
}
168+
} catch (Exception e1) {
169+
RecordLog.warn("[EurekaDataSource] Warn: failed to request ", e1);
170+
//ignore
171+
}
172+
RecordLog.warn("[EurekaDataSource] Warn: failed to request,retrying on another server if available");
173+
174+
} finally {
175+
if (conn != null) {
176+
conn.disconnect();
177+
}
178+
}
179+
}
180+
throw new EurekaMetadataFetchException("Can't get any data");
181+
}
182+
183+
184+
public static class EurekaMetadataFetchException extends Exception {
185+
186+
public EurekaMetadataFetchException(String message) {
187+
super(message);
188+
}
189+
}
190+
191+
192+
private String toString(InputStream input) throws IOException {
193+
if (input == null) {
194+
return null;
195+
}
196+
InputStreamReader inputStreamReader = new InputStreamReader(input, "utf-8");
197+
CharArrayWriter sw = new CharArrayWriter();
198+
copy(inputStreamReader, sw);
199+
return sw.toString();
200+
}
201+
202+
private long copy(Reader input, Writer output) throws IOException {
203+
char[] buffer = new char[1 << 12];
204+
long count = 0;
205+
for (int n = 0; (n = input.read(buffer)) >= 0; ) {
206+
output.write(buffer, 0, n);
207+
count += n;
208+
}
209+
return count;
210+
}
211+
212+
213+
}

0 commit comments

Comments
 (0)