
Commit 9aa910f

Fix default memberlist configuration value for RetransmitMult. (#4269)
* Fix default memberlist configuration value for RetransmitMult.

  If no configuration is explicitly given for RetransmitMult (via `-memberlist.retransmit-factor`), it is intended to be picked up from `DefaultLANConfig`. However, although the correct value was being used to configure `memberlist` itself, zero was passed into the `TransmitLimitedQueue` used for broadcasting ring updates. This effectively meant that ring updates were only ever gossiped once.

* Add simplified integration test case.

* Changelog.

* Review comments.

* Fix race condition in unit test.

  The test was shutting down the KV store and then attempting to read from it. This would sometimes pass when the KV store took a while to shut down, as it often does, but when it shut down quickly, the read would fail.

Signed-off-by: Steve Simpson <[email protected]>
1 parent: cae36dc
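For context on why this matters: hashicorp/memberlist budgets each broadcast for roughly `RetransmitMult * ceil(log10(n+1))` transmissions in an n-node cluster. The Go sketch below reproduces that scaling to show the effect of the unset value; the `retransmitLimit` helper mirrors memberlist's internal formula, while the surrounding harness is purely illustrative and not Cortex code.

```go
package main

import (
	"fmt"
	"math"
)

// retransmitLimit mirrors the scaling hashicorp/memberlist applies to its
// TransmitLimitedQueue: a broadcast may be retransmitted up to
// RetransmitMult * ceil(log10(n+1)) times in an n-node cluster.
func retransmitLimit(retransmitMult, n int) int {
	nodeScale := int(math.Ceil(math.Log10(float64(n + 1))))
	return retransmitMult * nodeScale
}

func main() {
	const nodes = 20 // same size the new integration test scales up to
	for _, mult := range []int{0, 4} {
		// 0 is the value the bug passed in; 4 is DefaultLANConfig's default.
		fmt.Printf("RetransmitMult=%d -> limit=%d\n", mult, retransmitLimit(mult, nodes))
	}
	// A limit of 0 means TransmitLimitedQueue drops each broadcast after its
	// first transmission, so ring updates were only ever gossiped once.
}
```

With the default of 4 across 20 nodes the limit works out to 8 retransmissions per update, which is why propagation recovers once the default is actually wired through.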

File tree (4 files changed, +94 −14 lines):

* CHANGELOG.md
* integration/integration_memberlist_single_binary_test.go
* pkg/ring/kv/memberlist/memberlist_client.go
* pkg/ring/kv/memberlist/memberlist_client_test.go

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -49,6 +49,7 @@
 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
+* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
 
 ## Blocksconvert
 
```

integration/integration_memberlist_single_binary_test.go

Lines changed: 89 additions & 10 deletions
```diff
@@ -5,12 +5,14 @@ package integration
 import (
 	"crypto/x509"
 	"crypto/x509/pkix"
+	"fmt"
 	"os"
 	"path/filepath"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/cortexproject/cortex/integration/ca"
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -109,16 +111,16 @@ func testSingleBinaryEnv(t *testing.T, tlsEnabled bool) {
 
 func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
 	flags := map[string]string{
-		"-ingester.final-sleep":          "0s",
-		"-ingester.join-after":           "0s", // join quickly
-		"-ingester.min-ready-duration":   "0s",
-		"-ingester.concurrent-flushes":   "10",
-		"-ingester.max-transfer-retries": "0", // disable
-		"-ingester.num-tokens":           "512",
-		"-ingester.observe-period":       "5s", // to avoid conflicts in tokens
-		"-ring.store":                    "memberlist",
-		"-memberlist.bind-port":          "8000",
-		"-memberlist.pullpush-interval":  "3s", // speed up state convergence to make test faster and avoid flakiness
+		"-ingester.final-sleep":              "0s",
+		"-ingester.join-after":               "0s", // join quickly
+		"-ingester.min-ready-duration":       "0s",
+		"-ingester.concurrent-flushes":       "10",
+		"-ingester.max-transfer-retries":     "0", // disable
+		"-ingester.num-tokens":               "512",
+		"-ingester.observe-period":           "5s", // to avoid conflicts in tokens
+		"-ring.store":                        "memberlist",
+		"-memberlist.bind-port":              "8000",
+		"-memberlist.left-ingesters-timeout": "600s", // effectively disable
 	}
 
 	if join != "" {
@@ -145,3 +147,80 @@ func newSingleBinary(name string, servername string, join string) *e2ecortex.CortexService {
 	serv.SetBackoff(backOff)
 	return serv
 }
+
+func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	dynamo := e2edb.NewDynamoDB()
+	require.NoError(t, s.StartAndWaitReady(dynamo))
+	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
+
+	// Scale up instances. These numbers seem enough to reliably reproduce some unwanted
+	// consequences of slow propagation, such as missing tombstones.
+
+	maxCortex := 20
+	minCortex := 3
+	instances := make([]*e2ecortex.CortexService, 0)
+
+	for i := 0; i < maxCortex; i++ {
+		name := fmt.Sprintf("cortex-%d", i+1)
+		join := ""
+		if i > 0 {
+			join = e2e.NetworkContainerHostPort(networkName, "cortex-1", 8000)
+		}
+		c := newSingleBinary(name, "", join)
+		require.NoError(t, s.StartAndWaitReady(c))
+		instances = append(instances, c)
+	}
+
+	// Sanity check the ring membership and give each instance time to see every other instance.
+
+	for _, c := range instances {
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(float64(maxCortex)), "cortex_ring_members"))
+		require.NoError(t, c.WaitSumMetrics(e2e.Equals(0), "memberlist_client_kv_store_value_tombstones"))
+	}
+
+	// Scale down as fast as possible but cleanly, in order to send out tombstones.
+
+	stop := errgroup.Group{}
+	for len(instances) > minCortex {
+		i := len(instances) - 1
+		c := instances[i]
+		instances = instances[:i]
+		stop.Go(func() error { return s.Stop(c) })
+	}
+	require.NoError(t, stop.Wait())
+
+	// If all is working as expected, then tombstones should have propagated easily within this time period.
+	// The logging is mildly spammy, but it has proven extremely useful for debugging convergence cases.
+	// We don't use WaitSumMetrics [over all instances] here so we can log the per-instance metrics.
+
+	expectedRingMembers := float64(minCortex)
+	expectedTombstones := float64(maxCortex - minCortex)
+
+	require.Eventually(t, func() bool {
+		ok := true
+		for _, c := range instances {
+			metrics, err := c.SumMetrics([]string{
+				"cortex_ring_members", "memberlist_client_kv_store_value_tombstones",
+			})
+			require.NoError(t, err)
+			t.Logf("%s: cortex_ring_members=%f memberlist_client_kv_store_value_tombstones=%f\n",
+				c.Name(), metrics[0], metrics[1])
+
+			// Don't short circuit the check, so we log the state for all instances.
+			if metrics[0] != expectedRingMembers {
+				ok = false
+			}
+			if metrics[1] != expectedTombstones {
+				ok = false
+			}
+
+		}
+		return ok
+	}, 30*time.Second, 2*time.Second,
+		"expected all instances to have %f ring members and %f tombstones",
+		expectedRingMembers, expectedTombstones)
+}
```
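Assuming the usual gating for this package's e2e tests, the new test can presumably be run on its own with `go test -tags requires_docker -run TestSingleBinaryWithMemberlistScaling ./integration`; the `requires_docker` build tag is an inference from how these integration tests are normally guarded, not something stated in this diff.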

pkg/ring/kv/memberlist/memberlist_client.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -415,7 +415,7 @@ func (m *KV) starting(_ context.Context) error {
 	m.memberlist = list
 	m.broadcasts = &memberlist.TransmitLimitedQueue{
 		NumNodes:       list.NumMembers,
-		RetransmitMult: m.cfg.RetransmitMult,
+		RetransmitMult: mlCfg.RetransmitMult,
 	}
 	m.initWG.Done()
 
```
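To make the one-line fix easier to follow, here is a sketch of the surrounding (assumed) configuration flow. Only the `TransmitLimitedQueue` wiring is taken from the diff above; the `buildMemberlistConfig` helper and its zero-value check are assumptions about how the Cortex flag is merged with memberlist's defaults, not the actual Cortex code.

```go
package sketch

import "github.com/hashicorp/memberlist"

// kvConfig models the Cortex-side setting: RetransmitMult stays 0 when the
// -memberlist.retransmit-factor flag is not provided.
type kvConfig struct {
	RetransmitMult int
}

// buildMemberlistConfig is a hypothetical stand-in for the merging done in
// memberlist_client.go: start from DefaultLANConfig (RetransmitMult: 4) and
// override only when a value was explicitly configured.
func buildMemberlistConfig(cfg kvConfig) *memberlist.Config {
	mlCfg := memberlist.DefaultLANConfig()
	if cfg.RetransmitMult != 0 {
		mlCfg.RetransmitMult = cfg.RetransmitMult
	}
	return mlCfg
}

func newBroadcastQueue(cfg kvConfig, list *memberlist.Memberlist) *memberlist.TransmitLimitedQueue {
	mlCfg := buildMemberlistConfig(cfg)
	return &memberlist.TransmitLimitedQueue{
		NumNodes: list.NumMembers,
		// Before the fix this field read cfg.RetransmitMult, which is 0 when
		// the flag is unset; the fix reads the resolved memberlist value.
		RetransmitMult: mlCfg.RetransmitMult,
	}
}
```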

pkg/ring/kv/memberlist/memberlist_client_test.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -561,9 +561,6 @@ func TestMultipleClients(t *testing.T) {
 	})
 	cancel() // make linter happy
 
-	// Let clients exchange messages for a while
-	close(stop)
-
 	t.Logf("Ring updates observed: %d", updates)
 
 	if updates < members {
@@ -615,6 +612,9 @@
 			}
 		}
 	}
+
+	// We cannot shutdown the KV until now in order for Get() to work reliably.
+	close(stop)
 }
 
 func TestJoinMembersWithRetryBackoff(t *testing.T) {
```
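The unit-test race fixed above is a general ordering problem: a resource must not be torn down until the last read against it completes. A minimal, self-contained sketch of the failure mode follows; the toy `store` type and its methods are illustrative and not Cortex code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// store is a toy stand-in for the memberlist-backed KV used in the test.
type store struct {
	mu     sync.Mutex
	closed bool
}

// Get fails once the store has been shut down.
func (s *store) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return "", errors.New("store closed")
	}
	return "value-for-" + key, nil
}

// Close marks the store as shut down.
func (s *store) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
}

func main() {
	s := &store{}

	// Buggy ordering (what the test did): shut down first, then read. Whether
	// the read succeeds depends on how quickly shutdown completes, which is
	// exactly the intermittent failure described in the commit message.
	//
	//   s.Close()
	//   _, err := s.Get("ring") // -> "store closed"

	// Fixed ordering (what the patch does): read first, close last.
	v, err := s.Get("ring")
	fmt.Println(v, err)
	s.Close()
}
```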
