Skip to content

Commit bc658ae

Browse files
JmPotatoti-chi-bot
authored andcommitted
This is an automated cherry-pick of #9171
close #9157 Signed-off-by: ti-chi-bot <[email protected]>
1 parent d30b54f commit bc658ae

File tree

14 files changed

+1370
-14
lines changed

14 files changed

+1370
-14
lines changed

errors.toml

+6
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,7 @@ error = '''
851851
reset user timestamp failed, %s
852852
'''
853853

854+
<<<<<<< HEAD
854855
["PD:tso:ErrSetLocalTSOConfig"]
855856
error = '''
856857
set local tso config failed, %s
@@ -859,6 +860,11 @@ set local tso config failed, %s
859860
["PD:tso:ErrSyncMaxTS"]
860861
error = '''
861862
sync max ts failed, %s
863+
=======
864+
["PD:tso:ErrSaveTimestamp"]
865+
error = '''
866+
save timestamp failed, %s
867+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
862868
'''
863869

864870
["PD:tso:ErrUpdateTimestamp"]

pkg/election/leadership.go

+34-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type Leadership struct {
6363
client *clientv3.Client
6464
// leaderKey and leaderValue are key-value pair in etcd
6565
leaderKey string
66-
leaderValue string
66+
leaderValue atomic.Value // Stored as string
6767

6868
keepAliveCtx context.Context
6969
keepAliveCancelFunc context.CancelFunc
@@ -114,6 +114,31 @@ func (ls *Leadership) GetLeaderKey() string {
114114
return ls.leaderKey
115115
}
116116

117+
<<<<<<< HEAD
118+
=======
119+
// GetLeaderValue is used to get the leader value saved in etcd.
120+
func (ls *Leadership) GetLeaderValue() string {
121+
if ls == nil {
122+
return ""
123+
}
124+
leaderValue := ls.leaderValue.Load()
125+
if leaderValue == nil {
126+
return ""
127+
}
128+
return leaderValue.(string)
129+
}
130+
131+
// SetPrimaryWatch sets the primary watch flag.
132+
func (ls *Leadership) SetPrimaryWatch(val bool) {
133+
ls.primaryWatch.Store(val)
134+
}
135+
136+
// IsPrimary gets the primary watch flag.
137+
func (ls *Leadership) IsPrimary() bool {
138+
return ls.primaryWatch.Load()
139+
}
140+
141+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
117142
// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
118143
// Need to make sure `AddCampaignTimes` is called before this function.
119144
func (ls *Leadership) GetCampaignTimesNum() int {
@@ -150,7 +175,7 @@ func (ls *Leadership) AddCampaignTimes() {
150175

151176
// Campaign is used to campaign the leader with given lease and returns a leadership
152177
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
153-
ls.leaderValue = leaderData
178+
ls.leaderValue.Store(leaderData)
154179
// Create a new lease to campaign
155180
newLease := &lease{
156181
Purpose: ls.purpose,
@@ -216,7 +241,7 @@ func (ls *Leadership) LeaderTxn(cs ...clientv3.Cmp) clientv3.Txn {
216241
}
217242

218243
func (ls *Leadership) leaderCmp() clientv3.Cmp {
219-
return clientv3.Compare(clientv3.Value(ls.leaderKey), "=", ls.leaderValue)
244+
return clientv3.Compare(clientv3.Value(ls.leaderKey), "=", ls.GetLeaderValue())
220245
}
221246

222247
// DeleteLeaderKey deletes the corresponding leader from etcd by the leaderPath as the key.
@@ -393,5 +418,11 @@ func (ls *Leadership) Reset() {
393418
ls.keepAliveCancelFunc()
394419
}
395420
ls.keepAliveCancelFuncLock.Unlock()
421+
<<<<<<< HEAD
396422
ls.getLease().Close()
423+
=======
424+
ls.GetLease().Close()
425+
ls.SetPrimaryWatch(false)
426+
ls.leaderValue.Store("")
427+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
397428
}

pkg/errs/errno.go

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ var (
4747
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
4848
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
4949
ErrUpdateTimestamp = errors.Normalize("update timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrUpdateTimestamp"))
50+
ErrSaveTimestamp = errors.Normalize("save timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrSaveTimestamp"))
5051
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
5152
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
5253
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))

pkg/errs/errs.go

+13
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
package errs
1616

1717
import (
18+
<<<<<<< HEAD
1819
"github.com/pingcap/errors"
20+
=======
21+
"strings"
22+
23+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
1924
"go.uber.org/zap"
2025
"go.uber.org/zap/zapcore"
2126
)
@@ -34,3 +39,11 @@ func ZapError(err error, causeError ...error) zap.Field {
3439
}
3540
return zap.Field{Key: "error", Type: zapcore.ErrorType, Interface: err}
3641
}
42+
43+
// IsLeaderChanged returns true if the error is due to leader changed.
44+
func IsLeaderChanged(err error) bool {
45+
if err == nil {
46+
return false
47+
}
48+
return strings.Contains(err.Error(), NotLeaderErr)
49+
}

pkg/storage/endpoint/tso.go

+46
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121

2222
"github.com/pingcap/errors"
2323
"github.com/pingcap/log"
24+
<<<<<<< HEAD
25+
=======
26+
27+
"github.com/tikv/pd/pkg/election"
28+
"github.com/tikv/pd/pkg/errs"
29+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
2430
"github.com/tikv/pd/pkg/storage/kv"
2531
"github.com/tikv/pd/pkg/utils/typeutil"
2632
"go.etcd.io/etcd/clientv3"
@@ -29,9 +35,15 @@ import (
2935

3036
// TSOStorage is the interface for timestamp storage.
3137
type TSOStorage interface {
38+
<<<<<<< HEAD
3239
LoadTimestamp(prefix string) (time.Time, error)
3340
SaveTimestamp(key string, ts time.Time) error
3441
DeleteTimestamp(key string) error
42+
=======
43+
LoadTimestamp(groupID uint32) (time.Time, error)
44+
SaveTimestamp(groupID uint32, ts time.Time, leadership *election.Leadership) error
45+
DeleteTimestamp(groupID uint32) error
46+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
3547
}
3648

3749
var _ TSOStorage = (*StorageEndpoint)(nil)
@@ -67,10 +79,40 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
6779
return maxTSWindow, nil
6880
}
6981

82+
<<<<<<< HEAD
7083
// SaveTimestamp saves the timestamp to the storage.
7184
func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
7285
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
7386
value, err := txn.Load(key)
87+
=======
88+
// SaveTimestamp saves the timestamp to the storage. The leadership is used to check if the current server is leader
89+
// before saving the timestamp to ensure a strong consistency for persistence of the TSO timestamp window.
90+
func (se *StorageEndpoint) SaveTimestamp(groupID uint32, ts time.Time, leadership *election.Leadership) error {
91+
logFilds := []zap.Field{
92+
zap.Uint32("group-id", groupID),
93+
zap.Time("ts", ts),
94+
zap.String("leader-key", leadership.GetLeaderKey()),
95+
zap.String("expected-leader-value", leadership.GetLeaderValue()),
96+
}
97+
log.Info("saving timestamp to the storage", logFilds...)
98+
// The PD leadership or TSO primary will always be granted first before the TSO timestamp window is saved.
99+
// So we here check whether the leader value is filled to see if the requirement is met.
100+
if len(leadership.GetLeaderValue()) == 0 {
101+
return errors.Errorf("%s due to leadership has not been granted yet", errs.NotLeaderErr)
102+
}
103+
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
104+
// Ensure the current server is leader by reading and comparing the leader value.
105+
leaderValue, err := txn.Load(leadership.GetLeaderKey())
106+
if err != nil {
107+
return err
108+
}
109+
if expected := leadership.GetLeaderValue(); leaderValue != expected {
110+
log.Error("leader value does not match", append(logFilds, zap.String("current-leader-value", leaderValue))...)
111+
return errors.Errorf("%s due to leader value does not match, current: %s, expected: %s", errs.NotLeaderErr, leaderValue, expected)
112+
}
113+
114+
value, err := txn.Load(keypath.TimestampPath(groupID))
115+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
74116
if err != nil {
75117
return err
76118
}
@@ -79,7 +121,11 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
79121
if value != "" {
80122
previousTS, err = typeutil.ParseTimestamp([]byte(value))
81123
if err != nil {
124+
<<<<<<< HEAD
82125
log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err))
126+
=======
127+
log.Error("parse timestamp failed", append(logFilds, zap.String("value", value), zap.Error(err))...)
128+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
83129
return err
84130
}
85131
}

pkg/storage/storage_tso_test.go

+84-2
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,44 @@ import (
2121
"time"
2222

2323
"github.com/stretchr/testify/require"
24+
<<<<<<< HEAD
2425
"github.com/tikv/pd/pkg/storage/endpoint"
2526
"github.com/tikv/pd/pkg/utils/etcdutil"
2627
)
2728

29+
=======
30+
31+
"github.com/tikv/pd/pkg/election"
32+
"github.com/tikv/pd/pkg/errs"
33+
"github.com/tikv/pd/pkg/utils/etcdutil"
34+
)
35+
36+
const (
37+
testGroupID = uint32(1)
38+
testLeaderKey = "test-leader-key"
39+
testLeaderValue = "test-leader-value"
40+
)
41+
42+
func prepare(t *testing.T) (storage Storage, clean func(), leadership *election.Leadership) {
43+
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
44+
storage = NewStorageWithEtcdBackend(client)
45+
leadership = election.NewLeadership(client, testLeaderKey, "storage_tso_test")
46+
err := leadership.Campaign(60, testLeaderValue)
47+
require.NoError(t, err)
48+
return storage, clean, leadership
49+
}
50+
51+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
2852
func TestSaveLoadTimestamp(t *testing.T) {
2953
re := require.New(t)
30-
storage, clean := newTestStorage(t)
54+
storage, clean, leadership := prepare(t)
3155
defer clean()
3256
expectedTS := time.Now().Round(0)
57+
<<<<<<< HEAD
3358
err := storage.SaveTimestamp(endpoint.TimestampKey, expectedTS)
59+
=======
60+
err := storage.SaveTimestamp(testGroupID, expectedTS, leadership)
61+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
3462
re.NoError(err)
3563
ts, err := storage.LoadTimestamp("")
3664
re.NoError(err)
@@ -67,23 +95,77 @@ func TestGlobalLocalTimestamp(t *testing.T) {
6795

6896
func TestTimestampTxn(t *testing.T) {
6997
re := require.New(t)
70-
storage, clean := newTestStorage(t)
98+
storage, clean, leadership := prepare(t)
7199
defer clean()
72100
globalTS1 := time.Now().Round(0)
101+
<<<<<<< HEAD
73102
err := storage.SaveTimestamp(endpoint.TimestampKey, globalTS1)
74103
re.NoError(err)
75104

76105
globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
77106
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2)
107+
=======
108+
err := storage.SaveTimestamp(testGroupID, globalTS1, leadership)
109+
re.NoError(err)
110+
111+
globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
112+
err = storage.SaveTimestamp(testGroupID, globalTS2, leadership)
113+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
78114
re.Error(err)
79115

80116
ts, err := storage.LoadTimestamp("")
81117
re.NoError(err)
82118
re.Equal(globalTS1, ts)
83119
}
84120

121+
<<<<<<< HEAD
85122
func newTestStorage(t *testing.T) (Storage, func()) {
86123
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
87124
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
88125
return NewStorageWithEtcdBackend(client, rootPath), clean
126+
=======
127+
func TestSaveTimestampWithLeaderCheck(t *testing.T) {
128+
re := require.New(t)
129+
storage, clean, leadership := prepare(t)
130+
defer clean()
131+
132+
// testLeaderKey -> testLeaderValue
133+
globalTS := time.Now().Round(0)
134+
err := storage.SaveTimestamp(testGroupID, globalTS, leadership)
135+
re.NoError(err)
136+
ts, err := storage.LoadTimestamp(testGroupID)
137+
re.NoError(err)
138+
re.Equal(globalTS, ts)
139+
140+
err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), &election.Leadership{})
141+
re.True(errs.IsLeaderChanged(err))
142+
ts, err = storage.LoadTimestamp(testGroupID)
143+
re.NoError(err)
144+
re.Equal(globalTS, ts)
145+
146+
// testLeaderKey -> ""
147+
storage.Save(leadership.GetLeaderKey(), "")
148+
err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), leadership)
149+
re.True(errs.IsLeaderChanged(err))
150+
ts, err = storage.LoadTimestamp(testGroupID)
151+
re.NoError(err)
152+
re.Equal(globalTS, ts)
153+
154+
// testLeaderKey -> non-existent
155+
storage.Remove(leadership.GetLeaderKey())
156+
err = storage.SaveTimestamp(testGroupID, globalTS.Add(time.Second), leadership)
157+
re.True(errs.IsLeaderChanged(err))
158+
ts, err = storage.LoadTimestamp(testGroupID)
159+
re.NoError(err)
160+
re.Equal(globalTS, ts)
161+
162+
// testLeaderKey -> testLeaderValue
163+
storage.Save(leadership.GetLeaderKey(), testLeaderValue)
164+
globalTS = globalTS.Add(time.Second)
165+
err = storage.SaveTimestamp(testGroupID, globalTS, leadership)
166+
re.NoError(err)
167+
ts, err = storage.LoadTimestamp(testGroupID)
168+
re.NoError(err)
169+
re.Equal(globalTS, ts)
170+
>>>>>>> fda80ebb9 (tso: enhance timestamp persistency with strong leader consistency (#9171))
89171
}

0 commit comments

Comments
 (0)