18
18
import com .alibaba .csp .sentinel .Entry ;
19
19
import com .alibaba .csp .sentinel .EntryType ;
20
20
import com .alibaba .csp .sentinel .SphU ;
21
- import com .alibaba .csp .sentinel .Tracer ;
22
- import com .alibaba .csp .sentinel .adapter .dubbo .config .DubboConfig ;
23
21
import com .alibaba .csp .sentinel .adapter .dubbo .fallback .DubboFallbackRegistry ;
24
22
import com .alibaba .csp .sentinel .log .RecordLog ;
25
23
import com .alibaba .csp .sentinel .slots .block .BlockException ;
26
24
import org .apache .dubbo .common .extension .Activate ;
27
- import org .apache .dubbo .rpc .Filter ;
25
+ import org .apache .dubbo .rpc .AsyncRpcResult ;
28
26
import org .apache .dubbo .rpc .Invocation ;
29
27
import org .apache .dubbo .rpc .Invoker ;
30
28
import org .apache .dubbo .rpc .Result ;
29
+ import org .apache .dubbo .rpc .RpcContext ;
31
30
import org .apache .dubbo .rpc .RpcException ;
31
+ import org .apache .dubbo .rpc .support .RpcUtils ;
32
32
33
33
/**
34
34
* <p>Dubbo service consumer filter for Sentinel. Auto activated by default.</p>
42
42
* @author Eric Zhao
43
43
*/
44
44
@ Activate (group = "consumer" )
45
- public class SentinelDubboConsumerFilter implements Filter {
45
+ public class SentinelDubboConsumerFilter extends BaseSentinelDubboFilter {
46
46
47
47
public SentinelDubboConsumerFilter () {
48
48
RecordLog .info ("Sentinel Apache Dubbo consumer filter initialized" );
@@ -52,32 +52,39 @@ public SentinelDubboConsumerFilter() {
52
52
public Result invoke (Invoker <?> invoker , Invocation invocation ) throws RpcException {
53
53
Entry interfaceEntry = null ;
54
54
Entry methodEntry = null ;
55
+ RpcContext rpcContext = RpcContext .getContext ();
55
56
try {
56
- String resourceName = DubboUtils .getResourceName (invoker , invocation , DubboConfig .getDubboConsumerPrefix ());
57
- interfaceEntry = SphU .entry (invoker .getInterface ().getName (), EntryType .OUT );
58
- methodEntry = SphU .entry (resourceName , EntryType .OUT );
59
-
57
+ boolean isAsync = RpcUtils .isAsync (invoker .getUrl (), invocation );
58
+ String resourceName = DubboUtils .getResourceName (invoker , invocation );
59
+ if (!isAsync ) {
60
+ interfaceEntry = SphU .entry (invoker .getUrl ().getEncodedServiceKey (), EntryType .OUT );
61
+ methodEntry = SphU .entry (resourceName , EntryType .OUT );
62
+ } else {
63
+ // should generate the AsyncEntry when the invoke model in future or async
64
+ interfaceEntry = SphU .asyncEntry (invoker .getUrl ().getEncodedServiceKey (), EntryType .OUT );
65
+ methodEntry = SphU .asyncEntry (resourceName , EntryType .OUT );
66
+ }
67
+ rpcContext .set (DubboUtils .DUBBO_INTERFACE_ENTRY_KEY , interfaceEntry );
68
+ rpcContext .set (DubboUtils .DUBBO_METHOD_ENTRY_KEY , methodEntry );
60
69
Result result = invoker .invoke (invocation );
61
- if (result .hasException ()) {
62
- Throwable e = result .getException ();
63
- // Record common exception.
64
- Tracer .traceEntry (e , interfaceEntry );
65
- Tracer .traceEntry (e , methodEntry );
70
+ if (result instanceof AsyncRpcResult ) {
71
+ // catch timeout or nonbiz-exception when in async model
72
+ AsyncRpcResult asyncRpcResult = (AsyncRpcResult ) result ;
73
+ asyncRpcResult .getValueFuture ().whenComplete ((rs ,ex ) -> {
74
+ if (ex != null ){
75
+ trace ((Throwable ) ex , invocation );
76
+ }
77
+ });
66
78
}
67
79
return result ;
68
80
} catch (BlockException e ) {
69
81
return DubboFallbackRegistry .getConsumerFallback ().handle (invoker , invocation , e );
70
82
} catch (RpcException e ) {
71
- Tracer . traceEntry ( e , interfaceEntry );
72
- Tracer . traceEntry (e , methodEntry );
83
+ // catch timeout or nonbiz-exception when in sync model
84
+ trace (e , invocation );
73
85
throw e ;
74
- } finally {
75
- if (methodEntry != null ) {
76
- methodEntry .exit ();
77
- }
78
- if (interfaceEntry != null ) {
79
- interfaceEntry .exit ();
80
- }
81
86
}
82
87
}
88
+
89
+
83
90
}
0 commit comments