Skip to content

Add support for asynchronous invocation entry #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

/**
* The entry for asynchronous resources.
*
* @author Eric Zhao
* @since 0.2.0
*/
public class AsyncEntry extends CtEntry {

private Context asyncContext;

AsyncEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper, chain, context);
}

/**
* Remove current entry from local context, but does not exit.
*/
void cleanCurrentEntryInLocal() {
Context originalContext = context;
if (originalContext != null) {
Entry curEntry = originalContext.getCurEntry();
if (curEntry == this) {
Entry parent = this.parent;
originalContext.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
} else {
throw new IllegalStateException("Bad async context state");
}
}
}

public Context getAsyncContext() {
return asyncContext;
}

/**
* The async context should not be initialized until the node for current resource has been set to current entry.
*/
void initAsyncContext() {
if (asyncContext == null) {
this.asyncContext = Context.newAsyncContext(context.getEntranceNode(), context.getName())
.setOrigin(context.getOrigin())
.setCurEntry(this);
} else {
throw new IllegalStateException("Duplicate initialize of async context");
}
}

@Override
protected void clearEntryContext() {
super.clearEntryContext();
this.asyncContext = null;
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(asyncContext, count, args);

return parent;
}
}
103 changes: 103 additions & 0 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.csp.sentinel;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;

/**
* Linked entry within current context.
*
* @author jialiang.linjl
* @author Eric Zhao
*/
class CtEntry extends Entry {

protected Entry parent = null;
protected Entry child = null;

protected ProcessorSlot<Object> chain;
protected Context context;

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;

setUpEntryFor(context);
}

private void setUpEntryFor(Context context) {
this.parent = context.getCurEntry();
if (parent != null) {
((CtEntry)parent).child = this;
}
context.setCurEntry(this);
}

@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}

protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
if (context.getCurEntry() != this) {
// Clean previous call stack.
CtEntry e = (CtEntry)context.getCurEntry();
while (e != null) {
e.exit(count, args);
e = (CtEntry)e.parent;
}
throw new ErrorEntryFreeException("The order of entry free is can't be paired with the order of entry");
} else {
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// Restore the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
if (parent == null) {
// Auto-created entry indicates immediate exit.
ContextUtil.exit();
}
// Clean the reference of context in current entry to avoid duplicate exit.
clearEntryContext();
}
}
}

protected void clearEntryContext() {
this.context = null;
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
exitForContext(context, count, args);

return parent;
}

@Override
public Node getLastNode() {
return parent == null ? null : parent.getCurNode();
}
}
109 changes: 46 additions & 63 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/CtSph.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.context.NullContext;
import com.alibaba.csp.sentinel.node.Node;
import com.alibaba.csp.sentinel.slotchain.MethodResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlot;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotChain;
Expand Down Expand Up @@ -53,6 +52,46 @@ public class CtSph implements Sph {

private static final Object LOCK = new Object();

private AsyncEntry asyncEntryInternal(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// Init the entry only. No rule checking will occur.
return new AsyncEntry(resourceWrapper, null, context);
}
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}

// Global switch is turned off, so no rule checking will be done.
if (!Constants.ON) {
return new AsyncEntry(resourceWrapper, null, context);
}

ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

// Means processor cache size exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, so no rule checking will be done.
if (chain == null) {
return new AsyncEntry(resourceWrapper, null, context);
}

AsyncEntry asyncEntry = new AsyncEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, args);
// Initiate the async context.
asyncEntry.initAsyncContext();
} catch (BlockException e1) {
asyncEntry.exit(count, args);
throw e1;
} catch (Throwable e1) {
RecordLog.info("Sentinel unexpected exception", e1);
} finally {
// The asynchronous call may take time in background, and current context should not be hanged on it.
// So we need to remove current async entry from current context.
asyncEntry.cleanCurrentEntryInLocal();
}
return asyncEntry;
}

/**
* Do all {@link Rule}s checking about the resource.
*
Expand Down Expand Up @@ -145,68 +184,6 @@ private ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper)
return chain;
}

private static class CtEntry extends Entry {

protected Entry parent = null;
protected Entry child = null;
private ProcessorSlot<Object> chain;
private Context context;

CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;
parent = context.getCurEntry();
if (parent != null) {
((CtEntry)parent).child = this;
}
context.setCurEntry(this);
}

@Override
public void exit(int count, Object... args) throws ErrorEntryFreeException {
trueExit(count, args);
}

@Override
protected Entry trueExit(int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
if (context.getCurEntry() != this) {
// Clean previous call stack.
CtEntry e = (CtEntry)context.getCurEntry();
while (e != null) {
e.exit(count, args);
e = (CtEntry)e.parent;
}
throw new ErrorEntryFreeException(
"The order of entry free is can't be paired with the order of entry");
} else {
if (chain != null) {
chain.exit(context, resourceWrapper, count, args);
}
// Modify the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry)parent).child = null;
}
if (parent == null) {
// Auto-created entry indicates immediate exit.
ContextUtil.exit();
}
// Clean the reference of context in current entry to avoid duplicate exit.
context = null;
}
}
return parent;

}

@Override
public Node getLastNode() {
return parent == null ? null : parent.getCurNode();
}
}

/**
* This class is used for skip context name checking.
*/
Expand Down Expand Up @@ -275,4 +252,10 @@ public Entry entry(String name, EntryType type, int count, Object... args) throw
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}

@Override
public AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return asyncEntryInternal(resource, count, args);
}
}
19 changes: 16 additions & 3 deletions sentinel-core/src/main/java/com/alibaba/csp/sentinel/Sph.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* @author qinan.qn
* @author jialiang.linjl
* @author leyou
* @author Eric Zhao
*/
public interface Sph {

Expand Down Expand Up @@ -135,11 +136,23 @@ public interface Sph {
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable
* @param count the count that the resource requires
* @param args the parameters of the method. It can also be counted by setting
* hot parameter rule
* @return entry get.
* @param args the parameters of the method. It can also be counted by setting hot parameter rule
* @return entry get
* @throws BlockException if the block criteria is met
*/
Entry entry(String name, EntryType type, int count, Object... args) throws BlockException;

/**
* Create a protected asynchronous resource.
*
* @param name the unique name for the protected resource
* @param type the resource is an inbound or an outbound method. This is used
* to mark whether it can be blocked when the system is unstable
* @param count the count that the resource requires
* @param args the parameters of the method. It can also be counted by setting hot parameter rule
* @return created asynchronous entry
* @throws BlockException if the block criteria is met
* @since 0.2.0
*/
AsyncEntry asyncEntry(String name, EntryType type, int count, Object... args) throws BlockException;
}
Loading