Skip to content

Commit 732140e

Browse files
authored
refactor: service group resource subscribe ratio (#6479)
* fix: mysql resource config error Signed-off-by: Ash <[email protected]> * chore: optimize unit test Signed-off-by: Ash <[email protected]> * refactor: subscribe rations Signed-off-by: Ash <[email protected]> * change base workload resources getter Signed-off-by: Ash <[email protected]> * add subscriberatios test cases Signed-off-by: Ash <[email protected]> * fix: subscribe ration cause resource zero Signed-off-by: Ash <[email protected]> * feat: sidecar use new over commit fn Signed-off-by: Ash <[email protected]> * fix: addon unit test Signed-off-by: Ash <[email protected]> * fix: sidecar resources inject Signed-off-by: Ash <[email protected]> * fix: deployment unit test Signed-off-by: Ash <[email protected]> * fix: new job unit test error Signed-off-by: Ash <[email protected]> * fix: scale application subscribe ration error Signed-off-by: Ash <[email protected]> * feat: remove AdjustCPUSize logic Signed-off-by: Ash <[email protected]> * rename to OverSubscribeRatios Signed-off-by: Ash <[email protected]> * move pod scale logic to new file Signed-off-by: Ash <[email protected]> * remove unuse func Signed-off-by: Ash <[email protected]> * move affinity logic to new file Signed-off-by: Ash <[email protected]> * move static function to new utils file Signed-off-by: Ash <[email protected]> * move check quota to module file and remove unuse func Signed-off-by: Ash <[email protected]> * move image&registry func to new file Signed-off-by: Ash <[email protected]> * move isito to new file Signed-off-by: Ash <[email protected]> * move service group operations to new file Signed-off-by: Ash <[email protected]> * remove unuse Addr func Signed-off-by: Ash <[email protected]> * refactor: over subscribe ratio module Signed-off-by: Ash <[email protected]> * add OverSubscribeRatios Signed-off-by: Ash <[email protected]> * fix: GetOverSubscribeRatios nil Signed-off-by: Ash <[email protected]> * optimize function location Signed-off-by: Ash <[email protected]> * fix: kubernetes unit test Signed-off-by: Ash <[email protected]> --------- Signed-off-by: Ash <[email protected]>
1 parent eeb85d7 commit 732140e

38 files changed

+3669
-2242
lines changed

internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal.go

+32-58
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,23 @@ import (
2525

2626
"github.com/sirupsen/logrus"
2727
corev1 "k8s.io/api/core/v1"
28-
"k8s.io/apimachinery/pkg/api/resource"
2928
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3029

3130
"github.com/erda-project/erda/apistructs"
3231
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon"
3332
canalv1 "github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/v1"
33+
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/util"
3434
"github.com/erda-project/erda/pkg/http/httpclient"
3535
"github.com/erda-project/erda/pkg/schedule/schedulepolicy/constraintbuilders"
36-
"github.com/erda-project/erda/pkg/strutil"
3736
)
3837

3938
type CanalOperator struct {
40-
k8s addon.K8SUtil
41-
ns addon.NamespaceUtil
42-
secret addon.SecretUtil
43-
pvc addon.PVCUtil
44-
client *httpclient.HTTPClient
39+
k8s addon.K8SUtil
40+
ns addon.NamespaceUtil
41+
overcommit addon.OverCommitUtil
42+
secret addon.SecretUtil
43+
pvc addon.PVCUtil
44+
client *httpclient.HTTPClient
4545
}
4646

4747
func (c *CanalOperator) Name(sg *apistructs.ServiceGroup) string {
@@ -58,13 +58,15 @@ func (c *CanalOperator) NamespacedName(sg *apistructs.ServiceGroup) string {
5858
return c.Namespace(sg) + "/" + c.Name(sg)
5959
}
6060

61-
func New(k8s addon.K8SUtil, ns addon.NamespaceUtil, secret addon.SecretUtil, pvc addon.PVCUtil, client *httpclient.HTTPClient) *CanalOperator {
61+
func New(k8s addon.K8SUtil, ns addon.NamespaceUtil, overcommit addon.OverCommitUtil,
62+
secret addon.SecretUtil, pvc addon.PVCUtil, client *httpclient.HTTPClient) *CanalOperator {
6263
return &CanalOperator{
63-
k8s: k8s,
64-
ns: ns,
65-
secret: secret,
66-
pvc: pvc,
67-
client: client,
64+
k8s: k8s,
65+
ns: ns,
66+
overcommit: overcommit,
67+
secret: secret,
68+
pvc: pvc,
69+
client: client,
6870
}
6971
}
7072

@@ -131,54 +133,13 @@ func (c *CanalOperator) Validate(sg *apistructs.ServiceGroup) error {
131133
return nil
132134
}
133135

134-
func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) interface{} {
136+
func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) (any, error) {
135137
canal := sg.Services[0]
136138

137139
scheinfo := sg.ScheduleInfo2
138140
scheinfo.Stateful = true
139141
affinity := constraintbuilders.K8S(&scheinfo, nil, nil, nil).Affinity
140142

141-
resources := corev1.ResourceRequirements{
142-
Requests: corev1.ResourceList{},
143-
Limits: corev1.ResourceList{},
144-
}
145-
adminResources := corev1.ResourceRequirements{
146-
Requests: corev1.ResourceList{},
147-
Limits: corev1.ResourceList{},
148-
}
149-
if canal.Resources.Cpu != 0 {
150-
cpu := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Cpu*1000)), "m"))
151-
resources.Requests[corev1.ResourceCPU] = cpu
152-
153-
// 1/4
154-
cpu = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Cpu*1000/4)), "m"))
155-
adminResources.Requests[corev1.ResourceCPU] = cpu
156-
}
157-
if canal.Resources.MaxCPU != 0 {
158-
maxCpu := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxCPU*1000)), "m"))
159-
resources.Limits[corev1.ResourceCPU] = maxCpu
160-
161-
// 1/2
162-
maxCpu = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxCPU*1000/2)), "m"))
163-
adminResources.Limits[corev1.ResourceCPU] = maxCpu
164-
}
165-
if canal.Resources.Mem != 0 {
166-
mem := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Mem)), "Mi"))
167-
resources.Requests[corev1.ResourceMemory] = mem
168-
169-
// 1/3
170-
mem = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.Mem/3)), "Mi"))
171-
adminResources.Requests[corev1.ResourceMemory] = mem
172-
}
173-
if canal.Resources.MaxMem != 0 {
174-
maxMem := resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxMem)), "Mi"))
175-
resources.Limits[corev1.ResourceMemory] = maxMem
176-
177-
// 2/3
178-
maxMem = resource.MustParse(strutil.Concat(strconv.Itoa(int(canal.Resources.MaxMem*2/3)), "Mi"))
179-
adminResources.Limits[corev1.ResourceMemory] = maxMem
180-
}
181-
182143
v := "v1.1.5"
183144
if canal.Env["CANAL_VERSION"] != "" {
184145
v = canal.Env["CANAL_VERSION"]
@@ -206,6 +167,19 @@ func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) interface{} {
206167
}
207168
}
208169

170+
workspace, _ := util.GetDiceWorkspaceFromEnvs(canal.Env)
171+
containerResources, err := c.overcommit.ResourceOverCommit(workspace, canal.Resources)
172+
if err != nil {
173+
return nil, fmt.Errorf("failed to calc container resources, err: %v", err)
174+
}
175+
adminContainerResources, err := c.overcommit.ResourceOverCommit(workspace, apistructs.Resources{
176+
Cpu: canal.Resources.Cpu / 3,
177+
Mem: canal.Resources.Mem * 2 / 3,
178+
})
179+
if err != nil {
180+
return nil, fmt.Errorf("failed to calc admin container resources, err: %v", err)
181+
}
182+
209183
obj := &canalv1.Canal{
210184
TypeMeta: metav1.TypeMeta{
211185
APIVersion: "database.erda.cloud/v1",
@@ -221,8 +195,8 @@ func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) interface{} {
221195
Replicas: canal.Scale,
222196

223197
Affinity: &affinity,
224-
Resources: resources,
225-
AdminResources: adminResources,
198+
Resources: containerResources,
199+
AdminResources: adminContainerResources,
226200
Labels: make(map[string]string),
227201
CanalOptions: canalOptions,
228202
AdminOptions: adminOptions,
@@ -235,7 +209,7 @@ func (c *CanalOperator) Convert(sg *apistructs.ServiceGroup) interface{} {
235209

236210
addon.SetAddonLabelsAndAnnotations(canal, obj.Spec.Labels, obj.Spec.Annotations)
237211

238-
return obj
212+
return obj, nil
239213
}
240214

241215
func (c *CanalOperator) Create(k8syml interface{}) error {

internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/canal/canal_test.go

+145-58
Original file line numberDiff line numberDiff line change
@@ -18,83 +18,170 @@ import (
1818
"testing"
1919

2020
"github.com/golang/mock/gomock"
21+
"github.com/stretchr/testify/assert"
22+
corev1 "k8s.io/api/core/v1"
23+
"k8s.io/apimachinery/pkg/api/resource"
2124

2225
"github.com/erda-project/erda/apistructs"
23-
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/sourcecov/mock"
26+
"github.com/erda-project/erda/internal/tools/orchestrator/scheduler/executor/plugins/k8s/addon/mock"
2427
"github.com/erda-project/erda/pkg/http/httpclient"
2528
)
2629

27-
type k8s struct{}
30+
var sg = &apistructs.ServiceGroup{
31+
Dice: apistructs.Dice{
32+
ID: "mock-canal",
33+
Labels: map[string]string{
34+
"USE_OPERATOR": "canal",
35+
},
36+
Services: []apistructs.Service{
37+
{
38+
Name: "canal",
39+
Resources: apistructs.Resources{
40+
Cpu: 3,
41+
Mem: 3072,
42+
},
43+
Scale: 2,
44+
Env: map[string]string{
45+
apistructs.DiceWorkspaceEnvKey: apistructs.WORKSPACE_DEV,
46+
"CANAL_DESTINATION": "example",
47+
"canal.instance.master.address": "mock-mysql.svc.cluster.local:3306",
48+
"canal.instance.dbUsername": "erda",
49+
"canal.instance.dbPassword": "password",
50+
},
51+
},
52+
},
53+
},
54+
}
55+
56+
var sgCanalAdmin = &apistructs.ServiceGroup{
57+
Dice: apistructs.Dice{
58+
ID: "mock-canal",
59+
Labels: map[string]string{
60+
"USE_OPERATOR": "canal",
61+
},
62+
Services: []apistructs.Service{
63+
{
64+
Name: "canal",
65+
Resources: apistructs.Resources{
66+
Cpu: 1,
67+
Mem: 2048,
68+
},
69+
Scale: 2,
70+
Env: map[string]string{
71+
apistructs.DiceWorkspaceEnvKey: apistructs.WORKSPACE_DEV,
72+
"canal.admin.manager": "127.0.0.1:8089",
73+
"spring.datasource.address": "mock-mysql.svc.cluster.local:3306",
74+
"spring.datasource.username": "erda",
75+
"spring.datasource.password": "",
76+
},
77+
},
78+
},
79+
},
80+
}
2881

29-
func (k8s) GetK8SAddr() string {
30-
return ""
82+
var mockResourceRequirements = corev1.ResourceRequirements{
83+
Limits: corev1.ResourceList{
84+
corev1.ResourceCPU: resource.MustParse("3"),
85+
corev1.ResourceMemory: resource.MustParse("3072Mi"),
86+
},
87+
Requests: corev1.ResourceList{
88+
corev1.ResourceCPU: resource.MustParse("3"),
89+
corev1.ResourceMemory: resource.MustParse("3072Mi"),
90+
},
91+
}
92+
93+
var mockAdminResourceRequirements = corev1.ResourceRequirements{
94+
Limits: corev1.ResourceList{
95+
corev1.ResourceCPU: resource.MustParse("1"),
96+
corev1.ResourceMemory: resource.MustParse("2048Mi"),
97+
},
98+
Requests: corev1.ResourceList{
99+
corev1.ResourceCPU: resource.MustParse("1"),
100+
corev1.ResourceMemory: resource.MustParse("2048Mi"),
101+
},
31102
}
32103

33104
func TestCanalOperator(t *testing.T) {
34105
ctrl := gomock.NewController(t)
35106
defer ctrl.Finish()
36107

37-
ns := mock.NewMockNamespaceUtil(ctrl)
108+
// Create mock
109+
namespaceUtil := mock.NewMockNamespaceUtil(ctrl)
110+
overCommitUtil := mock.NewMockOverCommitUtil(ctrl)
111+
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sg.Services[0].Resources).
112+
Return(mockResourceRequirements, nil).AnyTimes()
113+
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sgCanalAdmin.Services[0].Resources).
114+
Return(mockAdminResourceRequirements, nil).AnyTimes()
115+
116+
k8sUtil := mock.NewMockK8SUtil(ctrl)
117+
k8sUtil.EXPECT().GetK8SAddr().Return("mock-k8s-addr").AnyTimes()
118+
119+
mo := New(k8sUtil, namespaceUtil, overCommitUtil, nil, nil, httpclient.New())
120+
121+
t.Run("Test Name and NamespacedName", func(t *testing.T) {
122+
assert.NotPanics(t, func() { mo.Name(sg) })
123+
assert.NotPanics(t, func() { mo.NamespacedName(sg) })
124+
})
125+
126+
t.Run("Test IsSupported", func(t *testing.T) {
127+
assert.NotPanics(t, func() { mo.IsSupported() })
128+
})
129+
130+
t.Run("Test Validate", func(t *testing.T) {
131+
assert.NotPanics(t, func() { mo.Validate(sg) })
132+
})
38133

39-
mo := New(new(k8s), ns, nil, nil, httpclient.New())
40-
sg := new(apistructs.ServiceGroup)
41-
sg.Services = append(sg.Services, apistructs.Service{
42-
Name: "canal",
134+
t.Run("Test Convert", func(t *testing.T) {
135+
_, err := mo.Convert(sg)
136+
assert.NoError(t, err)
137+
})
138+
139+
t.Run("Test CRUD Operations", func(t *testing.T) {
140+
assert.NotPanics(t, func() { mo.Create(sg) })
141+
assert.NotPanics(t, func() { mo.Inspect(sg) })
142+
assert.NotPanics(t, func() { mo.Update(sg) })
143+
assert.NotPanics(t, func() { mo.Remove(sg) })
43144
})
44-
sg.ID = "abcdefghigklmn"
45-
mo.Name(sg)
46-
mo.NamespacedName(sg)
47-
mo.IsSupported()
48-
mo.Validate(sg)
49-
sg.Labels = make(map[string]string)
50-
sg.Labels["USE_OPERATOR"] = "canal"
51-
mo.Validate(sg)
52-
sg.Services[0].Env = make(map[string]string)
53-
mo.Validate(sg)
54-
sg.Services[0].Env["CANAL_DESTINATION"] = "b"
55-
sg.Services[0].Env["canal.instance.master.address"] = "1"
56-
sg.Services[0].Env["canal.instance.master.address"] = "1"
57-
sg.Services[0].Env["canal.instance.dbUsername"] = "2"
58-
sg.Services[0].Env["canal.instance.dbPassword"] = "3"
59-
mo.Validate(sg)
60-
mo.Convert(sg)
61-
mo.Create(sg)
62-
mo.Inspect(sg)
63-
mo.Update(sg)
64-
mo.Remove(sg)
65145
}
66146

67-
func TestCanalOperator2(t *testing.T) {
147+
func TestCanalOperatorAdmin(t *testing.T) {
68148
ctrl := gomock.NewController(t)
69149
defer ctrl.Finish()
70150

71-
ns := mock.NewMockNamespaceUtil(ctrl)
151+
// Create mock
152+
namespaceUtil := mock.NewMockNamespaceUtil(ctrl)
153+
overCommitUtil := mock.NewMockOverCommitUtil(ctrl)
154+
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sg.Services[0].Resources).
155+
Return(mockResourceRequirements, nil).AnyTimes()
156+
overCommitUtil.EXPECT().ResourceOverCommit(apistructs.DevWorkspace, sgCanalAdmin.Services[0].Resources).
157+
Return(mockAdminResourceRequirements, nil).AnyTimes()
158+
159+
k8sUtil := mock.NewMockK8SUtil(ctrl)
160+
k8sUtil.EXPECT().GetK8SAddr().Return("mock-k8s-addr").AnyTimes()
161+
162+
mo := New(k8sUtil, namespaceUtil, overCommitUtil, nil, nil, httpclient.New())
163+
164+
t.Run("Test Name and NamespacedName", func(t *testing.T) {
165+
assert.NotPanics(t, func() { mo.Name(sg) })
166+
assert.NotPanics(t, func() { mo.NamespacedName(sg) })
167+
})
168+
169+
t.Run("Test IsSupported", func(t *testing.T) {
170+
assert.NotPanics(t, func() { mo.IsSupported() })
171+
})
172+
173+
t.Run("Test Validate", func(t *testing.T) {
174+
assert.NotPanics(t, func() { mo.Validate(sg) })
175+
})
176+
177+
t.Run("Test Convert", func(t *testing.T) {
178+
assert.NotPanics(t, func() { mo.Convert(sg) })
179+
})
72180

73-
mo := New(new(k8s), ns, nil, nil, httpclient.New())
74-
sg := new(apistructs.ServiceGroup)
75-
sg.Services = append(sg.Services, apistructs.Service{
76-
Name: "canal",
181+
t.Run("Test CRUD Operations", func(t *testing.T) {
182+
assert.NotPanics(t, func() { mo.Create(sg) })
183+
assert.NotPanics(t, func() { mo.Inspect(sg) })
184+
assert.NotPanics(t, func() { mo.Update(sg) })
185+
assert.NotPanics(t, func() { mo.Remove(sg) })
77186
})
78-
sg.ID = "abcdefghigklmn"
79-
mo.Name(sg)
80-
mo.NamespacedName(sg)
81-
mo.IsSupported()
82-
mo.Validate(sg)
83-
sg.Labels = make(map[string]string)
84-
sg.Labels["USE_OPERATOR"] = "canal"
85-
mo.Validate(sg)
86-
sg.Services[0].Env = make(map[string]string)
87-
mo.Validate(sg)
88-
sg.Services[0].Env["CANAL_DESTINATION"] = "b"
89-
sg.Services[0].Env["canal.admin.manager"] = "127.0.0.1:8089"
90-
sg.Services[0].Env["spring.datasource.address"] = "1"
91-
sg.Services[0].Env["spring.datasource.address"] = "1"
92-
sg.Services[0].Env["spring.datasource.username"] = "2"
93-
sg.Services[0].Env["spring.datasource.password"] = "3"
94-
mo.Validate(sg)
95-
mo.Convert(sg)
96-
mo.Create(sg)
97-
mo.Inspect(sg)
98-
mo.Update(sg)
99-
mo.Remove(sg)
100187
}

0 commit comments

Comments
 (0)