Skip to content

Commit dd9bf96

Browse files
committed
Added new implementation that makes the distributor accept multiple HA pairs (cluster, replica) in the same request/batch. This can be enabled with a new flag, accept_mixed_ha_samples, and will take effect only if accept_ha_samples is set to true.
Fixed test by reducing the number of ingesters to 2 and replication factor to 2. Added config reference. Do not remove replica label if cluster label is not present. Added more HA mixed replicas tests with no cluster and replica labels and with cluster label only. Signed-off-by: eduardscaueru <[email protected]>
1 parent e070ec6 commit dd9bf96

File tree

5 files changed

+241
-3
lines changed

5 files changed

+241
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
1213
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1314
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1415
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3163,6 +3163,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
31633163
# CLI flag: -distributor.ha-tracker.enable-for-all-users
31643164
[accept_ha_samples: <boolean> | default = false]
31653165
3166+
# Flag to enable handling of samples with mixed external labels identifying
3167+
# replicas in an HA Prometheus setup. Supported only if
3168+
# -distributor.ha-tracker.enable-for-all-users is true.
3169+
# CLI flag: -distributor.ha-tracker.mixed-ha-samples
3170+
[accept_mixed_ha_samples: <boolean> | default = false]
3171+
31663172
# Prometheus label to look for in samples to identify a Prometheus HA cluster.
31673173
# CLI flag: -distributor.ha-tracker.cluster
31683174
[ha_cluster_label: <string> | default = "cluster"]

pkg/distributor/distributor.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
652652
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
653653
limits := d.limits.GetOverridesForUser(userID)
654654

655-
if limits.AcceptHASamples && len(req.Timeseries) > 0 {
655+
if limits.AcceptHASamples && len(req.Timeseries) > 0 && !limits.AcceptMixedHASamples {
656656
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels)
657657
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
658658
if err != nil {
@@ -676,6 +676,8 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
676676
if !removeReplica { // False, Nil
677677
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
678678
}
679+
} else if limits.AcceptHASamples && len(req.Timeseries) > 0 && limits.AcceptMixedHASamples {
680+
removeReplica = true
679681
}
680682

681683
// A WriteRequest can only contain series or metadata but not both. This might change in the future.
@@ -859,6 +861,19 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
859861
// check each sample and discard if outside limits.
860862
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
861863
for _, ts := range req.Timeseries {
864+
removeReplicaTs := removeReplica
865+
if limits.AcceptHASamples && limits.AcceptMixedHASamples {
866+
validCode, cluster := d.isValidHAPair(ts.Labels, ctx, userID, limits)
867+
if validCode == 1 {
868+
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(len(ts.Samples) + len(ts.Histograms)))
869+
continue
870+
} else if validCode == 2 {
871+
removeReplicaTs = false
872+
d.nonHASamples.WithLabelValues(userID).Add(float64(len(ts.Samples) + len(ts.Histograms)))
873+
} else if validCode == 0 {
874+
removeReplicaTs = true
875+
}
876+
}
862877
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
863878
if len(ts.Samples) > 0 {
864879
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
@@ -889,7 +904,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
889904
// If we found both the cluster and replica labels, we only want to include the cluster label when
890905
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
891906
// series we're trying to dedupe when HA tracking moves over to a different replica.
892-
if removeReplica {
907+
if removeReplicaTs {
893908
removeLabel(limits.HAReplicaLabel, &ts.Labels)
894909
}
895910

@@ -1467,6 +1482,19 @@ func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdap
14671482
return cluster, replica
14681483
}
14691484

1485+
func (d *Distributor) isValidHAPair(labels []cortexpb.LabelAdapter, ctx context.Context, userID string, limits *validation.Limits) (int, string) {
1486+
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, labels)
1487+
1488+
if cluster != "" && replica != "" {
1489+
_, err := d.checkSample(ctx, userID, cluster, replica, limits)
1490+
if err != nil {
1491+
return 1, cluster // discard sample
1492+
}
1493+
return 0, cluster // valid HA sample
1494+
}
1495+
return 2, cluster // non HA sample
1496+
}
1497+
14701498
func getLimitFromLabelHints(hints *storage.LabelHints) int {
14711499
if hints != nil {
14721500
return hints.Limit

pkg/distributor/distributor_test.go

Lines changed: 197 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,102 @@ func TestDistributor_PushHAInstances(t *testing.T) {
10311031
}
10321032
}
10331033

1034+
func TestDistributor_PushMixedHAInstances(t *testing.T) {
1035+
t.Parallel()
1036+
ctx := user.InjectOrgID(context.Background(), "user")
1037+
1038+
for i, tc := range []struct {
1039+
enableTracker bool
1040+
acceptMixedHASamples bool
1041+
samples int
1042+
expectedResponse *cortexpb.WriteResponse
1043+
expectedCode int32
1044+
}{
1045+
{
1046+
enableTracker: true,
1047+
acceptMixedHASamples: true,
1048+
samples: 5,
1049+
expectedResponse: emptyResponse,
1050+
expectedCode: 202,
1051+
},
1052+
} {
1053+
for _, shardByAllLabels := range []bool{true} {
1054+
tc := tc
1055+
shardByAllLabels := shardByAllLabels
1056+
for _, enableHistogram := range []bool{false} {
1057+
enableHistogram := enableHistogram
1058+
t.Run(fmt.Sprintf("[%d](shardByAllLabels=%v, histogram=%v)", i, shardByAllLabels, enableHistogram), func(t *testing.T) {
1059+
t.Parallel()
1060+
var limits validation.Limits
1061+
flagext.DefaultValues(&limits)
1062+
limits.AcceptHASamples = true
1063+
limits.AcceptMixedHASamples = tc.acceptMixedHASamples
1064+
limits.MaxLabelValueLength = 25
1065+
1066+
ds, ingesters, _, _ := prepare(t, prepConfig{
1067+
numIngesters: 2,
1068+
happyIngesters: 2,
1069+
numDistributors: 1,
1070+
replicationFactor: 2,
1071+
shardByAllLabels: shardByAllLabels,
1072+
limits: &limits,
1073+
enableTracker: tc.enableTracker,
1074+
})
1075+
1076+
d := ds[0]
1077+
1078+
request := makeWriteRequestHAMixedSamples(tc.samples, enableHistogram)
1079+
response, _ := d.Push(ctx, request)
1080+
assert.Equal(t, tc.expectedResponse, response)
1081+
1082+
for i := range ingesters {
1083+
timeseries := ingesters[i].series()
1084+
assert.Equal(t, 5, len(timeseries))
1085+
clusters := make(map[string]int)
1086+
replicas := make(map[string]int)
1087+
for _, v := range timeseries {
1088+
replicaLabel := ""
1089+
clusterLabel := ""
1090+
for _, label := range v.Labels {
1091+
if label.Name == "__replica__" {
1092+
replicaLabel = label.Value
1093+
_, ok := replicas[label.Value]
1094+
if !ok {
1095+
replicas[label.Value] = 1
1096+
} else {
1097+
assert.Fail(t, fmt.Sprintf("Two timeseries with same replica label, %s, were found, but only one should be present", label.Value))
1098+
}
1099+
}
1100+
if label.Name == "cluster" {
1101+
clusterLabel = label.Value
1102+
_, ok := clusters[label.Value]
1103+
if !ok {
1104+
clusters[label.Value] = 1
1105+
} else {
1106+
assert.Fail(t, fmt.Sprintf("Two timeseries with same cluster label, %s, were found, but only one should be present", label.Value))
1107+
}
1108+
}
1109+
}
1110+
if clusterLabel == "" && replicaLabel != "" {
1111+
assert.Equal(t, "replicaNoCluster", replicaLabel)
1112+
}
1113+
assert.Equal(t, tc.samples, len(v.Samples))
1114+
}
1115+
assert.Equal(t, 3, len(clusters))
1116+
for _, nr := range clusters {
1117+
assert.Equal(t, true, nr == 1)
1118+
}
1119+
assert.Equal(t, 1, len(replicas))
1120+
for _, nr := range clusters {
1121+
assert.Equal(t, true, nr == 1)
1122+
}
1123+
}
1124+
})
1125+
}
1126+
}
1127+
}
1128+
}
1129+
10341130
func TestDistributor_PushQuery(t *testing.T) {
10351131
t.Parallel()
10361132
const shuffleShardSize = 5
@@ -2831,7 +2927,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
28312927
EnableHATracker: true,
28322928
KVStore: kv.Config{Mock: mock},
28332929
UpdateTimeout: 100 * time.Millisecond,
2834-
FailoverTimeout: time.Second,
2930+
FailoverTimeout: time.Hour,
28352931
}
28362932
cfg.limits.HAMaxClusters = 100
28372933
}
@@ -2950,6 +3046,106 @@ func makeWriteRequestHA(samples int, replica, cluster string, histogram bool) *c
29503046
return request
29513047
}
29523048

3049+
func makeWriteRequestHAMixedSamples(samples int, histogram bool) *cortexpb.WriteRequest {
3050+
request := &cortexpb.WriteRequest{}
3051+
3052+
for _, haPair := range []struct {
3053+
cluster string
3054+
replica string
3055+
}{
3056+
{
3057+
cluster: "cluster0",
3058+
replica: "replica0",
3059+
},
3060+
{
3061+
cluster: "cluster0",
3062+
replica: "replica1",
3063+
},
3064+
{
3065+
cluster: "cluster1",
3066+
replica: "replica0",
3067+
},
3068+
{
3069+
cluster: "cluster1",
3070+
replica: "replica1",
3071+
},
3072+
{
3073+
cluster: "",
3074+
replica: "replicaNoCluster",
3075+
},
3076+
{
3077+
cluster: "clusterNoReplica",
3078+
replica: "",
3079+
},
3080+
{
3081+
cluster: "",
3082+
replica: "",
3083+
},
3084+
} {
3085+
cluster := haPair.cluster
3086+
replica := haPair.replica
3087+
var ts cortexpb.PreallocTimeseries
3088+
if cluster == "" && replica == "" {
3089+
ts = cortexpb.PreallocTimeseries{
3090+
TimeSeries: &cortexpb.TimeSeries{
3091+
Labels: []cortexpb.LabelAdapter{
3092+
{Name: "__name__", Value: "foo"},
3093+
{Name: "bar", Value: "baz"},
3094+
},
3095+
},
3096+
}
3097+
} else if cluster == "" && replica != "" {
3098+
ts = cortexpb.PreallocTimeseries{
3099+
TimeSeries: &cortexpb.TimeSeries{
3100+
Labels: []cortexpb.LabelAdapter{
3101+
{Name: "__name__", Value: "foo"},
3102+
{Name: "__replica__", Value: replica},
3103+
{Name: "bar", Value: "baz"},
3104+
},
3105+
},
3106+
}
3107+
} else if cluster != "" && replica == "" {
3108+
ts = cortexpb.PreallocTimeseries{
3109+
TimeSeries: &cortexpb.TimeSeries{
3110+
Labels: []cortexpb.LabelAdapter{
3111+
{Name: "__name__", Value: "foo"},
3112+
{Name: "bar", Value: "baz"},
3113+
{Name: "cluster", Value: cluster},
3114+
},
3115+
},
3116+
}
3117+
} else {
3118+
ts = cortexpb.PreallocTimeseries{
3119+
TimeSeries: &cortexpb.TimeSeries{
3120+
Labels: []cortexpb.LabelAdapter{
3121+
{Name: "__name__", Value: "foo"},
3122+
{Name: "__replica__", Value: replica},
3123+
{Name: "bar", Value: "baz"},
3124+
{Name: "cluster", Value: cluster},
3125+
},
3126+
},
3127+
}
3128+
}
3129+
if histogram {
3130+
ts.Histograms = []cortexpb.Histogram{
3131+
cortexpb.HistogramToHistogramProto(int64(samples), tsdbutil.GenerateTestHistogram(samples)),
3132+
}
3133+
} else {
3134+
var s = make([]cortexpb.Sample, 0)
3135+
for i := 0; i < samples; i++ {
3136+
sample := cortexpb.Sample{
3137+
Value: float64(i),
3138+
TimestampMs: int64(i),
3139+
}
3140+
s = append(s, sample)
3141+
}
3142+
ts.Samples = s
3143+
}
3144+
request.Timeseries = append(request.Timeseries, ts)
3145+
}
3146+
return request
3147+
}
3148+
29533149
func makeWriteRequestExemplar(seriesLabels []string, timestamp int64, exemplarLabels []string) *cortexpb.WriteRequest {
29543150
return &cortexpb.WriteRequest{
29553151
Timeseries: []cortexpb.PreallocTimeseries{

pkg/util/validation/limits.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type Limits struct {
119119
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
120120
IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"`
121121
AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"`
122+
AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"`
122123
HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"`
123124
HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"`
124125
HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"`
@@ -220,6 +221,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
220221
f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
221222
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
222223
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
224+
f.BoolVar(&l.AcceptMixedHASamples, "distributor.ha-tracker.mixed-ha-samples", false, "Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.")
223225
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
224226
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
225227
f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.")
@@ -547,6 +549,11 @@ func (o *Overrides) AcceptHASamples(userID string) bool {
547549
return o.GetOverridesForUser(userID).AcceptHASamples
548550
}
549551

552+
// AcceptMixedHASamples returns whether the distributor should track and accept mixed samples from HA replicas for this user.
553+
func (o *Overrides) AcceptMixedHASamples(userID string) bool {
554+
return o.GetOverridesForUser(userID).AcceptMixedHASamples
555+
}
556+
550557
// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
551558
func (o *Overrides) HAClusterLabel(userID string) string {
552559
return o.GetOverridesForUser(userID).HAClusterLabel

0 commit comments

Comments
 (0)