Skip to content

Commit d18604e

Browse files
authored
Fix port forwarding on Windows (#4373)
Wrap kubectl portforward call in job object for Windows
1 parent aa43689 commit d18604e

File tree

6 files changed

+176
-12
lines changed

6 files changed

+176
-12
lines changed

pkg/skaffold/kubectl/cli.go

+6
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func (c *CLI) RunOut(ctx context.Context, command string, arg ...string) ([]byte
8080
return util.RunCmdOut(cmd)
8181
}
8282

83+
// CommandWithStrictCancellation ensures for windows OS that all child process get terminated on cancellation
84+
func (c *CLI) CommandWithStrictCancellation(ctx context.Context, command string, arg ...string) *Cmd {
85+
args := c.args(command, "", arg...)
86+
return CommandContext(ctx, "kubectl", args...)
87+
}
88+
8389
// args builds an argument list for calling kubectl and consistently
8490
// adds the `--context` and `--namespace` flags.
8591
func (c *CLI) args(command string, namespace string, arg ...string) []string {

pkg/skaffold/kubectl/cli_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -101,4 +101,26 @@ func TestCLI(t *testing.T) {
101101
t.CheckDeepEqual(string(out), output)
102102
})
103103
}
104+
105+
// test cli.CommandWithStrictCancellation()
106+
for _, test := range tests {
107+
testutil.Run(t, test.name, func(t *testutil.T) {
108+
t.Override(&util.DefaultExecCommand, testutil.CmdRunOut(
109+
test.expectedCommand,
110+
output,
111+
))
112+
113+
cli := NewFromRunContext(&runcontext.RunContext{
114+
Opts: config.SkaffoldOptions{
115+
Namespace: test.namespace,
116+
KubeConfig: test.kubeconfig,
117+
},
118+
KubeContext: kubeContext,
119+
})
120+
cmd := cli.CommandWithStrictCancellation(context.Background(), "exec", "arg1", "arg2")
121+
out, err := util.RunCmdOut(cmd.Cmd)
122+
t.CheckNoError(err)
123+
t.CheckDeepEqual(string(out), output)
124+
})
125+
}
104126
}

pkg/skaffold/kubectl/exec.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// +build !windows
2+
3+
/*
4+
Copyright 2020 The Skaffold Authors
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package kubectl
20+
21+
import (
22+
"context"
23+
"os/exec"
24+
)
25+
26+
// Cmd is a wrapper on exec.Cmd
27+
type Cmd struct {
28+
*exec.Cmd
29+
}
30+
31+
// CommandContext creates a new Cmd
32+
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
33+
return &Cmd{Cmd: exec.CommandContext(ctx, name, arg...)}
34+
}
35+
36+
// Terminate kills the underlying process
37+
func (c *Cmd) Terminate() error {
38+
return c.Process.Kill()
39+
}

pkg/skaffold/kubectl/exec_windows.go

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2020 The Skaffold Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubectl
18+
19+
import (
20+
"context"
21+
"os/exec"
22+
"unsafe"
23+
24+
"golang.org/x/sys/windows"
25+
)
26+
27+
// Cmd represents an external command being prepared to run within a job object
28+
type Cmd struct {
29+
*exec.Cmd
30+
handle windows.Handle
31+
ctx context.Context
32+
}
33+
34+
type process struct {
35+
Pid int
36+
Handle uintptr
37+
}
38+
39+
// CommandContext creates a new Cmd
40+
func CommandContext(ctx context.Context, name string, arg ...string) *Cmd {
41+
return &Cmd{Cmd: exec.CommandContext(ctx, name, arg...), ctx: ctx}
42+
}
43+
44+
// Start starts the specified command in a job object but does not wait for it to complete
45+
func (c *Cmd) Start() (err error) {
46+
var handle windows.Handle
47+
handle, err = windows.CreateJobObject(nil, nil)
48+
if err != nil {
49+
return
50+
}
51+
52+
go func() {
53+
<-c.ctx.Done()
54+
c.Terminate()
55+
}()
56+
57+
// https://gist.github.com/hallazzang/76f3970bfc949831808bbebc8ca15209
58+
info := windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION{
59+
BasicLimitInformation: windows.JOBOBJECT_BASIC_LIMIT_INFORMATION{
60+
LimitFlags: windows.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
61+
},
62+
}
63+
if _, err = windows.SetInformationJobObject(
64+
handle,
65+
windows.JobObjectExtendedLimitInformation,
66+
uintptr(unsafe.Pointer(&info)),
67+
uint32(unsafe.Sizeof(info))); err != nil {
68+
return
69+
}
70+
71+
if err = c.Cmd.Start(); err != nil {
72+
return
73+
}
74+
75+
if err = windows.AssignProcessToJobObject(
76+
handle,
77+
windows.Handle((*process)(unsafe.Pointer(c.Process)).Handle)); err != nil {
78+
return
79+
}
80+
c.handle = handle
81+
return
82+
}
83+
84+
// Run starts the specified command in a job object and waits for it to complete
85+
func (c *Cmd) Run() error {
86+
if err := c.Start(); err != nil {
87+
return err
88+
}
89+
return c.Wait()
90+
}
91+
92+
// Terminate closes the job object handle which kills all connected processes
93+
func (c *Cmd) Terminate() error {
94+
return windows.CloseHandle(c.handle)
95+
}

pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"context"
2222
"fmt"
2323
"io"
24-
"os/exec"
2524
"strings"
2625
"time"
2726

@@ -127,7 +126,7 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
127126
}
128127
}
129128

130-
func portForwardCommand(ctx context.Context, k *kubectl.CLI, pfe *portForwardEntry, buf io.Writer) *exec.Cmd {
129+
func portForwardCommand(ctx context.Context, k *kubectl.CLI, pfe *portForwardEntry, buf io.Writer) *kubectl.Cmd {
131130
args := []string{
132131
"--pod-running-timeout", "1s",
133132
fmt.Sprintf("%s/%s", pfe.resource.Type, pfe.resource.Name),
@@ -137,7 +136,7 @@ func portForwardCommand(ctx context.Context, k *kubectl.CLI, pfe *portForwardEnt
137136
if pfe.resource.Address != "" && pfe.resource.Address != util.Loopback {
138137
args = append(args, []string{"--address", pfe.resource.Address}...)
139138
}
140-
cmd := k.Command(ctx, "port-forward", args...)
139+
cmd := k.CommandWithStrictCancellation(ctx, "port-forward", args...)
141140
cmd.Stdout = buf
142141
cmd.Stderr = buf
143142
return cmd
@@ -159,7 +158,7 @@ func (*KubectlForwarder) Terminate(p *portForwardEntry) {
159158
// Monitor monitors the logs for a kubectl port forward command
160159
// If it sees an error, it calls back to the EntryManager to
161160
// retry the entire port forward operation.
162-
func (*KubectlForwarder) monitorErrorLogs(ctx context.Context, buf *bytes.Buffer, cmd *exec.Cmd, p *portForwardEntry) {
161+
func (*KubectlForwarder) monitorErrorLogs(ctx context.Context, buf *bytes.Buffer, cmd *kubectl.Cmd, p *portForwardEntry) {
163162
for {
164163
select {
165164
case <-ctx.Done():
@@ -179,7 +178,7 @@ func (*KubectlForwarder) monitorErrorLogs(ctx context.Context, buf *bytes.Buffer
179178
strings.Contains(s, "error upgrading connection") {
180179
// kubectl is having an error. retry the command
181180
logrus.Tracef("killing port forwarding %v", p)
182-
if err := cmd.Process.Kill(); err != nil {
181+
if err := cmd.Terminate(); err != nil {
183182
logrus.Tracef("failed to kill port forwarding %v, err: %s", p, err)
184183
}
185184
return

pkg/skaffold/kubernetes/portforward/kubectl_forwarder_test.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package portforward
1919
import (
2020
"bytes"
2121
"context"
22-
"os/exec"
22+
"runtime"
2323
"sync"
2424
"testing"
2525
"time"
@@ -117,8 +117,11 @@ func TestMonitorErrorLogs(t *testing.T) {
117117
t.Override(&waitErrorLogs, 50*time.Millisecond)
118118

119119
ctx, cancel := context.WithCancel(context.Background())
120-
121-
cmd := exec.Command("sleep", "5")
120+
cmdStr := "sleep"
121+
if runtime.GOOS == "windows" {
122+
cmdStr = "timeout"
123+
}
124+
cmd := kubectl.CommandContext(ctx, cmdStr, "5")
122125
if err := cmd.Start(); err != nil {
123126
t.Fatal("error starting command")
124127
}
@@ -145,21 +148,21 @@ func TestMonitorErrorLogs(t *testing.T) {
145148
// make sure the command is running or killed based on what's expected
146149
if test.cmdRunning {
147150
assertCmdIsRunning(t, cmd)
148-
cmd.Process.Kill()
151+
cmd.Terminate()
149152
} else {
150153
assertCmdWasKilled(t, cmd)
151154
}
152155
})
153156
}
154157
}
155158

156-
func assertCmdIsRunning(t *testutil.T, cmd *exec.Cmd) {
159+
func assertCmdIsRunning(t *testutil.T, cmd *kubectl.Cmd) {
157160
if cmd.ProcessState != nil {
158161
t.Fatal("cmd was killed but expected to continue running")
159162
}
160163
}
161164

162-
func assertCmdWasKilled(t *testutil.T, cmd *exec.Cmd) {
165+
func assertCmdWasKilled(t *testutil.T, cmd *kubectl.Cmd) {
163166
if err := cmd.Wait(); err == nil {
164167
t.Fatal("cmd was not killed but expected to be killed")
165168
}
@@ -189,7 +192,7 @@ func TestDefaultAddressArg(t *testing.T) {
189192
assertCmdContainsArgs(t, cmd, false, "--address")
190193
}
191194

192-
func assertCmdContainsArgs(t *testing.T, cmd *exec.Cmd, expected bool, args ...string) {
195+
func assertCmdContainsArgs(t *testing.T, cmd *kubectl.Cmd, expected bool, args ...string) {
193196
if len(args) == 0 {
194197
return
195198
}

0 commit comments

Comments
 (0)