1
1
package com .twitter .finagle .filter
2
2
3
3
import com .twitter .finagle .FailureFlags
4
- import com .twitter .finagle .stats . StatsReceiver
5
- import com .twitter .finagle .param
4
+ import com .twitter .finagle .Filter
5
+ import com .twitter .finagle .Filter . TypeAgnostic
6
6
import com .twitter .finagle .Service
7
7
import com .twitter .finagle .ServiceFactory
8
8
import com .twitter .finagle .SimpleFilter
@@ -11,6 +11,7 @@ import com.twitter.finagle.Stackable
11
11
import com .twitter .finagle .service .ReqRep
12
12
import com .twitter .finagle .service .ResponseClass
13
13
import com .twitter .finagle .service .ResponseClassifier
14
+ import com .twitter .finagle .stats .StatsReceiver
14
15
import com .twitter .util .Duration
15
16
import com .twitter .util .Future
16
17
import com .twitter .util .Stopwatch
@@ -26,7 +27,7 @@ private[twitter] object SLOStatsFilter {
26
27
27
28
object Param {
28
29
case class Configured (
29
- latency : Duration )
30
+ requestToSLODefinition : PartialFunction [ Any , SLODefinition ] )
30
31
extends Param
31
32
32
33
case object Disabled extends Param
@@ -36,71 +37,103 @@ private[twitter] object SLOStatsFilter {
36
37
37
38
val Disabled : Param = Param .Disabled
38
39
39
- def configured (latency : Duration ): Param = {
40
- Param .Configured (latency)
40
+ def configured (
41
+ requestToSLODefinition : PartialFunction [Any , SLODefinition ],
42
+ ): Param = {
43
+ Param .Configured (requestToSLODefinition)
44
+ }
45
+
46
+ def configured (SLODefinition : SLODefinition ): Param = {
47
+ Param .Configured ({
48
+ case _ => SLODefinition
49
+ })
50
+ }
51
+
52
+ def typeAgnostic (
53
+ statsReceiver : StatsReceiver ,
54
+ requestToSLODefinition : PartialFunction [Any , SLODefinition ],
55
+ responseClassifier : ResponseClassifier ,
56
+ nowNanos : () => Long = Stopwatch .systemNanos
57
+ ): TypeAgnostic = new TypeAgnostic {
58
+ def toFilter [Req , Rep ]: Filter [Req , Rep , Req , Rep ] =
59
+ new SLOStatsFilter [Req , Rep ](
60
+ requestToSLODefinition,
61
+ responseClassifier,
62
+ statsReceiver,
63
+ nowNanos)
41
64
}
42
65
43
66
def module [Req , Rep ]: Stackable [ServiceFactory [Req , Rep ]] =
44
- new Stack .Module3 [param.Stats , param.ResponseClassifier , Param , ServiceFactory [Req , Rep ]] {
67
+ new Stack .Module3 [
68
+ com.twitter.finagle.param.Stats ,
69
+ com.twitter.finagle.param.ResponseClassifier ,
70
+ Param ,
71
+ ServiceFactory [Req , Rep ]
72
+ ] {
45
73
val role = SLOStatsFilter .role
46
74
val description =
47
75
" Record number of SLO violations of underlying service"
48
76
override def make (
49
- _stats : param.Stats ,
50
- _responseClassifier : param.ResponseClassifier ,
77
+ _stats : com.twitter.finagle. param.Stats ,
78
+ _responseClassifier : com.twitter.finagle. param.ResponseClassifier ,
51
79
params : Param ,
52
80
next : ServiceFactory [Req , Rep ]
53
81
): ServiceFactory [Req , Rep ] = {
54
82
params match {
55
83
case Param .Disabled => next
56
- case Param .Configured (latency) =>
57
- val param .Stats (statsReceiver) = _stats
58
- val param .ResponseClassifier (responseClassifier) = _responseClassifier
84
+ case Param .Configured (requestToSLODefinition) =>
85
+ val com .twitter.finagle.param.Stats (statsReceiver) = _stats
86
+ val com .twitter.finagle.param.ResponseClassifier (responseClassifier) =
87
+ _responseClassifier
88
+
59
89
new SLOStatsFilter (
60
- statsReceiver.scope( " slo " ) ,
61
- latency.inNanoseconds ,
62
- responseClassifier ).andThen(next)
90
+ requestToSLODefinition ,
91
+ responseClassifier ,
92
+ statsReceiver.scope( " slo " ) ).andThen(next)
63
93
}
64
94
}
65
95
}
66
96
}
67
97
98
+ case class SLODefinition (scope : String , latency : Duration )
99
+
68
100
/**
69
- * A [[com.twitter.finagle.Filter ]] that records the number of slo violations from the underlying
70
- * service. A request is classified as violating the slo if any of the following occur:
101
+ * A [[com.twitter.finagle.Filter ]] that records the number of slo violations (as determined from
102
+ * `requestToSLODefinition`) from the underlying service. A request is classified as violating the
103
+ * slo if any of the following occur:
71
104
* - The response returns after `latency` duration has elapsed
72
105
* - The response is classified as a failure according to the ResponseClassifier (but is not
73
106
* ignorable or interrupted)
74
107
*/
75
- private [finagle] class SLOStatsFilter [Req , Rep ](
76
- statsReceiver : StatsReceiver ,
77
- latencyNanos : Long ,
108
+ class SLOStatsFilter [Req , Rep ](
109
+ requestToSLODefinition : PartialFunction [Any , SLODefinition ],
78
110
responseClassifier : ResponseClassifier ,
111
+ statsReceiver : StatsReceiver ,
79
112
nowNanos : () => Long = Stopwatch .systemNanos)
80
113
extends SimpleFilter [Req , Rep ] {
81
114
82
- private [this ] val violationsScope = statsReceiver.scope(" violations" )
83
- private [this ] val violationsTotalCounter = violationsScope.counter(" total" )
84
- private [this ] val violationsFailuresCounter = violationsScope.counter(" failures" )
85
- private [this ] val violationsLatencyCounter = violationsScope.counter(" latency" )
86
-
87
115
def apply (request : Req , service : Service [Req , Rep ]): Future [Rep ] = {
88
116
val start = nowNanos()
89
117
service(request).respond { response =>
90
118
if (! isIgnorable(response)) {
91
- var violated = false
92
- if (nowNanos() - start > latencyNanos) {
93
- violated = true
94
- violationsLatencyCounter.incr()
95
- }
96
-
97
- if (isFailure(request, response)) {
98
- violated = true
99
- violationsFailuresCounter.incr()
100
- }
101
-
102
- if (violated) {
103
- violationsTotalCounter.incr()
119
+ if (requestToSLODefinition.isDefinedAt(request)) {
120
+ val sloDefinition = requestToSLODefinition(request)
121
+ var violated = false
122
+ if (nowNanos() - start > sloDefinition.latency.inNanoseconds) {
123
+ violated = true
124
+ statsReceiver.counter(sloDefinition.scope, " violations" , " latency" ).incr()
125
+ }
126
+
127
+ if (isFailure(request, response)) {
128
+ violated = true
129
+ statsReceiver.counter(sloDefinition.scope, " violations" , " failures" ).incr()
130
+ }
131
+
132
+ if (violated) {
133
+ statsReceiver.counter(sloDefinition.scope, " violations" , " total" ).incr()
134
+ }
135
+
136
+ statsReceiver.counter(sloDefinition.scope, " total" ).incr()
104
137
}
105
138
}
106
139
}
0 commit comments