Skip to content

Commit d658049

Browse files
authored
Ensure output stream is set before port forwarding (#6061)
* Ensure output stream is set before port forwarding * fix test
1 parent 35400ad commit d658049

File tree

7 files changed

+58
-8
lines changed

7 files changed

+58
-8
lines changed

integration/dev_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func getLocalPortFromPortForwardEvent(t *testing.T, entries chan *proto.LogEntry
306306
case e := <-entries:
307307
switch e.Event.GetEventType().(type) {
308308
case *proto.Event_PortEvent:
309-
t.Logf("event received %v", e)
309+
t.Logf("port event received: %v", e)
310310
if e.Event.GetPortEvent().ResourceName == resourceName &&
311311
e.Event.GetPortEvent().ResourceType == resourceType &&
312312
e.Event.GetPortEvent().Namespace == namespace {
@@ -316,7 +316,7 @@ func getLocalPortFromPortForwardEvent(t *testing.T, entries chan *proto.LogEntry
316316
return address, int(port)
317317
}
318318
default:
319-
t.Logf("event received %v", e)
319+
t.Logf("event received: %v", e)
320320
}
321321
}
322322
}

pkg/skaffold/kubernetes/portforward/entry_manager.go

+5
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ func (b *EntryManager) forwardPortForwardEntry(ctx context.Context, out io.Write
137137
portForwardEventV2(entry)
138138
}
139139

140+
// Start ensures the underlying entryForwarder is ready to forward.
141+
func (b *EntryManager) Start(out io.Writer) {
142+
b.entryForwarder.Start(out)
143+
}
144+
140145
// Stop terminates all kubectl port-forward commands.
141146
func (b *EntryManager) Stop() {
142147
for _, pfe := range b.forwardedResources.resources {

pkg/skaffold/kubernetes/portforward/forwarder_manager.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ type Forwarder interface {
4848

4949
// ForwarderManager manages all forwarders
5050
type ForwarderManager struct {
51-
forwarders []Forwarder
51+
forwarders []Forwarder
52+
entryManager *EntryManager
5253
}
5354

5455
// NewForwarderManager returns a new port manager which handles starting and stopping port forwarding
@@ -73,7 +74,8 @@ func NewForwarderManager(cli *kubectl.CLI, podSelector kubernetes.PodSelector, l
7374
}
7475

7576
return &ForwarderManager{
76-
forwarders: forwarders,
77+
forwarders: forwarders,
78+
entryManager: entryManager,
7779
}
7880
}
7981

@@ -120,6 +122,7 @@ func (p *ForwarderManager) Start(ctx context.Context, out io.Writer, namespaces
120122
ctx, endTrace := instrumentation.StartTrace(ctx, "Start")
121123
defer endTrace()
122124

125+
p.entryManager.Start(out)
123126
for _, f := range p.forwarders {
124127
if err := f.Start(ctx, out, namespaces); err != nil {
125128
eventV2.TaskFailed(constants.PortForward, err)

pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"os"
2626
"sort"
2727
"strings"
28+
"sync/atomic"
2829
"time"
2930

3031
"github.com/sirupsen/logrus"
@@ -41,11 +42,13 @@ import (
4142
)
4243

4344
type EntryForwarder interface {
45+
Start(io.Writer)
4446
Forward(parentCtx context.Context, pfe *portForwardEntry) error
4547
Terminate(p *portForwardEntry)
4648
}
4749

4850
type KubectlForwarder struct {
51+
started int32
4952
out io.Writer
5053
kubectl *kubectl.CLI
5154
}
@@ -66,6 +69,11 @@ var (
6669
waitErrorLogs = 1 * time.Second
6770
)
6871

72+
func (k *KubectlForwarder) Start(out io.Writer) {
73+
atomic.StoreInt32(&k.started, 1)
74+
k.out = out
75+
}
76+
6977
// Forward port-forwards a pod using kubectl port-forward in the background
7078
// It kills the command on errors in the kubectl port-forward log
7179
// It restarts the command if it was not cancelled by skaffold
@@ -77,6 +85,10 @@ func (k *KubectlForwarder) Forward(parentCtx context.Context, pfe *portForwardEn
7785
}
7886

7987
func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEntry, errChan chan error) {
88+
if atomic.LoadInt32(&k.started) == 0 {
89+
errChan <- fmt.Errorf("Forward() called before kubectl forwarder was started")
90+
return
91+
}
8092
var notifiedUser bool
8193
defer deferFunc()
8294

pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"context"
2222
"errors"
23+
"io/ioutil"
2324
"runtime"
2425
"sort"
2526
"strings"
@@ -69,6 +70,7 @@ func TestUnavailablePort(t *testing.T) {
6970
}
7071
pfe := newPortForwardEntry(0, latestV1.PortForwardResource{}, "", "", "", "", 8080, false)
7172

73+
k.Start(&buf)
7274
go k.Forward(context.Background(), pfe)
7375

7476
// wait for isPortFree to be called
@@ -451,3 +453,31 @@ func mockPod(name string, ports []corev1.ContainerPort, creationTime time.Time)
451453
},
452454
}
453455
}
456+
457+
func TestStartAndForward(t *testing.T) {
458+
tests := []struct {
459+
description string
460+
startFirst bool
461+
}{
462+
{
463+
description: "Forward() before Start() errors",
464+
startFirst: false,
465+
}, {
466+
description: "Start() before Forward()",
467+
startFirst: true,
468+
},
469+
}
470+
471+
for _, test := range tests {
472+
testutil.Run(t, test.description, func(_ *testutil.T) {
473+
k := &KubectlForwarder{}
474+
if test.startFirst {
475+
k.Start(ioutil.Discard)
476+
testutil.CheckDeepEqual(t, k.started, int32(1))
477+
} else {
478+
err := k.Forward(context.Background(), nil)
479+
testutil.CheckError(t, true, err)
480+
}
481+
})
482+
}
483+
}

pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package portforward
1919
import (
2020
"context"
2121
"fmt"
22+
"io"
2223
"io/ioutil"
2324
"sync"
2425
"testing"
@@ -60,6 +61,8 @@ func (f *testForwarder) Terminate(pfe *portForwardEntry) {
6061
f.forwardedPorts.Delete(pfe.localPort)
6162
}
6263

64+
func (f *testForwarder) Start(io.Writer) {}
65+
6366
func newTestForwarder() *testForwarder {
6467
return &testForwarder{}
6568
}

pkg/skaffold/runner/v1/dev.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,8 @@ func (r *SkaffoldRunner) Dev(ctx context.Context, out io.Writer, artifacts []*la
336336

337337
defer r.deployer.GetAccessor().Stop()
338338

339-
// forwarderManager := r.createForwarder(out)
340-
// defer forwarderManager.Stop()
341-
342339
if err := r.deployer.GetAccessor().Start(ctx, out, r.runCtx.GetNamespaces()); err != nil {
343-
logrus.Warnln("Error starting port forwarding:", err)
340+
logrus.Warnln("Error starting resource accessor:", err)
344341
}
345342
if err := r.deployer.GetDebugger().Start(ctx, r.runCtx.GetNamespaces()); err != nil {
346343
logrus.Warnln("Error starting debug container notification:", err)

0 commit comments

Comments
 (0)