Skip to content

Commit b26348d

Browse files
nkubalabalopat
authored andcommitted
add back tracking of forwarded ports to avoid race condition (#1780)
1 parent b76ff3c commit b26348d

File tree

4 files changed

+60
-48
lines changed

4 files changed

+60
-48
lines changed

pkg/skaffold/event/server.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net"
23+
"sync"
2324

2425
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
2526
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event/proto"
@@ -69,7 +70,7 @@ func newStatusServer(originalPort int) (func() error, error) {
6970
if originalPort == -1 {
7071
return func() error { return nil }, nil
7172
}
72-
port := util.GetAvailablePort(originalPort)
73+
port := util.GetAvailablePort(originalPort, &sync.Map{})
7374
if port != originalPort && originalPort != constants.DefaultRPCPort {
7475
logrus.Warnf("provided port %d already in use: using %d instead", originalPort, port)
7576
}

pkg/skaffold/kubernetes/port_forward.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io"
2424
"os/exec"
2525
"strconv"
26+
"sync"
2627

2728
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
2829
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
@@ -45,6 +46,9 @@ type PortForwarder struct {
4546

4647
// forwardedPods is a map of portForwardEntry.key() (string) -> portForwardEntry
4748
forwardedPods map[string]*portForwardEntry
49+
50+
// forwardedPorts serves as a synchronized set of ports we've forwarded.
51+
forwardedPorts *sync.Map
4852
}
4953

5054
type portForwardEntry struct {
@@ -69,7 +73,6 @@ type kubectlForwarder struct{}
6973
var (
7074
// For testing
7175
retrieveAvailablePort = util.GetAvailablePort
72-
isPortAvailable = util.IsPortAvailable
7376
)
7477

7578
// Forward port-forwards a pod using kubectl port-forward
@@ -121,11 +124,12 @@ func (*kubectlForwarder) Terminate(p *portForwardEntry) {
121124
// NewPortForwarder returns a struct that tracks and port-forwards pods as they are created and modified
122125
func NewPortForwarder(out io.Writer, podSelector PodSelector, namespaces []string) *PortForwarder {
123126
return &PortForwarder{
124-
Forwarder: &kubectlForwarder{},
125-
output: out,
126-
podSelector: podSelector,
127-
namespaces: namespaces,
128-
forwardedPods: make(map[string]*portForwardEntry),
127+
Forwarder: &kubectlForwarder{},
128+
output: out,
129+
podSelector: podSelector,
130+
namespaces: namespaces,
131+
forwardedPods: make(map[string]*portForwardEntry),
132+
forwardedPorts: &sync.Map{},
129133
}
130134
}
131135

@@ -225,7 +229,7 @@ func (p *PortForwarder) getCurrentEntry(pod *v1.Pod, c v1.Container, port v1.Con
225229
}
226230

227231
// retrieve an open port on the host
228-
entry.localPort = int32(retrieveAvailablePort(int(port.ContainerPort)))
232+
entry.localPort = int32(retrieveAvailablePort(int(port.ContainerPort), p.forwardedPorts))
229233
return entry
230234
}
231235

@@ -248,7 +252,7 @@ func (p *PortForwarder) forward(ctx context.Context, entry *portForwardEntry) er
248252

249253
// Key is an identifier for the lock on a port during the skaffold dev cycle.
250254
func (p *portForwardEntry) key() string {
251-
return fmt.Sprintf("%s-%d", p.containerName, p.port)
255+
return fmt.Sprintf("%s-%s-%s-%d", p.containerName, p.podName, p.namespace, p.port)
252256
}
253257

254258
// String is a utility function that returns the port forward entry as a user-readable string

pkg/skaffold/kubernetes/port_forward_test.go

+29-32
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"io/ioutil"
2323
"reflect"
24+
"sync"
2425
"testing"
2526

2627
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
@@ -47,9 +48,9 @@ func (f *testForwarder) Terminate(pfe *portForwardEntry) {
4748
delete(f.forwardedPorts, pfe.port)
4849
}
4950

50-
func mockRetrieveAvailablePort(taken map[int]struct{}, availablePorts []int) func(int) int {
51+
func mockRetrieveAvailablePort(taken map[int]struct{}, availablePorts []int) func(int, *sync.Map) int {
5152
// Return first available port in ports that isn't taken
52-
return func(int) int {
53+
return func(int, *sync.Map) int {
5354
for _, p := range availablePorts {
5455
if _, ok := taken[p]; ok {
5556
continue
@@ -61,22 +62,6 @@ func mockRetrieveAvailablePort(taken map[int]struct{}, availablePorts []int) fun
6162
}
6263
}
6364

64-
func mockIsPortAvailable(taken map[int]struct{}, availablePorts []int) func(int) bool {
65-
// Return true if p is in availablePorts and is not in taken
66-
return func(p int) bool {
67-
if _, ok := taken[p]; ok {
68-
return false
69-
}
70-
for _, port := range availablePorts {
71-
if p == port {
72-
taken[p] = struct{}{}
73-
return true
74-
}
75-
}
76-
return false
77-
}
78-
}
79-
8065
func newTestForwarder(forwardErr error) *testForwarder {
8166
return &testForwarder{
8267
forwardedEntries: map[string]*portForwardEntry{},
@@ -102,10 +87,11 @@ func TestPortForwardPod(t *testing.T) {
10287
},
10388
availablePorts: []int{8080},
10489
expectedEntries: map[string]*portForwardEntry{
105-
"containername-8080": {
90+
"containername-podname-namespace-8080": {
10691
resourceVersion: 1,
10792
podName: "podname",
10893
containerName: "containername",
94+
namespace: "namespace",
10995
port: 8080,
11096
localPort: 8080,
11197
},
@@ -115,6 +101,7 @@ func TestPortForwardPod(t *testing.T) {
115101
ObjectMeta: metav1.ObjectMeta{
116102
Name: "podname",
117103
ResourceVersion: "1",
104+
Namespace: "namespace",
118105
},
119106
Spec: v1.PodSpec{
120107
Containers: []v1.Container{
@@ -137,10 +124,11 @@ func TestPortForwardPod(t *testing.T) {
137124
9000: true,
138125
},
139126
expectedEntries: map[string]*portForwardEntry{
140-
"containername-8080": {
127+
"containername-podname-namespace-8080": {
141128
resourceVersion: 1,
142129
podName: "podname",
143130
containerName: "containername",
131+
namespace: "namespace",
144132
port: 8080,
145133
localPort: 9000,
146134
},
@@ -151,6 +139,7 @@ func TestPortForwardPod(t *testing.T) {
151139
ObjectMeta: metav1.ObjectMeta{
152140
Name: "podname",
153141
ResourceVersion: "1",
142+
Namespace: "namespace",
154143
},
155144
Spec: v1.PodSpec{
156145
Containers: []v1.Container{
@@ -178,6 +167,7 @@ func TestPortForwardPod(t *testing.T) {
178167
ObjectMeta: metav1.ObjectMeta{
179168
Name: "podname",
180169
ResourceVersion: "10000000000a",
170+
Namespace: "namespace",
181171
},
182172
Spec: v1.PodSpec{
183173
Containers: []v1.Container{
@@ -203,10 +193,11 @@ func TestPortForwardPod(t *testing.T) {
203193
shouldErr: true,
204194
availablePorts: []int{8080},
205195
expectedEntries: map[string]*portForwardEntry{
206-
"containername-8080": {
196+
"containername-podname-namespace-8080": {
207197
resourceVersion: 1,
208198
podName: "podname",
209199
containerName: "containername",
200+
namespace: "namespace",
210201
port: 8080,
211202
localPort: 8080,
212203
},
@@ -216,6 +207,7 @@ func TestPortForwardPod(t *testing.T) {
216207
ObjectMeta: metav1.ObjectMeta{
217208
Name: "podname",
218209
ResourceVersion: "1",
210+
Namespace: "namespace",
219211
},
220212
Spec: v1.PodSpec{
221213
Containers: []v1.Container{
@@ -240,17 +232,19 @@ func TestPortForwardPod(t *testing.T) {
240232
},
241233
availablePorts: []int{8080, 50051},
242234
expectedEntries: map[string]*portForwardEntry{
243-
"containername-8080": {
235+
"containername-podname-namespace-8080": {
244236
resourceVersion: 1,
245237
podName: "podname",
246238
containerName: "containername",
239+
namespace: "namespace",
247240
port: 8080,
248241
localPort: 8080,
249242
},
250-
"containername2-50051": {
243+
"containername2-podname2-namespace2-50051": {
251244
resourceVersion: 1,
252245
podName: "podname2",
253246
containerName: "containername2",
247+
namespace: "namespace2",
254248
port: 50051,
255249
localPort: 50051,
256250
},
@@ -260,6 +254,7 @@ func TestPortForwardPod(t *testing.T) {
260254
ObjectMeta: metav1.ObjectMeta{
261255
Name: "podname",
262256
ResourceVersion: "1",
257+
Namespace: "namespace",
263258
},
264259
Spec: v1.PodSpec{
265260
Containers: []v1.Container{
@@ -278,6 +273,7 @@ func TestPortForwardPod(t *testing.T) {
278273
ObjectMeta: metav1.ObjectMeta{
279274
Name: "podname2",
280275
ResourceVersion: "1",
276+
Namespace: "namespace2",
281277
},
282278
Spec: v1.PodSpec{
283279
Containers: []v1.Container{
@@ -302,17 +298,19 @@ func TestPortForwardPod(t *testing.T) {
302298
},
303299
availablePorts: []int{8080, 9000},
304300
expectedEntries: map[string]*portForwardEntry{
305-
"containername-8080": {
301+
"containername-podname-namespace-8080": {
306302
resourceVersion: 1,
307303
podName: "podname",
308304
containerName: "containername",
305+
namespace: "namespace",
309306
port: 8080,
310307
localPort: 8080,
311308
},
312-
"containername2-8080": {
309+
"containername2-podname2-namespace2-8080": {
313310
resourceVersion: 1,
314311
podName: "podname2",
315312
containerName: "containername2",
313+
namespace: "namespace2",
316314
port: 8080,
317315
localPort: 9000,
318316
},
@@ -322,6 +320,7 @@ func TestPortForwardPod(t *testing.T) {
322320
ObjectMeta: metav1.ObjectMeta{
323321
Name: "podname",
324322
ResourceVersion: "1",
323+
Namespace: "namespace",
325324
},
326325
Spec: v1.PodSpec{
327326
Containers: []v1.Container{
@@ -340,6 +339,7 @@ func TestPortForwardPod(t *testing.T) {
340339
ObjectMeta: metav1.ObjectMeta{
341340
Name: "podname2",
342341
ResourceVersion: "1",
342+
Namespace: "namespace2",
343343
},
344344
Spec: v1.PodSpec{
345345
Containers: []v1.Container{
@@ -363,10 +363,11 @@ func TestPortForwardPod(t *testing.T) {
363363
},
364364
availablePorts: []int{8080},
365365
expectedEntries: map[string]*portForwardEntry{
366-
"containername-8080": {
366+
"containername-podname-namespace-8080": {
367367
resourceVersion: 2,
368368
podName: "podname",
369369
containerName: "containername",
370+
namespace: "namespace",
370371
port: 8080,
371372
localPort: 8080,
372373
},
@@ -376,6 +377,7 @@ func TestPortForwardPod(t *testing.T) {
376377
ObjectMeta: metav1.ObjectMeta{
377378
Name: "podname",
378379
ResourceVersion: "1",
380+
Namespace: "namespace",
379381
},
380382
Spec: v1.PodSpec{
381383
Containers: []v1.Container{
@@ -394,6 +396,7 @@ func TestPortForwardPod(t *testing.T) {
394396
ObjectMeta: metav1.ObjectMeta{
395397
Name: "podname",
396398
ResourceVersion: "2",
399+
Namespace: "namespace",
397400
},
398401
Spec: v1.PodSpec{
399402
Containers: []v1.Container{
@@ -422,12 +425,6 @@ func TestPortForwardPod(t *testing.T) {
422425
retrieveAvailablePort = originalGetAvailablePort
423426
}()
424427

425-
originalIsPortAvailable := isPortAvailable
426-
isPortAvailable = mockIsPortAvailable(taken, test.availablePorts)
427-
defer func() {
428-
isPortAvailable = originalIsPortAvailable
429-
}()
430-
431428
p := NewPortForwarder(ioutil.Discard, NewImageList(), []string{""})
432429
if test.forwarder == nil {
433430
test.forwarder = newTestForwarder(nil)

pkg/skaffold/util/port.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package util
1919
import (
2020
"fmt"
2121
"net"
22+
"sync"
2223

2324
"github.com/sirupsen/logrus"
2425
)
@@ -29,21 +30,25 @@ import (
2930
// If not, return a random port, which hopefully won't collide with any future containers
3031

3132
// See https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt,
32-
func GetAvailablePort(port int) int {
33-
if IsPortAvailable(port) {
33+
func GetAvailablePort(port int, forwardedPorts *sync.Map) int {
34+
if isPortAvailable(port, forwardedPorts) {
35+
forwardedPorts.Store(port, true)
3436
return port
3537
}
3638

3739
// try the next 10 ports after the provided one
38-
for port = 0; port < 10; port++ {
39-
if IsPortAvailable(port) {
40+
for i := 0; i < 10; i++ {
41+
port++
42+
if isPortAvailable(port, forwardedPorts) {
4043
logrus.Debugf("found open port: %d", port)
44+
forwardedPorts.Store(port, true)
4145
return port
4246
}
4347
}
4448

4549
for port = 4503; port <= 4533; port++ {
46-
if IsPortAvailable(port) {
50+
if isPortAvailable(port, forwardedPorts) {
51+
forwardedPorts.Store(port, true)
4752
return port
4853
}
4954
}
@@ -55,10 +60,15 @@ func GetAvailablePort(port int) int {
5560
if err != nil {
5661
return -1
5762
}
58-
return l.Addr().(*net.TCPAddr).Port
63+
p := l.Addr().(*net.TCPAddr).Port
64+
forwardedPorts.Store(p, true)
65+
return p
5966
}
6067

61-
func IsPortAvailable(p int) bool {
68+
func isPortAvailable(p int, forwardedPorts *sync.Map) bool {
69+
if _, ok := forwardedPorts.Load(p); ok {
70+
return false
71+
}
6272
l, err := net.Listen("tcp", fmt.Sprintf(":%d", p))
6373
if l != nil {
6474
defer l.Close()

0 commit comments

Comments
 (0)