Skip to content

Kubernetes Driver: switch to 'StatefulSet' from 'Deployment' #2938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions driver/kubernetes/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ type Driver struct {

// if you add fields, remember to update docs:
// https://github.com/docker/docs/blob/main/content/build/drivers/kubernetes.md
minReplicas int
deployment *appsv1.Deployment
configMaps []*corev1.ConfigMap
clientset *kubernetes.Clientset
deploymentClient clientappsv1.DeploymentInterface
podClient clientcorev1.PodInterface
configMapClient clientcorev1.ConfigMapInterface
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
minReplicas int
statefulSet *appsv1.StatefulSet
configMaps []*corev1.ConfigMap
clientset *kubernetes.Clientset
statefulSetClient clientappsv1.StatefulSetInterface
podClient clientcorev1.PodInterface
configMapClient clientcorev1.ConfigMapInterface
podChooser podchooser.PodChooser
defaultLoad bool
timeout time.Duration
}

func (d *Driver) IsMobyDriver() bool {
Expand All @@ -65,10 +65,10 @@ func (d *Driver) Config() driver.InitConfig {

func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
return progress.Wrap("[internal] booting buildkit", l, func(sub progress.SubLogger) error {
_, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
_, err := d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error for bootstrap %q", d.deployment.Name)
return errors.Wrapf(err, "error for bootstrap %q", d.statefulSet.Name)
}

for _, cfg := range d.configMaps {
Expand All @@ -85,9 +85,9 @@ func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
}
}

_, err = d.deploymentClient.Create(ctx, d.deployment, metav1.CreateOptions{})
_, err = d.statefulSetClient.Create(ctx, d.statefulSet, metav1.CreateOptions{})
if err != nil {
return errors.Wrapf(err, "error while calling deploymentClient.Create for %q", d.deployment.Name)
return errors.Wrapf(err, "error while calling statefulSetClient.Create for %q", d.statefulSet.Name)
}
}
return sub.Wrap(
Expand All @@ -102,7 +102,7 @@ func (d *Driver) wait(ctx context.Context) error {
// TODO: use watch API
var (
err error
depl *appsv1.Deployment
stat *appsv1.StatefulSet
)

timeoutChan := time.After(d.timeout)
Expand All @@ -116,31 +116,31 @@ func (d *Driver) wait(ctx context.Context) error {
case <-timeoutChan:
return err
case <-ticker.C:
depl, err = d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
stat, err = d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err == nil {
if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
if stat.Status.ReadyReplicas >= int32(d.minReplicas) {
return nil
}
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, depl.Status.ReadyReplicas)
err = errors.Errorf("expected %d replicas to be ready, got %d", d.minReplicas, stat.Status.ReadyReplicas)
}
}
}
}

func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
depl, err := d.deploymentClient.Get(ctx, d.deployment.Name, metav1.GetOptions{})
stat, err := d.statefulSetClient.Get(ctx, d.statefulSet.Name, metav1.GetOptions{})
if err != nil {
// TODO: return err if err != ErrNotFound
return &driver.Info{
Status: driver.Inactive,
}, nil
}
if depl.Status.ReadyReplicas <= 0 {
if stat.Status.ReadyReplicas <= 0 {
return &driver.Info{
Status: driver.Stopped,
}, nil
}
pods, err := podchooser.ListRunningPods(ctx, d.podClient, depl)
pods, err := podchooser.ListRunningPods(ctx, d.podClient, stat)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -182,9 +182,9 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil
}

if err := d.deploymentClient.Delete(ctx, d.deployment.Name, metav1.DeleteOptions{}); err != nil {
if err := d.statefulSetClient.Delete(ctx, d.statefulSet.Name, metav1.DeleteOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
return errors.Wrapf(err, "error while calling statefulSetClient.Delete for %q", d.statefulSet.Name)
}
}
for _, cfg := range d.configMaps {
Expand Down
79 changes: 42 additions & 37 deletions driver/kubernetes/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
}
}

deploymentName, err := buildxNameToDeploymentName(cfg.Name)
statefulSetName, err := buildxNameToStatefulSetName(cfg.Name)
if err != nil {
return nil, err
}
Expand All @@ -128,44 +129,44 @@ func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver
clientset: clientset,
}

deploymentOpt, loadbalance, namespace, defaultLoad, timeout, err := f.processDriverOpts(deploymentName, namespace, cfg)
statefulSetOpt, loadbalance, namespace, defaultLoad, timeout, err := f.processDriverOpts(statefulSetName, namespace, cfg)
if nil != err {
return nil, err
}

d.defaultLoad = defaultLoad
d.timeout = timeout

d.deployment, d.configMaps, err = manifest.NewDeployment(deploymentOpt)
d.statefulSet, d.configMaps, err = manifest.NewStatefulSet(statefulSetOpt)
if err != nil {
return nil, err
}

d.minReplicas = deploymentOpt.Replicas
d.minReplicas = statefulSetOpt.Replicas

d.deploymentClient = clientset.AppsV1().Deployments(namespace)
d.statefulSetClient = clientset.AppsV1().StatefulSets(namespace)
d.podClient = clientset.CoreV1().Pods(namespace)
d.configMapClient = clientset.CoreV1().ConfigMaps(namespace)

switch loadbalance {
case LoadbalanceSticky:
d.podChooser = &podchooser.StickyPodChooser{
Key: cfg.ContextPathHash,
PodClient: d.podClient,
Deployment: d.deployment,
Key: cfg.ContextPathHash,
PodClient: d.podClient,
StatefulSet: d.statefulSet,
}
case LoadbalanceRandom:
d.podChooser = &podchooser.RandomPodChooser{
PodClient: d.podClient,
Deployment: d.deployment,
PodClient: d.podClient,
StatefulSet: d.statefulSet,
}
}
return d, nil
}

func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg driver.InitConfig) (*manifest.DeploymentOpt, string, string, bool, time.Duration, error) {
deploymentOpt := &manifest.DeploymentOpt{
Name: deploymentName,
func (f *factory) processDriverOpts(statefulSetName string, namespace string, cfg driver.InitConfig) (*manifest.StatefulSetOpt, string, string, bool, time.Duration, error) {
statefulSetOpt := &manifest.StatefulSetOpt{
Name: statefulSetName,
Image: bkimage.DefaultImage,
Replicas: 1,
BuildkitFlags: cfg.BuildkitdFlags,
Expand All @@ -177,7 +178,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
defaultLoad := false
timeout := defaultTimeout

deploymentOpt.Qemu.Image = bkimage.QemuImage
statefulSetOpt.Qemu.Image = bkimage.QemuImage

loadbalance := LoadbalanceSticky
var err error
Expand All @@ -186,57 +187,61 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
switch k {
case "image":
if v != "" {
deploymentOpt.Image = v
statefulSetOpt.Image = v
}
case "namespace":
namespace = v
case "replicas":
deploymentOpt.Replicas, err = strconv.Atoi(v)
statefulSetOpt.Replicas, err = strconv.Atoi(v)
if err != nil {
return nil, "", "", false, 0, err
}
case "requests.cpu":
deploymentOpt.RequestsCPU = v
statefulSetOpt.RequestsCPU = v
case "requests.memory":
deploymentOpt.RequestsMemory = v
case "requests.ephemeral-storage":
deploymentOpt.RequestsEphemeralStorage = v
statefulSetOpt.RequestsMemory = v
case "requests.persistent-storage":
reqPersistentStorage, err := resource.ParseQuantity(v)
if err != nil {
return nil, "", "", false, 0, err
}
statefulSetOpt.RequestsPersistentStorage = reqPersistentStorage
case "limits.cpu":
deploymentOpt.LimitsCPU = v
statefulSetOpt.LimitsCPU = v
case "limits.memory":
deploymentOpt.LimitsMemory = v
case "limits.ephemeral-storage":
deploymentOpt.LimitsEphemeralStorage = v
statefulSetOpt.LimitsMemory = v
case "limits.persistent-storage":
statefulSetOpt.LimitsPersistentStorage = v
case "rootless":
deploymentOpt.Rootless, err = strconv.ParseBool(v)
statefulSetOpt.Rootless, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, 0, err
}
if _, isImage := cfg.DriverOpts["image"]; !isImage {
deploymentOpt.Image = bkimage.DefaultRootlessImage
statefulSetOpt.Image = bkimage.DefaultRootlessImage
}
case "schedulername":
deploymentOpt.SchedulerName = v
statefulSetOpt.SchedulerName = v
case "serviceaccount":
deploymentOpt.ServiceAccountName = v
statefulSetOpt.ServiceAccountName = v
case "nodeselector":
deploymentOpt.NodeSelector, err = splitMultiValues(v, ",", "=")
statefulSetOpt.NodeSelector, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse node selector")
}
case "annotations":
deploymentOpt.CustomAnnotations, err = splitMultiValues(v, ",", "=")
statefulSetOpt.CustomAnnotations, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse annotations")
}
case "labels":
deploymentOpt.CustomLabels, err = splitMultiValues(v, ",", "=")
statefulSetOpt.CustomLabels, err = splitMultiValues(v, ",", "=")
if err != nil {
return nil, "", "", false, 0, errors.Wrap(err, "cannot parse labels")
}
case "tolerations":
ts := strings.Split(v, ";")
deploymentOpt.Tolerations = []corev1.Toleration{}
statefulSetOpt.Tolerations = []corev1.Toleration{}
for i := range ts {
kvs := strings.Split(ts[i], ",")

Expand Down Expand Up @@ -267,7 +272,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
}

deploymentOpt.Tolerations = append(deploymentOpt.Tolerations, t)
statefulSetOpt.Tolerations = append(statefulSetOpt.Tolerations, t)
}
case "loadbalance":
switch v {
Expand All @@ -278,13 +283,13 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
loadbalance = v
case "qemu.install":
deploymentOpt.Qemu.Install, err = strconv.ParseBool(v)
statefulSetOpt.Qemu.Install, err = strconv.ParseBool(v)
if err != nil {
return nil, "", "", false, 0, err
}
case "qemu.image":
if v != "" {
deploymentOpt.Qemu.Image = v
statefulSetOpt.Qemu.Image = v
}
case "default-load":
defaultLoad, err = strconv.ParseBool(v)
Expand All @@ -301,7 +306,7 @@ func (f *factory) processDriverOpts(deploymentName string, namespace string, cfg
}
}

return deploymentOpt, loadbalance, namespace, defaultLoad, timeout, nil
return statefulSetOpt, loadbalance, namespace, defaultLoad, timeout, nil
}

func splitMultiValues(in string, itemsep string, kvsep string) (map[string]string, error) {
Expand All @@ -324,7 +329,7 @@ func (f *factory) AllowsInstances() bool {
// buildxNameToDeploymentName converts buildx name to Kubernetes Deployment name.
//
// eg. "buildx_buildkit_loving_mendeleev0" -> "loving-mendeleev0"
func buildxNameToDeploymentName(bx string) (string, error) {
func buildxNameToStatefulSetName(bx string) (string, error) {
// TODO: commands.util.go should not pass "buildx_buildkit_" prefix to drivers
s, err := driver.ParseBuilderName(bx)
if err != nil {
Expand Down
Loading
Loading