Skip to content

Refactor flow algorithm for parameter flow control and support more traffic shaping mode #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,27 @@ public class ParamFlowQpsDemo {

private static final String RESOURCE_KEY = "resA";

public static void main(String[] args) {
initHotParamFlowRules();
public static void main(String[] args) throws Exception {
initParamFlowRules();

final int threadCount = 8;
final int threadCount = 20;
ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120);
runner.simulateTraffic();
runner.tick();

Thread.sleep(1000);
runner.simulateTraffic();
}

private static void initHotParamFlowRules() {
private static void initParamFlowRules() {
// QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0)
.setGrade(RuleConstant.FLOW_GRADE_QPS)
//.setDurationInSec(3)
//.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)
//.setMaxQueueingTimeMs(600)
.setCount(5);

// We can set threshold count for specific parameter value individually.
// Here we add an exception item. That means: QPS threshold of entries with parameter `PARAM_B` (type: int)
// in index 0 will be 10, rather than the global threshold (5).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ParamFlowQpsRunner<T> {
private final int threadCount;

private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>();
private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>();

private volatile boolean stop = false;

Expand All @@ -59,6 +60,7 @@ public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int
for (T param : params) {
assertTrue(param != null, "Parameters should not be null");
passCountMap.putIfAbsent(param, new AtomicLong());
blockCountMap.putIfAbsent(param, new AtomicLong());
}
}

Expand Down Expand Up @@ -94,20 +96,28 @@ void tick() {

private void passFor(T param) {
passCountMap.get(param).incrementAndGet();
// System.out.println(String.format("Parameter <%s> passed at: %d", param, TimeUtil.currentTimeMillis()));
}

private void blockFor(T param) {
blockCountMap.get(param).incrementAndGet();
}

final class RunTask implements Runnable {

@Override
public void run() {

while (!stop) {
Entry entry = null;
T param = generateParam();
try {
entry = SphU.entry(resourceName, EntryType.IN, 1, param);
// Add pass for parameter.
passFor(param);
} catch (BlockException e1) {
} catch (BlockException e) {
// block.incrementAndGet();
blockFor(param);
} catch (Exception ex) {
// biz exception
ex.printStackTrace();
Expand All @@ -118,15 +128,19 @@ public void run() {
}
}

try {
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(0, 10));
} catch (InterruptedException e) {
// ignore
}
sleep(ThreadLocalRandom.current().nextInt(0, 10));
}
}
}

private void sleep(int timeMs) {
try {
TimeUnit.MILLISECONDS.sleep(timeMs);
} catch (InterruptedException e) {
// ignore
}
}

final class TimerTask implements Runnable {
@Override
public void run() {
Expand All @@ -139,21 +153,19 @@ public void run() {
map.putIfAbsent(param, 0L);
}
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
sleep(1000);

// There may be a mismatch for time window of internal sliding window.
// See corresponding `metrics.log` for accurate statistic log.
for (T param : params) {
long globalPass = passCountMap.get(param).get();
long oldPass = map.get(param);
long oneSecondPass = globalPass - oldPass;
map.put(param, globalPass);
System.out.println(String.format("[%d][%d] Parameter flow metrics for resource %s: "
+ "pass count for param <%s> is %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param, oneSecondPass));

System.out.println(String.format(
"[%d][%d] Parameter flow metrics for resource %s: "
+ "pass count for param <%s> is %d, block count: %d",
seconds, TimeUtil.currentTimeMillis(), resourceName, param,
passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0)));
}
System.out.println("=============================");
if (seconds-- <= 0) {
stop = true;
}
Expand Down
10 changes: 10 additions & 0 deletions sentinel-extension/sentinel-parameter-flow-control/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,15 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "getParamFlowRules", desc = "get param flow rules")
@CommandMapping(name = "getParamFlowRules", desc = "Get all parameter flow rules")
public class GetParamFlowRulesCommandHandler implements CommandHandler<String> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* @author Eric Zhao
* @since 0.2.0
*/
@CommandMapping(name = "setParamFlowRules", desc = "set param flow rules, accept param: data={paramFlowRule Json}")
@CommandMapping(name = "setParamFlowRules", desc = "Set parameter flow rules, while previous rules will be replaced.")
public class ModifyParamFlowRulesCommandHandler implements CommandHandler<String> {

private static WritableDataSource<List<ParamFlowRule>> paramFlowWds = null;
Expand Down
Loading