
Commit 148f438

presztak authored and stgraber committed
incusd: Add cluster rebalance task
Signed-off-by: Piotr Resztak <[email protected]>
1 parent ea55380 commit 148f438


2 files changed, +394 -0 lines changed


cmd/incusd/cluster_rebalance.go

Lines changed: 391 additions & 0 deletions
@@ -0,0 +1,391 @@
package main

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sort"
	"strconv"
	"time"

	internalInstance "github.com/lxc/incus/v6/internal/instance"
	"github.com/lxc/incus/v6/internal/server/cluster"
	"github.com/lxc/incus/v6/internal/server/db"
	dbCluster "github.com/lxc/incus/v6/internal/server/db/cluster"
	"github.com/lxc/incus/v6/internal/server/instance"
	"github.com/lxc/incus/v6/internal/server/instance/instancetype"
	"github.com/lxc/incus/v6/internal/server/project"
	"github.com/lxc/incus/v6/internal/server/state"
	"github.com/lxc/incus/v6/internal/server/task"
	"github.com/lxc/incus/v6/shared/api"
	"github.com/lxc/incus/v6/shared/logger"
)

// ServerScore represents a server's score taken into account during load balancing.
type ServerScore struct {
	NodeInfo  db.NodeInfo
	Resources *api.Resources
	Score     uint8
}

// ServerUsage represents current server load.
type ServerUsage struct {
	MemoryUsage uint64
	MemoryTotal uint64
	CPUUsage    float64
	CPUTotal    uint64
}

// sortAndGroupByArch sorts servers by their score (most loaded first) and groups them by CPU architecture.
func sortAndGroupByArch(servers []*ServerScore) map[string][]*ServerScore {
	sort.Slice(servers, func(i, j int) bool {
		return servers[i].Score > servers[j].Score
	})

	result := make(map[string][]*ServerScore)
	for _, s := range servers {
		arch := s.Resources.CPU.Architecture
		_, ok := result[arch]
		if !ok {
			result[arch] = []*ServerScore{}
		}

		result[arch] = append(result[arch], s)
	}

	return result
}

// calculateScore calculates the score for a single server, optionally adding the extra usage from au.
func calculateScore(su *ServerUsage, au *ServerUsage) uint8 {
	memoryUsage := su.MemoryUsage
	memoryTotal := su.MemoryTotal
	cpuUsage := su.CPUUsage
	cpuTotal := su.CPUTotal

	if au != nil {
		memoryUsage += au.MemoryUsage
		memoryTotal += au.MemoryTotal
		cpuUsage += au.CPUUsage
		cpuTotal += au.CPUTotal
	}

	// The score is the average of memory usage and CPU load, each expressed as a percentage of capacity.
	memoryScore := uint8(float64(memoryUsage) * 100 / float64(memoryTotal))
	cpuScore := uint8((cpuUsage * 100) / float64(cpuTotal))

	return (memoryScore + cpuScore) / 2
}

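// Worked example for calculateScore above (illustrative numbers only): a server with
// 32 GiB of memory of which 16 GiB is used scores 50 on memory, and 8 CPUs with a
// 1-minute load average of 2.0 score 25 on CPU, giving (50 + 25) / 2 = 37.
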
// calculateServersScore calculates a score based on memory and CPU usage for every server in the cluster.
func calculateServersScore(s *state.State, members []db.NodeInfo) (map[string][]*ServerScore, error) {
	scores := []*ServerScore{}
	for _, member := range members {
		clusterMember, err := cluster.Connect(member.Address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
		if err != nil {
			return nil, fmt.Errorf("Failed to connect to cluster member: %w", err)
		}

		res, err := clusterMember.GetServerResources()
		if err != nil {
			return nil, fmt.Errorf("Failed to get resources for cluster member: %w", err)
		}

		su := &ServerUsage{
			MemoryUsage: res.Memory.Used,
			MemoryTotal: res.Memory.Total,
			CPUUsage:    res.Load.Average1Min,
			CPUTotal:    res.CPU.Total,
		}

		serverScore := calculateScore(su, nil)
		scores = append(scores, &ServerScore{NodeInfo: member, Resources: res, Score: serverScore})
	}

	return sortAndGroupByArch(scores), nil
}

// clusterRebalanceServers migrates instances from the most busy server to the least busy one.
func clusterRebalanceServers(ctx context.Context, s *state.State, srcServer *ServerScore, dstServer *ServerScore, maxToMigrate int64) (int64, error) {
	numOfMigrated := int64(0)

	// Keep track of project restrictions.
	projectStatuses := map[string]bool{}

	// Get a list of migratable instances.
	var dbInstances []dbCluster.Instance
	err := s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
		var err error

		// Get the list of virtual machines on the source (most busy) server.
		instType := instancetype.VM
		dbInstances, err = dbCluster.GetInstances(ctx, tx.Tx(), dbCluster.InstanceFilter{Node: &srcServer.NodeInfo.Name, Type: &instType})
		if err != nil {
			return fmt.Errorf("Failed to get instances: %w", err)
		}

		// Check project restrictions.
		for _, dbInst := range dbInstances {
			_, ok := projectStatuses[dbInst.Project]
			if ok {
				continue
			}

			dbProject, err := dbCluster.GetProject(ctx, tx.Tx(), dbInst.Project)
			if err != nil {
				return fmt.Errorf("Failed to get project: %w", err)
			}

			apiProject, err := dbProject.ToAPI(ctx, tx.Tx())
			if err != nil {
				return fmt.Errorf("Failed to load project: %w", err)
			}

			_, _, err = project.CheckTarget(ctx, s.Authorizer, nil, tx, apiProject, dstServer.NodeInfo.Name, []db.NodeInfo{dstServer.NodeInfo})
			projectStatuses[dbInst.Project] = err == nil
		}

		return nil
	})
	if err != nil {
		return -1, fmt.Errorf("Failed to get instances: %w", err)
	}

	// Filter for instances that can be live migrated to the new target.
	var instances []instance.Instance
	for _, dbInst := range dbInstances {
		if !projectStatuses[dbInst.Project] {
			// Project restrictions prevent moving to that target.
			continue
		}

		inst, err := instance.LoadByProjectAndName(s, dbInst.Project, dbInst.Name)
		if err != nil {
			return -1, fmt.Errorf("Failed to load instance: %w", err)
		}

		// Skip instances which don't support live migration.
		if inst.CanMigrate() != "live-migrate" {
			continue
		}

		// Check whether the instance's rebalance cooldown has expired.
		lastMove := inst.LocalConfig()["volatile.rebalance.last_move"]
		cooldown := s.GlobalConfig.ClusterRebalanceCooldown()
		if lastMove != "" {
			v, err := strconv.ParseInt(lastMove, 10, 64)
			if err != nil {
				return -1, fmt.Errorf("Failed to parse last_move value: %w", err)
			}

			expiry, err := internalInstance.GetExpiry(time.Unix(v, 0), cooldown)
			if err != nil {
				return -1, fmt.Errorf("Failed to calculate expiration for cooldown time: %w", err)
			}

			if time.Now().Before(expiry) {
				continue
			}
		}

		instances = append(instances, inst)
	}

	// Calculate current and target scores.
	targetScore := (srcServer.Score + dstServer.Score) / 2
	currentScore := dstServer.Score
	targetServerUsage := &ServerUsage{
		MemoryUsage: dstServer.Resources.Memory.Used,
		MemoryTotal: dstServer.Resources.Memory.Total,
		CPUUsage:    dstServer.Resources.Load.Average1Min,
		CPUTotal:    dstServer.Resources.CPU.Total,
	}

	// Prepare the API client.
	srcNode, err := cluster.Connect(srcServer.NodeInfo.Address, s.Endpoints.NetworkCert(), s.ServerCert(), nil, true)
	if err != nil {
		return -1, fmt.Errorf("Failed to connect to cluster member: %w", err)
	}

	srcNode = srcNode.UseTarget(dstServer.NodeInfo.Name)

	for _, inst := range instances {
		if numOfMigrated >= maxToMigrate {
			// We're done moving instances for now.
			return numOfMigrated, nil
		}

		if currentScore >= targetScore {
			// We've balanced the load.
			return numOfMigrated, nil
		}

		// Calculate resource consumption.
		cpuUsage, memUsage, _, err := instance.ResourceUsage(inst.ExpandedConfig(), inst.ExpandedDevices().CloneNative(), api.InstanceType(inst.Type().String()))
		if err != nil {
			return -1, fmt.Errorf("Failed to establish instance resource usage: %w", err)
		}

		// Calculate the impact of migrating this instance to the destination.
		additionalUsage := &ServerUsage{
			MemoryUsage: uint64(memUsage),
			CPUUsage:    float64(cpuUsage),
		}

		expectedScore := calculateScore(targetServerUsage, additionalUsage)
		if expectedScore >= targetScore {
			// Skip the instance as it would have too big an impact.
			continue
		}

		// Prepare for live migration.
		req := api.InstancePost{
			Migration: true,
			Live:      true,
		}

		migrationOp, err := srcNode.MigrateInstance(inst.Name(), req)
		if err != nil {
			return -1, fmt.Errorf("Migration API failure: %w", err)
		}

		err = migrationOp.Wait()
		if err != nil {
			return -1, fmt.Errorf("Failed to wait for migration to finish: %w", err)
		}

		// Record the migration in the instance volatile storage.
		err = inst.VolatileSet(map[string]string{"volatile.rebalance.last_move": strconv.FormatInt(time.Now().Unix(), 10)})
		if err != nil {
			return -1, err
		}

		// Update counters and scores.
		numOfMigrated += 1
		currentScore = expectedScore
		targetServerUsage.MemoryUsage += additionalUsage.MemoryUsage
		targetServerUsage.CPUUsage += additionalUsage.CPUUsage
	}

	return numOfMigrated, nil
}

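// Illustrative note on the impact calculation above (assuming instance.ResourceUsage
// reports the instance's configured CPU count and memory limit): an instance with
// limits.cpu=4 and limits.memory=8GiB would add 4 to the destination's projected load
// and 8 GiB to its projected memory usage, and it is skipped whenever that pushes the
// expected score to or beyond the midpoint target of the two servers.
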
// clusterRebalance performs cluster re-balancing.
func clusterRebalance(ctx context.Context, s *state.State, servers map[string][]*ServerScore) error {
	rebalanceThreshold := s.GlobalConfig.ClusterRebalanceThreshold()
	rebalanceBatch := s.GlobalConfig.ClusterRebalanceBatch()
	numOfMigrated := int64(0)

	for archName, v := range servers {
		if numOfMigrated >= rebalanceBatch {
			// Maximum number of instances already migrated in this run.
			continue
		}

		if len(v) < 2 {
			// Skip if there aren't at least two servers with this architecture.
			continue
		}

		if v[0].Score == 0 {
			// Don't migrate anything if the most loaded server isn't under any load.
			continue
		}

		leastBusyIndex := len(v) - 1
		percentageChange := int64(float64(v[0].Score-v[leastBusyIndex].Score) / float64(v[0].Score) * 100)
		logger.Debug("Automatic re-balancing", logger.Ctx{"Architecture": archName, "LeastBusy": v[leastBusyIndex].NodeInfo.Name, "LeastBusyScore": v[leastBusyIndex].Score, "MostBusy": v[0].NodeInfo.Name, "MostBusyScore": v[0].Score, "Difference": fmt.Sprintf("%d%%", percentageChange), "Threshold": fmt.Sprintf("%d%%", rebalanceThreshold)})

		if percentageChange < rebalanceThreshold {
			continue // Skip as the threshold condition is not met.
		}

		n, err := clusterRebalanceServers(ctx, s, v[0], v[leastBusyIndex], rebalanceBatch-numOfMigrated)
		if err != nil {
			return fmt.Errorf("Failed to rebalance cluster: %w", err)
		}

		numOfMigrated += n
	}

	return nil
}

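// Worked example for the threshold check above (illustrative numbers only): with a
// most busy score of 60 and a least busy score of 30, the relative difference is
// (60 - 30) / 60 * 100 = 50%, so re-balancing runs whenever the configured threshold
// is 50 or lower.
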
func autoRebalanceCluster(ctx context.Context, d *Daemon) error {
	s := d.State()

	// Confirm we should run the rebalance.
	leader, err := s.Cluster.LeaderAddress()
	if err != nil {
		if errors.Is(err, cluster.ErrNodeIsNotClustered) {
			// Not clustered.
			return nil
		}

		return fmt.Errorf("Failed to get leader cluster member address: %w", err)
	}

	if s.LocalConfig.ClusterAddress() != leader {
		// Not the leader.
		return nil
	}

	// Get all online members.
	var onlineMembers []db.NodeInfo
	err = s.DB.Cluster.Transaction(ctx, func(ctx context.Context, tx *db.ClusterTx) error {
		members, err := tx.GetNodes(ctx)
		if err != nil {
			return fmt.Errorf("Failed getting cluster members: %w", err)
		}

		onlineMembers, err = tx.GetCandidateMembers(ctx, members, nil, "", nil, s.GlobalConfig.OfflineThreshold())
		if err != nil {
			return fmt.Errorf("Failed getting online cluster members: %w", err)
		}

		return nil
	})
	if err != nil {
		return fmt.Errorf("Failed getting cluster members: %w", err)
	}

	servers, err := calculateServersScore(s, onlineMembers)
	if err != nil {
		return fmt.Errorf("Failed calculating servers score: %w", err)
	}

	err = clusterRebalance(ctx, s, servers)
	if err != nil {
		return fmt.Errorf("Failed rebalancing cluster: %w", err)
	}

	return nil
}

func autoRebalanceClusterTask(d *Daemon) (task.Func, task.Schedule) {
	f := func(ctx context.Context) {
		s := d.State()

		// Check that we should run now.
		interval := s.GlobalConfig.ClusterRebalanceInterval()
		if interval <= 0 {
			// Re-balance is disabled.
			return
		}

		now := time.Now()
		elapsed := int64(math.Round(now.Sub(s.StartTime).Minutes()))
		if elapsed%interval != 0 {
			// It's not time for a re-balance.
			return
		}

		// Run the rebalance.
		err := autoRebalanceCluster(ctx, d)
		if err != nil {
			logger.Error("Failed during cluster auto rebalancing", logger.Ctx{"err": err})
		}
	}

	return f, task.Every(time.Minute)
}
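The second changed file in this commit isn't shown here; it presumably wires the task into the daemon. A minimal sketch, assuming the daemon keeps a task group (here called d.tasks) with an Add(task.Func, task.Schedule) method, might look like:

	// Hypothetical registration sketch; the actual wiring lives in the other file of this commit.
	d.tasks.Add(autoRebalanceClusterTask(d))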
