Skip to content

Commit b227fdf

Browse files
authored
Introduce payload size metrics (#6745)
* Introduce payload size metrics * code cleanup * formatting
1 parent cc3b004 commit b227fdf

File tree

4 files changed

+762
-5
lines changed

4 files changed

+762
-5
lines changed

common/metrics/defs.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,6 +2106,7 @@ const (
21062106
PersistenceSampledCounter
21072107
PersistenceEmptyResponseCounter
21082108
PersistenceResponseRowSize
2109+
PersistenceResponsePayloadSize
21092110

21102111
PersistenceRequestsPerDomain
21112112
PersistenceRequestsPerShard
@@ -2812,6 +2813,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
28122813
PersistenceSampledCounter: {metricName: "persistence_sampled", metricType: Counter},
28132814
PersistenceEmptyResponseCounter: {metricName: "persistence_empty_response", metricType: Counter},
28142815
PersistenceResponseRowSize: {metricName: "persistence_response_row_size", metricType: Histogram, buckets: ResponseRowSizeBuckets},
2816+
PersistenceResponsePayloadSize: {metricName: "persistence_response_payload_size", metricType: Histogram, buckets: ResponsePayloadSizeBuckets},
28152817
PersistenceRequestsPerDomain: {metricName: "persistence_requests_per_domain", metricRollupName: "persistence_requests", metricType: Counter},
28162818
PersistenceRequestsPerShard: {metricName: "persistence_requests_per_shard", metricType: Counter},
28172819
PersistenceFailuresPerDomain: {metricName: "persistence_errors_per_domain", metricRollupName: "persistence_errors", metricType: Counter},
@@ -3535,6 +3537,12 @@ var ResponseRowSizeBuckets = append(
35353537
tally.MustMakeExponentialValueBuckets(1, 2, 17)..., // 1..65536
35363538
)
35373539

3540+
// ResponsePayloadSizeBuckets contains buckets for tracking the size of the payload returned per persistence operation
3541+
var ResponsePayloadSizeBuckets = append(
3542+
tally.ValueBuckets{0}, // need an explicit 0 or zero is reported as 1
3543+
tally.MustMakeExponentialValueBuckets(1024, 2, 20)..., // 1kB..1GB
3544+
)
3545+
35383546
// ErrorClass is an enum to help with classifying SLA vs. non-SLA errors (SLA = "service level agreement")
35393547
type ErrorClass uint8
35403548

common/persistence/metered.go

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
package persistence
2424

2525
import (
26+
"unsafe"
27+
2628
"github.com/uber/cadence/common/log/tag"
2729
"github.com/uber/cadence/common/metrics"
2830
)
@@ -71,6 +73,248 @@ func (r GetAllHistoryTreeBranchesResponse) Len() int {
7173
return len(r.Branches)
7274
}
7375

76+
// For responses that require metrics for payload size ByteSize() uint64 should be defined.
77+
78+
func (r *GetReplicationTasksResponse) ByteSize() uint64 {
79+
if r == nil {
80+
return 0
81+
}
82+
83+
size := uint64(int(unsafe.Sizeof(*r)) + len(r.NextPageToken))
84+
for _, v := range r.Tasks {
85+
size += v.ByteSize()
86+
}
87+
88+
return size
89+
}
90+
91+
func (r *ReplicationTaskInfo) ByteSize() uint64 {
92+
if r == nil {
93+
return 0
94+
}
95+
96+
return uint64(int(unsafe.Sizeof(*r)) + len(r.DomainID) + len(r.WorkflowID) + len(r.RunID) + len(r.BranchToken) + len(r.NewRunBranchToken))
97+
}
98+
99+
func (r *GetTimerIndexTasksResponse) ByteSize() uint64 {
100+
if r == nil {
101+
return 0
102+
}
103+
104+
size := uint64(int(unsafe.Sizeof(*r)) + len(r.NextPageToken))
105+
for _, v := range r.Timers {
106+
size += v.ByteSize()
107+
}
108+
109+
return size
110+
}
111+
112+
func (r *TimerTaskInfo) ByteSize() uint64 {
113+
if r == nil {
114+
return 0
115+
}
116+
117+
return uint64(int(unsafe.Sizeof(*r)) + len(r.DomainID) + len(r.WorkflowID) + len(r.RunID))
118+
}
119+
120+
func (r *GetTasksResponse) ByteSize() uint64 {
121+
if r == nil {
122+
return 0
123+
}
124+
125+
size := uint64(unsafe.Sizeof(*r))
126+
for _, v := range r.Tasks {
127+
size += v.ByteSize()
128+
}
129+
130+
return size
131+
}
132+
133+
func (r *TaskInfo) ByteSize() uint64 {
134+
if r == nil {
135+
return 0
136+
}
137+
138+
return uint64(int(unsafe.Sizeof(*r)) + len(r.DomainID) + len(r.WorkflowID) + len(r.RunID) + estimateStringMapSize(r.PartitionConfig))
139+
}
140+
141+
func (r *ListDomainsResponse) ByteSize() uint64 {
142+
if r == nil {
143+
return 0
144+
}
145+
146+
size := uint64(int(unsafe.Sizeof(*r)) + len(r.NextPageToken))
147+
for _, v := range r.Domains {
148+
size += v.ByteSize()
149+
}
150+
151+
return size
152+
}
153+
154+
func (r *GetDomainResponse) ByteSize() uint64 {
155+
if r == nil {
156+
return 0
157+
}
158+
159+
return uint64(unsafe.Sizeof(*r)) + r.Info.ByteSize() + r.Config.ByteSize() + r.ReplicationConfig.ByteSize()
160+
}
161+
162+
func (i *DomainInfo) ByteSize() uint64 {
163+
if i == nil {
164+
return 0
165+
}
166+
167+
return uint64(int(unsafe.Sizeof(*i)) + len(i.ID) + len(i.Name) + len(i.Description) + len(i.OwnerEmail) + estimateStringMapSize(i.Data))
168+
}
169+
170+
func (c *DomainConfig) ByteSize() uint64 {
171+
if c == nil {
172+
return 0
173+
}
174+
175+
size := int(unsafe.Sizeof(*c)) + len(c.HistoryArchivalURI) + len(c.VisibilityArchivalURI)
176+
177+
asyncWorkflowConfigSize := int(unsafe.Sizeof(c.AsyncWorkflowConfig)) + len(c.AsyncWorkflowConfig.PredefinedQueueName) + len(c.AsyncWorkflowConfig.QueueType)
178+
if c.AsyncWorkflowConfig.QueueConfig != nil {
179+
size += len(c.AsyncWorkflowConfig.QueueConfig.Data)
180+
}
181+
182+
binariesSize := 0
183+
for key, value := range c.BadBinaries.Binaries {
184+
binariesSize += len(key)
185+
if value != nil {
186+
binariesSize += len(value.Reason) + len(value.Operator)
187+
}
188+
}
189+
190+
isolationGroupsSize := 0
191+
for key, value := range c.IsolationGroups {
192+
binariesSize += len(key) + len(value.Name)
193+
}
194+
195+
return uint64(size + asyncWorkflowConfigSize + binariesSize + isolationGroupsSize)
196+
}
197+
198+
func (c *DomainReplicationConfig) ByteSize() uint64 {
199+
if c == nil {
200+
return 0
201+
}
202+
203+
total := len(c.ActiveClusterName)
204+
for _, v := range c.Clusters {
205+
if v == nil {
206+
continue
207+
}
208+
total += len(v.ClusterName)
209+
}
210+
return uint64(total)
211+
}
212+
213+
func estimateStringMapSize(m map[string]string) int {
214+
size := 0
215+
for key, value := range m {
216+
size += len(key) + len(value)
217+
}
218+
return size
219+
}
220+
221+
func (r *ReadRawHistoryBranchResponse) Size2() uint64 {
222+
if r == nil {
223+
return 0
224+
}
225+
226+
total := uint64(int(unsafe.Sizeof(*r)) + len(r.NextPageToken))
227+
for _, v := range r.HistoryEventBlobs {
228+
total += v.ByteSize()
229+
}
230+
return total
231+
}
232+
233+
func (d *DataBlob) ByteSize() uint64 {
234+
if d == nil {
235+
return 0
236+
}
237+
238+
return uint64(int(unsafe.Sizeof(*d)) + len(d.Data))
239+
}
240+
241+
func (r *ListCurrentExecutionsResponse) ByteSize() uint64 {
242+
if r == nil {
243+
return 0
244+
}
245+
246+
total := uint64(int(unsafe.Sizeof(*r)) + len(r.PageToken))
247+
for _, v := range r.Executions {
248+
total += v.ByteSize()
249+
}
250+
251+
return total
252+
}
253+
254+
func (r *CurrentWorkflowExecution) ByteSize() uint64 {
255+
if r == nil {
256+
return 0
257+
}
258+
259+
return uint64(int(unsafe.Sizeof(*r)) + len(r.DomainID) + len(r.WorkflowID) + len(r.RunID) + len(r.CurrentRunID))
260+
}
261+
262+
func (r *GetTransferTasksResponse) ByteSize() uint64 {
263+
if r == nil {
264+
return 0
265+
}
266+
267+
total := uint64(int(unsafe.Sizeof(*r)) + len(r.NextPageToken))
268+
for _, v := range r.Tasks {
269+
total += v.ByteSize()
270+
}
271+
272+
return total
273+
}
274+
275+
func (r *TransferTaskInfo) ByteSize() uint64 {
276+
if r == nil {
277+
return 0
278+
}
279+
280+
return uint64(int(unsafe.Sizeof(*r)) + len(r.DomainID) + len(r.WorkflowID) + len(r.RunID) +
281+
len(r.TargetDomainID) + len(r.TargetWorkflowID) + len(r.TargetRunID) + len(r.TaskList))
282+
}
283+
284+
func (r QueueMessageList) ByteSize() uint64 {
285+
if r == nil {
286+
return 0
287+
}
288+
289+
total := uint64(0)
290+
for _, v := range r {
291+
total += v.ByteSize()
292+
}
293+
294+
return total
295+
}
296+
297+
func (r *QueueMessage) ByteSize() uint64 {
298+
if r == nil {
299+
return 0
300+
}
301+
302+
return uint64(int(unsafe.Sizeof(*r)) + len(r.Payload))
303+
}
304+
305+
func (r GetAllHistoryTreeBranchesResponse) ByteSize() uint64 {
306+
total := uint64(int(unsafe.Sizeof(r)) + len(r.NextPageToken))
307+
for _, v := range r.Branches {
308+
total += v.ByteSize()
309+
}
310+
311+
return total
312+
}
313+
314+
func (r HistoryBranchDetail) ByteSize() uint64 {
315+
return uint64(int(unsafe.Sizeof(r)) + len(r.TreeID) + len(r.BranchID) + len(r.Info))
316+
}
317+
74318
// If MetricTags() []metrics.Tag is defined, then metrics will be emitted for the request.
75319

76320
func (r ReadHistoryBranchRequest) MetricTags() []metrics.Tag {

0 commit comments

Comments
 (0)