Skip to content

Initial prototype for pod health check hook up #4223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 5, 2020
116 changes: 89 additions & 27 deletions pkg/skaffold/deploy/resource/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,24 @@ import (
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/runcontext"
"github.com/GoogleContainerTools/skaffold/proto"
)

const (
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
deploymentType = "deployment"
rollOutSuccess = "successfully rolled out"
connectionErrMsg = "Unable to connect to the server"
killedErrMsg = "signal: killed"
defaultPodCheckDeadline = 30 * time.Second
tabHeader = " -"
tab = " "
)

var (
Expand All @@ -40,38 +49,53 @@ var (
)

type Deployment struct {
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
name string
namespace string
rType string
status Status
done bool
deadline time.Duration
pods map[string]validator.Resource
podValidator diag.Diagnose
}

// Deadline returns the deadline duration configured for this deployment.
func (d *Deployment) Deadline() time.Duration {
return d.deadline
}

func (d *Deployment) UpdateStatus(details string, err error) {
updated := newStatus(details, err)
if !d.status.Equal(updated) {
d.status = updated
if strings.Contains(details, rollOutSuccess) || isErrAndNotRetryAble(err) {
d.done = true
}
errCode := proto.StatusCode_STATUSCHECK_SUCCESS
if err != nil {
errCode = proto.StatusCode_STATUSCHECK_UNKNOWN
}
updated := newStatus(details, errCode, err)
if d.status.Equal(updated) {
d.status.changed = false
return
}
d.status.changed = true
d.status = updated
if strings.Contains(details, rollOutSuccess) || isErrAndNotRetryAble(err) {
d.done = true
}
}

func NewDeployment(name string, ns string, deadline time.Duration) *Deployment {
return &Deployment{
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", nil),
deadline: deadline,
name: name,
namespace: ns,
rType: deploymentType,
status: newStatus("", proto.StatusCode_STATUSCHECK_UNKNOWN, nil),
deadline: deadline,
podValidator: diag.New(nil),
}
}

// WithValidator replaces the pod validator used by this deployment with pd
// and returns the same Deployment so that calls can be chained.
func (d *Deployment) WithValidator(pd diag.Diagnose) *Deployment {
d.podValidator = pd
return d
}

func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunContext) {
kubeCtl := kubectl.NewFromRunContext(runCtx)

Expand All @@ -81,20 +105,20 @@ func (d *Deployment) CheckStatus(ctx context.Context, runCtx *runcontext.RunCont
}

details := d.cleanupStatus(string(b))

err = parseKubectlRolloutError(err)
if err == errKubectlKilled {
err = fmt.Errorf("received Ctrl-C or deployments could not stabilize within %v: %w", d.deadline, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to be able to separate these conditions.

}

d.UpdateStatus(details, err)
if err := d.fetchPods(ctx); err != nil {
logrus.Debugf("pod statuses could be fetched this time due to %s", err)
}
}

// String renders the deployment as "<type>/<name>". When the deployment does
// not live in the "default" namespace, the namespace is prepended as
// "<namespace>:<type>/<name>".
func (d *Deployment) String() string {
	qualified := d.rType + "/" + d.name
	if d.namespace != "default" {
		return d.namespace + ":" + qualified
	}
	return qualified
}

Expand All @@ -110,12 +134,25 @@ func (d *Deployment) IsStatusCheckComplete() bool {
return d.done
}

// ReportSinceLastUpdated returns a string describing the deployment status, prefixed with the tab header.
// e.g.
// - testNs:deployment/leeroy-app: waiting for rollout to complete. (1/2) pending
// - testNs:pod/leeroy-app-xvbg : error pulling container image
func (d *Deployment) ReportSinceLastUpdated() string {
if d.status.reported {
if !d.status.changed {
return ""
}
d.status.reported = true
return fmt.Sprintf("%s: %s", d, d.status)
if d.status.String() == "" {
return ""
}
var result strings.Builder
result.WriteString(fmt.Sprintf("%s %s: %s", tabHeader, d, d.status))
for _, p := range d.pods {
if p.Error() != nil {
result.WriteString(fmt.Sprintf("%s %s %s: %s\n", tab, tabHeader, p, p.Error()))
}
}
return result.String()
}

func (d *Deployment) cleanupStatus(msg string) string {
Expand Down Expand Up @@ -145,3 +182,28 @@ func isErrAndNotRetryAble(err error) bool {
}
return err != ErrKubectlConnection
}

// fetchPods runs the pod validator and replaces d.pods with the latest pod
// resources, keyed by each resource's String() value.
//
// The validator run is bounded by defaultPodCheckDeadline in addition to ctx.
// If the run fails, the error is returned and d.pods is left untouched.
//
// For every pod that was absent from the previous snapshot, or whose
// StatusCode differs from the previous one, a resource status check event is
// emitted and d.status.changed is raised so the next report includes the
// update. The flag is only ever raised here, never cleared: UpdateStatus runs
// right before fetchPods in CheckStatus and has already decided whether the
// deployment status itself changed, so resetting the flag here would hide
// that change whenever the pods happen to be unchanged.
func (d *Deployment) fetchPods(ctx context.Context) error {
	timeoutContext, cancel := context.WithTimeout(ctx, defaultPodCheckDeadline)
	defer cancel()

	pods, err := d.podValidator.Run(timeoutContext)
	if err != nil {
		return err
	}

	newPods := make(map[string]validator.Resource, len(pods))
	for _, p := range pods {
		key := p.String()
		// A pod is reported when it is new or when its status code moved.
		if originalPod, ok := d.pods[key]; !ok || originalPod.StatusCode != p.StatusCode {
			d.status.changed = true
			event.ResourceStatusCheckEventCompleted(key, p.Error())
		}
		newPods[key] = p
	}
	d.pods = newPods
	return nil
}
11 changes: 6 additions & 5 deletions pkg/skaffold/deploy/resource/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ func TestReportSinceLastUpdated(t *testing.T) {
description: "updating an error status",
message: "cannot pull image",
err: errors.New("cannot pull image"),
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "updating a non error status",
message: "waiting for container",
expected: "test-ns:deployment/test: waiting for container",
expected: " - test-ns:deployment/test: waiting for container",
},
}
for _, test := range tests {
Expand All @@ -206,7 +206,7 @@ func TestReportSinceLastUpdated(t *testing.T) {
dep.UpdateStatus(test.message, test.err)

t.CheckDeepEqual(test.expected, dep.ReportSinceLastUpdated())
t.CheckTrue(dep.status.reported)
t.CheckTrue(dep.status.changed)
})
}
}
Expand All @@ -220,7 +220,7 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
{
description: "report first time should return status",
times: 1,
expected: "test-ns:deployment/test: cannot pull image",
expected: " - test-ns:deployment/test: cannot pull image",
},
{
description: "report 2nd time should not return",
Expand All @@ -231,9 +231,10 @@ func TestReportSinceLastUpdatedMultipleTimes(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
dep := NewDeployment("test", "test-ns", 1)
dep.UpdateStatus("cannot pull image", nil)
var actual string
for i := 0; i < test.times; i++ {
// update to same status
dep.UpdateStatus("cannot pull image", nil)
actual = dep.ReportSinceLastUpdated()
}
t.CheckDeepEqual(test.expected, actual)
Expand Down
16 changes: 12 additions & 4 deletions pkg/skaffold/deploy/resource/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ limitations under the License.

package resource

import "github.com/GoogleContainerTools/skaffold/proto"

type Status struct {
err error
details string
reported bool
err error
details string
errCode proto.StatusCode
changed bool
}

// Error returns the error stored in the status.
func (rs Status) Error() error {
return rs.err
}

// ErrorCode returns the proto status code stored in the status.
func (rs Status) ErrorCode() proto.StatusCode {
return rs.errCode
}
func (rs Status) String() string {
if rs.err != nil {
return rs.err.Error()
Expand All @@ -43,9 +49,11 @@ func (rs Status) Equal(other Status) bool {
return rs.err == other.err
}

func newStatus(msg string, err error) Status {
func newStatus(msg string, errCode proto.StatusCode, err error) Status {
return Status{
details: msg,
err: err,
errCode: errCode,
changed: true,
}
}
2 changes: 1 addition & 1 deletion pkg/skaffold/deploy/resource/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestString(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
status := newStatus(test.details, test.err)
status := newStatus(test.details, 0, test.err)
t.CheckDeepEqual(test.expected, status.String())
})
}
Expand Down
27 changes: 19 additions & 8 deletions pkg/skaffold/deploy/status_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
pkgkubernetes "github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
Expand Down Expand Up @@ -70,11 +72,11 @@ func statusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *
return fmt.Errorf("getting Kubernetes client: %w", err)
}

deployments, err := getDeployments(client, runCtx.Opts.Namespace, defaultLabeller,
getDeadline(runCtx.Cfg.Deploy.StatusCheckDeadlineSeconds))
deployments, err := getDeployments(client, runCtx.Opts.Namespace, defaultLabeller, getDeadline(runCtx.Cfg.Deploy.StatusCheckDeadlineSeconds))
if err != nil {
return fmt.Errorf("could not fetch deployments: %w", err)
}

deadline := statusCheckMaxDeadline(runCtx.Cfg.Deploy.StatusCheckDeadlineSeconds, deployments)

var wg sync.WaitGroup
Expand All @@ -85,18 +87,19 @@ func statusCheck(ctx context.Context, defaultLabeller *DefaultLabeller, runCtx *
wg.Add(1)
go func(r *resource.Deployment) {
defer wg.Done()
// keep updating the resource status until it fails/succeeds/times out
pollDeploymentStatus(ctx, runCtx, r)
rcCopy := c.markProcessed(r.Status().Error())
printStatusCheckSummary(out, r, rcCopy)
}(d)
}

// Retrieve pending resource states
// Retrieve pending deployment statuses
go func() {
printDeploymentStatus(ctx, out, deployments, deadline)
}()

// Wait for all deployment status to be fetched
// Wait for all deployment statuses to be fetched
wg.Wait()
return getSkaffoldDeployStatus(c)
}
Expand All @@ -109,15 +112,23 @@ func getDeployments(client kubernetes.Interface, ns string, l *DefaultLabeller,
return nil, fmt.Errorf("could not fetch deployments: %w", err)
}

deployments := make([]*resource.Deployment, 0, len(deps.Items))
for _, d := range deps.Items {
deployments := make([]*resource.Deployment, len(deps.Items))
for i, d := range deps.Items {
var deadline time.Duration
if d.Spec.ProgressDeadlineSeconds == nil || *d.Spec.ProgressDeadlineSeconds == kubernetesMaxDeadline {
deadline = deadlineDuration
} else {
deadline = time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second
}
deployments = append(deployments, resource.NewDeployment(d.Name, d.Namespace, deadline))
pd := diag.New([]string{d.Namespace}).
WithLabel(RunIDLabel, l.Labels()[RunIDLabel]).
WithValidators([]validator.Validator{validator.NewPodValidator(client)})

for k, v := range d.Spec.Template.Labels {
pd = pd.WithLabel(k, v)
}

deployments[i] = resource.NewDeployment(d.Name, d.Namespace, deadline).WithValidator(pd)
}

return deployments, nil
Expand Down Expand Up @@ -208,7 +219,7 @@ func printStatus(deployments []*resource.Deployment, out io.Writer) bool {
allDone = false
if str := r.ReportSinceLastUpdated(); str != "" {
event.ResourceStatusCheckEventUpdated(r.String(), str)
fmt.Fprintln(out, tabHeader, trimNewLine(str))
fmt.Fprintln(out, trimNewLine(str))
}
}
return allDone
Expand Down
5 changes: 4 additions & 1 deletion pkg/skaffold/deploy/status_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
utilpointer "k8s.io/utils/pointer"

"github.com/GoogleContainerTools/skaffold/pkg/diag"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy/resource"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
Expand Down Expand Up @@ -214,7 +216,8 @@ func TestGetDeployments(t *testing.T) {
client := fakekubeclientset.NewSimpleClientset(objs...)
actual, err := getDeployments(client, "test", labeller, 200*time.Second)
t.CheckErrorAndDeepEqual(test.shouldErr, err, &test.expected, &actual,
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}))
cmp.AllowUnexported(resource.Deployment{}, resource.Status{}),
cmpopts.IgnoreInterfaces(struct{ diag.Diagnose }{}))
})
}
}
Expand Down