Skip to content

Commit 5e031ab

Browse files
authored
Merge pull request #33385 Introduce a BoundedTrie metric.
Introduce a BoundedTrie metric which is used to efficiently store and aggregate a collection of string sequences (FQNs) with a limited size. It is recommended to review this PR by commits. BoundedTrie is a space-saving way to store many string sequences (like FQN/file paths). It acts like a tree with branches, holding sequences within a size limit. It can efficiently add, combine, and search and perform trimming of children when the size increases beyond defined max. Let's say we want to store these sequences, with a size limit of 3: "folder1/file1.txt" "folder1/file2.txt" "folder2/file3.txt" Here's how the BoundedTrie might look: root - folder1 - file1.txt - file2.txt - folder2 - file3.txt If we try to add "folder1/file4.txt", the trie might trim to "folder1", dropping all children to stay within the size limit. This will be used to replace the StringSet metric for lineage tracking for very large lineage graphs to overcome the size limits.
2 parents 3321224 + f68fe75 commit 5e031ab

File tree

39 files changed

+2921
-36
lines changed

39 files changed

+2921
-36
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core.metrics;
19+
20+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
21+
import java.util.Arrays;
22+
import java.util.Objects;
23+
import org.apache.beam.sdk.metrics.BoundedTrie;
24+
import org.apache.beam.sdk.metrics.MetricName;
25+
import org.checkerframework.checker.nullness.qual.Nullable;
26+
27+
/**
28+
* Tracks the current value for a {@link BoundedTrie} metric.
29+
*
30+
* <p>This class generally shouldn't be used directly. The only exception is within a runner where a
31+
* counter is being reported for a specific step (rather than the counter in the current context).
32+
* In that case retrieving the underlying cell and reporting directly to it avoids a step of
33+
* indirection.
34+
*/
35+
@SuppressFBWarnings(
36+
value = "IS2_INCONSISTENT_SYNC",
37+
justification = "Some access on purpose are left unsynchronized")
38+
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {
39+
40+
private final DirtyState dirty = new DirtyState();
41+
private BoundedTrieData value;
42+
private final MetricName name;
43+
44+
public BoundedTrieCell(MetricName name) {
45+
this.name = name;
46+
this.value = new BoundedTrieData();
47+
}
48+
49+
public synchronized void update(BoundedTrieData other) {
50+
// although BoundedTrieData is thread-safe the cell is made thread safe too because combine
51+
// returns a reference to the combined BoundedTrieData and want the reference update here to
52+
// be thread safe too.
53+
this.value = this.value.combine(other);
54+
dirty.afterModification();
55+
}
56+
57+
@Override
58+
public synchronized void reset() {
59+
value.clear();
60+
dirty.reset();
61+
}
62+
63+
@Override
64+
public DirtyState getDirty() {
65+
return dirty;
66+
}
67+
68+
/**
69+
* @return Returns a deep copy of the {@link BoundedTrieData} contained in this {@link
70+
* BoundedTrieCell}.
71+
*/
72+
@Override
73+
public synchronized BoundedTrieData getCumulative() {
74+
return value.getCumulative();
75+
}
76+
77+
@Override
78+
public MetricName getName() {
79+
return name;
80+
}
81+
82+
@Override
83+
public boolean equals(@Nullable Object object) {
84+
if (object instanceof BoundedTrieCell) {
85+
BoundedTrieCell boundedTrieCell = (BoundedTrieCell) object;
86+
return Objects.equals(dirty, boundedTrieCell.dirty)
87+
&& Objects.equals(value, boundedTrieCell.value)
88+
&& Objects.equals(name, boundedTrieCell.name);
89+
}
90+
return false;
91+
}
92+
93+
@Override
94+
public int hashCode() {
95+
return Objects.hash(dirty, value, name);
96+
}
97+
98+
@Override
99+
public synchronized void add(Iterable<String> values) {
100+
this.value.add(values);
101+
dirty.afterModification();
102+
}
103+
104+
@Override
105+
public synchronized void add(String... values) {
106+
add(Arrays.asList(values));
107+
}
108+
}

0 commit comments

Comments
 (0)