Skip to content

fix: Wait for context cancel in k8s pod watcher #6643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/skaffold/kubernetes/debugging/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,23 @@ func (d *ContainerManager) Start(ctx context.Context) error {
}

d.podWatcher.Register(d.events)
stopWatcher, err := d.podWatcher.Start(d.kubeContext, *d.namespaces)
stopWatcher, err := d.podWatcher.Start(ctx, d.kubeContext, *d.namespaces)
if err != nil {
return err
}
d.stopWatcher = stopWatcher

go func() {
defer stopWatcher()

l := log.Entry(ctx)
defer l.Tracef("containerManager: cease waiting for pod events")
l.Tracef("containerManager: waiting for pod events")
for {
select {
case <-ctx.Done():
return
l.Tracef("containerManager: context canceled, ignoring")
case evt, ok := <-d.events:
if !ok {
l.Tracef("containerManager: channel closed, returning")
return
}

Expand All @@ -98,7 +100,8 @@ func (d *ContainerManager) Stop() {
if d == nil {
return
}
d.stopWatcher()
d.podWatcher.Deregister(d.events)
close(d.events) // the receiver shouldn't really be the one to close the channel
}

func (d *ContainerManager) Name() string {
Expand Down
15 changes: 9 additions & 6 deletions pkg/skaffold/kubernetes/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,23 @@ func (a *LogAggregator) Start(ctx context.Context, out io.Writer) error {
a.output = out

a.podWatcher.Register(a.events)
stopWatcher, err := a.podWatcher.Start(a.kubectlcli.KubeContext, *a.namespaces)
a.stopWatcher = stopWatcher
stopWatcher, err := a.podWatcher.Start(ctx, a.kubectlcli.KubeContext, *a.namespaces)
if err != nil {
return err
}

go func() {
defer stopWatcher()

l := olog.Entry(ctx)
defer l.Tracef("logAggregator: cease waiting for pod events")
l.Tracef("logAggregator: waiting for pod events")
for {
select {
case <-ctx.Done():
return
l.Tracef("logAggregator: context canceled, ignoring")
case evt, ok := <-a.events:
if !ok {
l.Tracef("logAggregator: channel closed, returning")
return
}

Expand Down Expand Up @@ -164,13 +166,14 @@ func (a *LogAggregator) Start(ctx context.Context, out io.Writer) error {

// Stop stops the logger.
func (a *LogAggregator) Stop() {
l := olog.Entry(context.Background())
if a == nil {
// Logs are not activated.
return
}
a.stopWatcher()
a.podWatcher.Deregister(a.events)
close(a.events)
l.Tracef("logAggregator: Stop() close channel")
close(a.events) // the receiver shouldn't really be the one to close the channel
}

func sinceSeconds(d time.Duration) int64 {
Expand Down
11 changes: 8 additions & 3 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,23 @@ func NewWatchingPodForwarder(entryManager *EntryManager, kubeContext string, pod
func (p *WatchingPodForwarder) Start(ctx context.Context, out io.Writer, namespaces []string) error {
p.podWatcher.Register(p.events)
p.output = out
stopWatcher, err := p.podWatcher.Start(p.kubeContext, namespaces)
stopWatcher, err := p.podWatcher.Start(ctx, p.kubeContext, namespaces)
if err != nil {
return err
}

go func() {
defer stopWatcher()

l := log.Entry(ctx)
defer l.Tracef("podForwarder: cease waiting for pod events")
l.Tracef("podForwarder: waiting for pod events")
for {
select {
case <-ctx.Done():
return
l.Tracef("podForwarder: context canceled, ignoring")
case evt, ok := <-p.events:
if !ok {
l.Tracef("podForwarder: channel closed, returning")
return
}

Expand All @@ -103,6 +106,8 @@ func (p *WatchingPodForwarder) Start(ctx context.Context, out io.Writer, namespa

func (p *WatchingPodForwarder) Stop() {
p.entryManager.Stop()
p.podWatcher.Deregister(p.events)
close(p.events) // the receiver shouldn't really be the one to close the channel
}

func (p *WatchingPodForwarder) portForwardPod(ctx context.Context, pod *v1.Pod) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func (f *fakePodWatcher) Register(receiver chan<- kubernetes.PodEvent) {

func (f *fakePodWatcher) Deregister(_ chan<- kubernetes.PodEvent) {} // noop

func (f *fakePodWatcher) Start(kubeContext string, namespaces []string) (func(), error) {
func (f *fakePodWatcher) Start(_ context.Context, _ string, _ []string) (func(), error) {
go func() {
for _, event := range f.events {
f.receiver <- event
Expand Down
119 changes: 67 additions & 52 deletions pkg/skaffold/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
type PodWatcher interface {
Register(receiver chan<- PodEvent)
Deregister(receiver chan<- PodEvent)
Start(kubeContext string, ns []string) (func(), error)
Start(ctx context.Context, kubeContext string, ns []string) (func(), error)
}

// podWatcher is a pod watcher for multiple namespaces.
type podWatcher struct {
podSelector PodSelector
receivers map[chan<- PodEvent]bool
receiverLock sync.Mutex
receiverLock sync.RWMutex
}

type PodEvent struct {
Expand All @@ -67,7 +67,7 @@ func (w *podWatcher) Deregister(receiver chan<- PodEvent) {
w.receiverLock.Unlock()
}

func (w *podWatcher) Start(kubeContext string, namespaces []string) (func(), error) {
func (w *podWatcher) Start(ctx context.Context, kubeContext string, namespaces []string) (func(), error) {
if len(w.receivers) == 0 {
return func() {}, errors.New("no receiver was registered")
}
Expand Down Expand Up @@ -97,64 +97,79 @@ func (w *podWatcher) Start(kubeContext string, namespaces []string) (func(), err

watchers = append(watchers, watcher)
go func() {
l := log.Entry(context.TODO())
for evt := range watcher.ResultChan() {
// If the event's type is "ERROR", log and continue.
if evt.Type == watch.Error {
// These errors seem to arise from the watch stream being closed from a ^C.
// evt.Object seems likely to be a https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Status
// Status{
// Status:Failure,
// Code:500,
// Reason:InternalError,
// Message:an error on the server ("unable to decode an event from the watch stream: http2: response body closed") has prevented the request from succeeding,
// Details:&StatusDetails{
// Causes:[]StatusCause{
// {Type:UnexpectedServerResponse,Message:unable to decode an event from the watch stream: http2: response body closed},
// {Type:ClientWatchDecoding,Message:unable to decode an event from the watch stream: http2: response body closed}},
// RetryAfterSeconds:0}}
l.Debugf("podWatcher: got unexpected event of type %s: %v", evt.Type, evt.Object)
continue
}

// Grab the pod from the event.
pod, ok := evt.Object.(*v1.Pod)
if !ok {
continue
}
l := log.Entry(ctx)
defer l.Tracef("podWatcher: cease waiting")
l.Tracef("podWatcher: waiting")
for {
select {
case <-ctx.Done():
l.Tracef("podWatcher: context canceled, returning")
return
case evt, ok := <-watcher.ResultChan():
if !ok {
l.Tracef("podWatcher: channel closed, returning")
return
}
// If the event's type is "ERROR", log and continue.
if evt.Type == watch.Error {
// These errors seem to arise from the watch stream being closed from a ^C.
// evt.Object seems likely to be a https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Status
// Status{
// Status:Failure,
// Code:500,
// Reason:InternalError,
// Message:an error on the server ("unable to decode an event from the watch stream: http2: response body closed") has prevented the request from succeeding,
// Details:&StatusDetails{
// Causes:[]StatusCause{
// {Type:UnexpectedServerResponse,Message:unable to decode an event from the watch stream: http2: response body closed},
// {Type:ClientWatchDecoding,Message:unable to decode an event from the watch stream: http2: response body closed}},
// RetryAfterSeconds:0}}
l.Debugf("podWatcher: got unexpected event of type %s: %v", evt.Type, evt.Object)
continue
}

if !w.podSelector.Select(pod) {
continue
}
// Grab the pod from the event.
pod, ok := evt.Object.(*v1.Pod)
if !ok {
continue
}

if log.IsTraceLevelEnabled() {
st := fmt.Sprintf("podWatcher[%s/%s:%v] phase:%v ", pod.Namespace, pod.Name, evt.Type, pod.Status.Phase)
if len(pod.Status.Reason) > 0 {
st += fmt.Sprintf("reason:%s ", pod.Status.Reason)
if !w.podSelector.Select(pod) {
continue
}
for _, c := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
switch {
case c.State.Waiting != nil:
st += fmt.Sprintf("%s<waiting> ", c.Name)
case c.State.Running != nil:
st += fmt.Sprintf("%s<running> ", c.Name)
case c.State.Terminated != nil:
st += fmt.Sprintf("%s<terminated> ", c.Name)

if log.IsTraceLevelEnabled() {
st := fmt.Sprintf("podWatcher[%s/%s:%v] phase:%v ", pod.Namespace, pod.Name, evt.Type, pod.Status.Phase)
if len(pod.Status.Reason) > 0 {
st += fmt.Sprintf("reason:%s ", pod.Status.Reason)
}
for _, c := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
switch {
case c.State.Waiting != nil:
st += fmt.Sprintf("%s<waiting> ", c.Name)
case c.State.Running != nil:
st += fmt.Sprintf("%s<running> ", c.Name)
case c.State.Terminated != nil:
st += fmt.Sprintf("%s<terminated> ", c.Name)
}
}
l.Trace(st)
}
l.Trace(st)
}

w.receiverLock.Lock()
for receiver, open := range w.receivers {
if open {
receiver <- PodEvent{
Type: evt.Type,
Pod: pod,
l.Tracef("podWatcher: sending to all receivers")
w.receiverLock.RLock()
for receiver, open := range w.receivers {
if open {
l.Tracef("podWatcher: sending event type %v pod name %v namespace %v", evt.Type, pod.GetName(), pod.GetNamespace())
receiver <- PodEvent{
Type: evt.Type,
Pod: pod,
}
}
}
w.receiverLock.RUnlock()
l.Tracef("podWatcher: done sending to all receivers")
}
w.receiverLock.Unlock()
}
}()
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/skaffold/kubernetes/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (h *hasName) Select(pod *v1.Pod) bool {
func TestPodWatcher(t *testing.T) {
testutil.Run(t, "need to register first", func(t *testutil.T) {
watcher := NewPodWatcher(&anyPod{})
cleanup, err := watcher.Start("", []string{"ns"})
cleanup, err := watcher.Start(context.Background(), "", []string{"ns"})
defer cleanup()

t.CheckErrorContains("no receiver was registered", err)
Expand All @@ -72,7 +72,7 @@ func TestPodWatcher(t *testing.T) {

watcher := NewPodWatcher(&anyPod{})
watcher.Register(make(chan PodEvent))
cleanup, err := watcher.Start("", []string{"ns"})
cleanup, err := watcher.Start(context.Background(), "", []string{"ns"})
defer cleanup()

t.CheckErrorContains("unable to get client", err)
Expand All @@ -88,7 +88,7 @@ func TestPodWatcher(t *testing.T) {

watcher := NewPodWatcher(&anyPod{})
watcher.Register(make(chan PodEvent))
cleanup, err := watcher.Start("", []string{"ns"})
cleanup, err := watcher.Start(context.Background(), "", []string{"ns"})
defer cleanup()

t.CheckErrorContains("unable to watch", err)
Expand All @@ -104,7 +104,7 @@ func TestPodWatcher(t *testing.T) {
events := make(chan PodEvent)
watcher := NewPodWatcher(podSelector)
watcher.Register(events)
cleanup, err := watcher.Start("", []string{"ns1", "ns2"})
cleanup, err := watcher.Start(context.Background(), "", []string{"ns1", "ns2"})
defer cleanup()
t.CheckNoError(err)

Expand Down