Skip to content

feat(kafka,redpanda): support for waiting for mapped ports without external checks #3165

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions docs/features/wait/host_port.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,42 @@ req := ContainerRequest{
WaitingFor: wait.ForExposedPort().SkipInternalCheck(),
}
```

## Skipping the external check

_Testcontainers for Go_ checks if the container is listening to the port externally (outside of container,
from the host where _Testcontainers for Go_ is used) before returning the control to the caller.

But there are cases where this external check is not needed.
In this case, the `wait.ForListeningPort.SkipExternalCheck` can be used to skip the external check.

```golang
req := ContainerRequest{
Image: "nginx:alpine",
// Do not check port 80 externally, check it internally only
WaitingFor: wait.ForListeningPort("80/tcp").SkipExternalCheck(),
}
```

If there is a need to wait only for completion of container port mapping (which doesn't happen immediately after container is started),
then both internal and external checks can be skipped:

```golang
req := ContainerRequest{
Image: "nginx:alpine",
ExposedPorts: []string{"80/tcp"},
// Wait only for completion of port 80 mapping (from container runtime perspective), do not connect to 80 port
WaitingFor: wait.ForListeningPort("80/tcp").SkipInternalCheck().SkipExternalCheck(),
}
```

Alternatively, `wait.ForMappedPort` can be used:

```golang
req := ContainerRequest{
Image: "nginx:alpine",
ExposedPorts: []string{"80/tcp"},
// Wait only for completion of port 80 mapping (from container runtime perspective), do not connect to 80 port
WaitingFor: wait.ForMappedPort("80/tcp"),
}
```
5 changes: 2 additions & 3 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom

// copyStarterScript copies the starter script into the container.
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
if err := wait.ForListeningPort(publicPort).
SkipInternalCheck().
if err := wait.ForMappedPort(publicPort).
WaitUntilReady(ctx, c); err != nil {
return fmt.Errorf("wait for exposed port: %w", err)
return fmt.Errorf("wait for mapped port: %w", err)
}

host, err := c.Host(ctx)
Expand Down
2 changes: 1 addition & 1 deletion modules/redpanda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/klauspost/compress v1.18.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.10 // indirect
github.com/mdelapenya/tlscert v0.1.0
github.com/mdelapenya/tlscert v0.2.0
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/sys/user v0.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions modules/redpanda/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE=
github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mdelapenya/tlscert v0.1.0 h1:YTpF579PYUX475eOL+6zyEO3ngLTOUWck78NBuJVXaM=
github.com/mdelapenya/tlscert v0.1.0/go.mod h1:wrbyM/DwbFCeCeqdPX/8c6hNOqQgbf0rUDErE1uD+64=
github.com/mdelapenya/tlscert v0.2.0 h1:7H81W6Z/4weDvZBNOfQte5GpIMo0lGYEeWbkGp5LJHI=
github.com/mdelapenya/tlscert v0.2.0/go.mod h1:O4njj3ELLnJjGdkN7M/vIVCpZ+Cf0L6muqOG4tLSl8o=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ=
Expand Down
22 changes: 11 additions & 11 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"math"
"net/http"
"path/filepath"
"path"
"strings"
"text/template"
"time"
Expand Down Expand Up @@ -38,7 +38,6 @@ const (
defaultKafkaAPIPort = "9092/tcp"
defaultAdminAPIPort = "9644/tcp"
defaultSchemaRegistryPort = "8081/tcp"
defaultDockerKafkaAPIPort = "29092"

redpandaDir = "/etc/redpanda"
entrypointFile = "/entrypoint-tc.sh"
Expand Down Expand Up @@ -87,11 +86,12 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
"--memory=1G",
},
WaitingFor: wait.ForAll(
// Wait for the ports to be exposed only as the container needs configuration
// before it will bind to the ports and be ready to serve requests.
wait.ForListeningPort(defaultKafkaAPIPort).SkipInternalCheck(),
wait.ForListeningPort(defaultAdminAPIPort).SkipInternalCheck(),
wait.ForListeningPort(defaultSchemaRegistryPort).SkipInternalCheck(),
// Wait for the ports to be mapped without accessing them,
// because container needs Redpanda configuration before Redpanda is started
// and the mapped ports are part of that configuration.
wait.ForMappedPort(defaultKafkaAPIPort),
wait.ForMappedPort(defaultAdminAPIPort),
wait.ForMappedPort(defaultSchemaRegistryPort),
),
},
Started: true,
Expand Down Expand Up @@ -158,7 +158,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
},
testcontainers.ContainerFile{
Reader: bytes.NewReader(bootstrapConfig),
ContainerFilePath: filepath.Join(redpandaDir, bootstrapConfigFile),
ContainerFilePath: path.Join(redpandaDir, bootstrapConfigFile),
FileMode: 600,
},
)
Expand All @@ -168,12 +168,12 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
req.Files = append(req.Files,
testcontainers.ContainerFile{
Reader: bytes.NewReader(settings.cert),
ContainerFilePath: filepath.Join(redpandaDir, certFile),
ContainerFilePath: path.Join(redpandaDir, certFile),
FileMode: 600,
},
testcontainers.ContainerFile{
Reader: bytes.NewReader(settings.key),
ContainerFilePath: filepath.Join(redpandaDir, keyFile),
ContainerFilePath: path.Join(redpandaDir, keyFile),
FileMode: 600,
},
)
Expand Down Expand Up @@ -206,7 +206,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return c, err
}

err = ctr.CopyToContainer(ctx, nodeConfig, filepath.Join(redpandaDir, "redpanda.yaml"), 0o600)
err = ctr.CopyToContainer(ctx, nodeConfig, path.Join(redpandaDir, "redpanda.yaml"), 0o600)
if err != nil {
return c, fmt.Errorf("copy to container: %w", err)
}
Expand Down
72 changes: 50 additions & 22 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"github.com/twmb/franz-go/pkg/sasl/scram"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/log"
"github.com/testcontainers/testcontainers-go/modules/redpanda"
"github.com/testcontainers/testcontainers-go/network"
)

const testImage = "docker.redpanda.com/redpandadata/redpanda:v23.3.3"

func TestRedpanda(t *testing.T) {
ctx := context.Background()

ctr, err := redpanda.Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v23.3.3")
ctr, err := redpanda.Run(ctx, testImage)
testcontainers.CleanupContainer(t, ctr)
require.NoError(t, err)

Expand Down Expand Up @@ -78,7 +81,7 @@ func TestRedpandaWithAuthentication(t *testing.T) {
ctx := context.Background()
// redpandaCreateContainer {
ctr, err := redpanda.Run(ctx,
"docker.redpanda.com/redpandadata/redpanda:v23.3.3",
testImage,
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableWasmTransform(),
Expand Down Expand Up @@ -192,7 +195,7 @@ func TestRedpandaWithAuthentication(t *testing.T) {
func TestRedpandaWithBootstrapUserAuthentication(t *testing.T) {
ctx := context.Background()
ctr, err := redpanda.Run(ctx,
"docker.redpanda.com/redpandadata/redpanda:v23.3.3",
testImage,
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableWasmTransform(),
Expand Down Expand Up @@ -427,7 +430,7 @@ func TestRedpandaWithOldVersionAndWasm(t *testing.T) {
func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) {
ctx := context.Background()

ctr, err := redpanda.Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v23.3.3", redpanda.WithAutoCreateTopics())
ctr, err := redpanda.Run(ctx, testImage, redpanda.WithAutoCreateTopics())
testcontainers.CleanupContainer(t, ctr)
require.NoError(t, err)

Expand All @@ -446,17 +449,17 @@ func TestRedpandaProduceWithAutoCreateTopics(t *testing.T) {
}

func TestRedpandaWithTLS(t *testing.T) {
tmp := t.TempDir()
cert := tlscert.SelfSignedFromRequest(tlscert.Request{
Name: "client",
Host: "localhost,127.0.0.1",
ParentDir: tmp,
})
require.NotNil(t, cert, "failed to generate cert")

ctx := context.Background()

ctr, err := redpanda.Run(ctx, "docker.redpanda.com/redpandadata/redpanda:v23.3.3", redpanda.WithTLS(cert.Bytes, cert.KeyBytes))
containerHostAddress, err := containerHost(ctx)
require.NoError(t, err)
cert, err := tlscert.SelfSignedFromRequestE(tlscert.Request{
Name: "client",
Host: "localhost,127.0.0.1," + containerHostAddress,
})
require.NoError(t, err, "failed to generate cert")

ctr, err := redpanda.Run(ctx, testImage, redpanda.WithTLS(cert.Bytes, cert.KeyBytes))
testcontainers.CleanupContainer(t, ctr)
require.NoError(t, err)

Expand Down Expand Up @@ -509,19 +512,18 @@ func TestRedpandaWithTLS(t *testing.T) {
}

func TestRedpandaWithTLSAndSASL(t *testing.T) {
tmp := t.TempDir()
ctx := context.Background()

cert := tlscert.SelfSignedFromRequest(tlscert.Request{
Name: "client",
Host: "localhost,127.0.0.1",
ParentDir: tmp,
containerHostAddress, err := containerHost(ctx)
require.NoError(t, err)
cert, err := tlscert.SelfSignedFromRequestE(tlscert.Request{
Name: "client",
Host: "localhost,127.0.0.1," + containerHostAddress,
})
require.NotNil(t, cert, "failed to generate cert")

ctx := context.Background()
require.NoError(t, err, "failed to generate cert")

ctr, err := redpanda.Run(ctx,
"docker.redpanda.com/redpandadata/redpanda:v23.3.3",
testImage,
redpanda.WithTLS(cert.Bytes, cert.KeyBytes),
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
Expand Down Expand Up @@ -698,3 +700,29 @@ func TestRedpandaBootstrapConfig(t *testing.T) {
require.False(t, needsRestart)
}
}

func containerHost(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (string, error) {
// Use a dummy request to get the provider from options.
var req testcontainers.GenericContainerRequest
for _, opt := range opts {
if err := opt.Customize(&req); err != nil {
return "", err
}
}

logging := req.Logger
if logging == nil {
logging = log.Default()
}
p, err := req.ProviderType.GetProvider(testcontainers.WithLogger(logging))
if err != nil {
return "", err
}

if p, ok := p.(*testcontainers.DockerProvider); ok {
return p.DaemonHost(ctx)
}

// Fall back to localhost.
return "localhost", nil
}
40 changes: 31 additions & 9 deletions wait/host_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type HostPortStrategy struct {
// a shell is not available in the container or when the container doesn't bind
// the port internally until additional conditions are met.
skipInternalCheck bool

// skipExternalCheck is a flag to skip the external check, which, if used with
// skipInternalCheck, makes strategy waiting only for port mapping completion
// without accessing port.
skipExternalCheck bool
}

// NewHostPortStrategy constructs a default host port strategy that waits for the given
Expand Down Expand Up @@ -70,6 +75,12 @@ func ForExposedPort() *HostPortStrategy {
return NewHostPortStrategy("")
}

// ForMappedPort returns a host port strategy that waits for the given port
// to be mapped without accessing the port itself.
func ForMappedPort(port nat.Port) *HostPortStrategy {
return NewHostPortStrategy(port).SkipInternalCheck().SkipExternalCheck()
}

// SkipInternalCheck changes the host port strategy to skip the internal check,
// which is useful when a shell is not available in the container or when the
// container doesn't bind the port internally until additional conditions are met.
Expand All @@ -79,6 +90,15 @@ func (hp *HostPortStrategy) SkipInternalCheck() *HostPortStrategy {
return hp
}

// SkipExternalCheck changes the host port strategy to skip the external check,
// which, if used with SkipInternalCheck, makes strategy waiting only for port
// mapping completion without accessing port.
func (hp *HostPortStrategy) SkipExternalCheck() *HostPortStrategy {
hp.skipExternalCheck = true

return hp
}

// WithStartupTimeout can be used to change the default startup timeout
func (hp *HostPortStrategy) WithStartupTimeout(startupTimeout time.Duration) *HostPortStrategy {
hp.timeout = &startupTimeout
Expand Down Expand Up @@ -124,16 +144,12 @@ func (hp *HostPortStrategy) WaitUntilReady(ctx context.Context, target StrategyT
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

ipAddress, err := target.Host(ctx)
if err != nil {
return err
}

waitInterval := hp.PollInterval

internalPort := hp.Port
i := 0
if internalPort == "" {
var err error
// Port is not specified, so we need to detect it.
internalPort, err = hp.detectInternalPort(ctx, target)
if err != nil {
Expand All @@ -157,8 +173,7 @@ func (hp *HostPortStrategy) WaitUntilReady(ctx context.Context, target StrategyT
}
}

var port nat.Port
port, err = target.MappedPort(ctx, internalPort)
port, err := target.MappedPort(ctx, internalPort)
i = 0

for port == "" {
Expand All @@ -178,8 +193,15 @@ func (hp *HostPortStrategy) WaitUntilReady(ctx context.Context, target StrategyT
}
}

if err := externalCheck(ctx, ipAddress, port, target, waitInterval); err != nil {
return fmt.Errorf("external check: %w", err)
if !hp.skipExternalCheck {
ipAddress, err := target.Host(ctx)
if err != nil {
return fmt.Errorf("host: %w", err)
}

if err := externalCheck(ctx, ipAddress, port, target, waitInterval); err != nil {
return fmt.Errorf("external check: %w", err)
}
}

if hp.skipInternalCheck {
Expand Down
Loading
Loading