Skip to content

Commit a1a3ba1

Browse files
committed
use logs API with Since to collect the very first logs after restart
Signed-off-by: Nicolas De Loof <[email protected]>
1 parent 408a1bd commit a1a3ba1

File tree

6 files changed

+81
-107
lines changed

6 files changed

+81
-107
lines changed

pkg/api/api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ type ContainerEvent struct {
658658
ID string
659659
Service string
660660
Line string
661-
// ContainerEventExited only
661+
// ExitCode is only set on ContainerEventExited events
662662
ExitCode int
663663
Restarting bool
664664
}

pkg/compose/convergence.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -640,23 +640,23 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P
640640
UseNetworkAliases: true,
641641
Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replaced.ID),
642642
}
643-
created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts, w)
643+
timeoutInSecond := utils.DurationSecondToInt(timeout)
644+
err = s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
644645
if err != nil {
645646
return created, err
646647
}
647648

648-
timeoutInSecond := utils.DurationSecondToInt(timeout)
649-
err = s.apiClient().ContainerStop(ctx, replaced.ID, containerType.StopOptions{Timeout: timeoutInSecond})
649+
err = s.apiClient().ContainerRename(ctx, replaced.ID, tmpName)
650650
if err != nil {
651651
return created, err
652652
}
653653

654-
err = s.apiClient().ContainerRemove(ctx, replaced.ID, containerType.RemoveOptions{})
654+
created, err = s.createMobyContainer(ctx, project, service, name, number, inherited, opts, w)
655655
if err != nil {
656656
return created, err
657657
}
658658

659-
err = s.apiClient().ContainerRename(ctx, created.ID, name)
659+
err = s.apiClient().ContainerRemove(ctx, replaced.ID, containerType.RemoveOptions{})
660660
if err != nil {
661661
return created, err
662662
}

pkg/compose/logs.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"errors"
2222
"io"
23-
"time"
2423

2524
"github.com/docker/docker/api/types/container"
2625
"github.com/docker/docker/errdefs"
@@ -75,9 +74,10 @@ func (s *composeService) Logs(
7574

7675
if options.Follow {
7776
printer := newLogPrinter(consumer)
78-
eg.Go(printer.Run)
7977

80-
monitor := newMonitor(s.apiClient(), options.Project)
78+
monitor := newMonitor(s.apiClient(), projectName)
79+
monitor.withServices(options.Services)
80+
monitor.withListener(printer.HandleEvent)
8181
monitor.withListener(func(event api.ContainerEvent) {
8282
if event.Type == api.ContainerEventStarted {
8383
eg.Go(func() error {
@@ -88,7 +88,7 @@ func (s *composeService) Logs(
8888

8989
err = s.doLogContainer(ctx, consumer, event.Source, ctr, api.LogOptions{
9090
Follow: options.Follow,
91-
Since: time.Unix(0, event.Time).Format(time.RFC3339Nano),
91+
Since: ctr.State.StartedAt,
9292
Until: options.Until,
9393
Tail: options.Tail,
9494
Timestamps: options.Timestamps,
@@ -103,7 +103,6 @@ func (s *composeService) Logs(
103103
}
104104
})
105105
eg.Go(func() error {
106-
defer printer.Stop()
107106
return monitor.Start(ctx)
108107
})
109108
}

pkg/compose/monitor.go

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"context"
2121
"strconv"
2222

23-
"github.com/compose-spec/compose-go/v2/types"
2423
"github.com/containerd/errdefs"
2524
"github.com/docker/docker/api/types/container"
2625
"github.com/docker/docker/api/types/events"
@@ -34,23 +33,23 @@ import (
3433

3534
type monitor struct {
3635
api client.APIClient
37-
project *types.Project
36+
project string
3837
// services tells us which service to consider and those we can ignore, maybe ran by a concurrent compose command
3938
services map[string]bool
4039
listeners []api.ContainerEventListener
4140
}
4241

43-
func newMonitor(api client.APIClient, project *types.Project) *monitor {
44-
services := map[string]bool{}
45-
if project != nil {
46-
for name := range project.Services {
47-
services[name] = true
48-
}
49-
}
42+
func newMonitor(api client.APIClient, project string) *monitor {
5043
return &monitor{
5144
api: api,
5245
project: project,
53-
services: services,
46+
services: map[string]bool{},
47+
}
48+
}
49+
50+
func (c *monitor) withServices(services []string) {
51+
for _, name := range services {
52+
c.services[name] = true
5453
}
5554
}
5655

@@ -62,7 +61,7 @@ func (c *monitor) Start(ctx context.Context) error {
6261
initialState, err := c.api.ContainerList(ctx, container.ListOptions{
6362
All: true,
6463
Filters: filters.NewArgs(
65-
projectFilter(c.project.Name),
64+
projectFilter(c.project),
6665
oneOffFilter(false),
6766
hasConfigHashLabel(),
6867
),
@@ -84,16 +83,19 @@ func (c *monitor) Start(ctx context.Context) error {
8483
evtCh, errCh := c.api.Events(ctx, events.ListOptions{
8584
Filters: filters.NewArgs(
8685
filters.Arg("type", "container"),
87-
projectFilter(c.project.Name)),
86+
projectFilter(c.project)),
8887
})
8988
for {
89+
if len(containers) == 0 {
90+
return nil
91+
}
9092
select {
9193
case <-ctx.Done():
9294
return nil
9395
case err := <-errCh:
9496
return err
9597
case event := <-evtCh:
96-
if !c.services[event.Actor.Attributes[api.ServiceLabel]] {
98+
if len(c.services) > 0 && !c.services[event.Actor.Attributes[api.ServiceLabel]] {
9799
continue
98100
}
99101
ctr, err := c.getContainerSummary(event)
@@ -103,24 +105,35 @@ func (c *monitor) Start(ctx context.Context) error {
103105

104106
switch event.Action {
105107
case events.ActionCreate:
106-
containers.Add(ctr.ID)
108+
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
109+
containers.Add(ctr.ID)
110+
}
111+
evtType := api.ContainerEventCreated
112+
if _, ok := ctr.Labels[api.ContainerReplaceLabel]; ok {
113+
evtType = api.ContainerEventRecreated
114+
}
107115
for _, listener := range c.listeners {
108-
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventCreated))
116+
listener(newContainerEvent(event.TimeNano, ctr, evtType))
109117
}
110118
logrus.Debugf("container %s created", ctr.Name)
111119
case events.ActionStart:
112120
restarted := restarting.Has(ctr.ID)
113-
for _, listener := range c.listeners {
114-
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
115-
e.Restarting = restarted
116-
}))
117-
}
118121
if restarted {
119122
logrus.Debugf("container %s restarted", ctr.Name)
123+
for _, listener := range c.listeners {
124+
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted, func(e *api.ContainerEvent) {
125+
e.Restarting = restarted
126+
}))
127+
}
120128
} else {
121129
logrus.Debugf("container %s started", ctr.Name)
130+
for _, listener := range c.listeners {
131+
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventStarted))
132+
}
133+
}
134+
if len(c.services) == 0 || c.services[ctr.Labels[api.ServiceLabel]] {
135+
containers.Add(ctr.ID)
122136
}
123-
containers.Add(ctr.ID)
124137
case events.ActionRestart:
125138
for _, listener := range c.listeners {
126139
listener(newContainerEvent(event.TimeNano, ctr, api.ContainerEventRestarted))
@@ -159,9 +172,6 @@ func (c *monitor) Start(ctx context.Context) error {
159172
}
160173
}
161174
}
162-
if len(containers) == 0 {
163-
return nil
164-
}
165175
}
166176
}
167177

@@ -192,7 +202,7 @@ func (c *monitor) getContainerSummary(event events.Message) (*api.ContainerSumma
192202
ctr := &api.ContainerSummary{
193203
ID: event.Actor.ID,
194204
Name: event.Actor.Attributes["name"],
195-
Project: c.project.Name,
205+
Project: c.project,
196206
Service: event.Actor.Attributes[api.ServiceLabel],
197207
Labels: event.Actor.Attributes, // More than just labels, but that'c the closest the API gives us
198208
}

pkg/compose/printer.go

Lines changed: 9 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,80 +18,36 @@ package compose
1818

1919
import (
2020
"fmt"
21-
"sync"
2221

2322
"github.com/docker/compose/v2/pkg/api"
2423
)
2524

2625
// logPrinter watch application containers and collect their logs
2726
type logPrinter interface {
2827
HandleEvent(event api.ContainerEvent)
29-
Run() error
30-
Stop()
3128
}
3229

3330
type printer struct {
34-
queue chan api.ContainerEvent
3531
consumer api.LogConsumer
36-
stopCh chan struct{} // stopCh is a signal channel for producers to stop sending events to the queue
37-
stop sync.Once
3832
}
3933

4034
// newLogPrinter builds a LogPrinter passing containers logs to LogConsumer
4135
func newLogPrinter(consumer api.LogConsumer) logPrinter {
4236
printer := printer{
4337
consumer: consumer,
44-
queue: make(chan api.ContainerEvent),
45-
stopCh: make(chan struct{}),
46-
stop: sync.Once{},
4738
}
4839
return &printer
4940
}
5041

51-
func (p *printer) Stop() {
52-
p.stop.Do(func() {
53-
close(p.stopCh)
54-
for {
55-
select {
56-
case <-p.queue:
57-
// purge the queue to free producers goroutines
58-
// p.queue will be garbage collected
59-
default:
60-
return
61-
}
62-
}
63-
})
64-
}
65-
6642
func (p *printer) HandleEvent(event api.ContainerEvent) {
67-
select {
68-
case <-p.stopCh:
69-
return
70-
default:
71-
p.queue <- event
72-
}
73-
}
74-
75-
func (p *printer) Run() error {
76-
defer p.Stop()
77-
78-
// containers we are tracking. Use true when container is running, false after we receive a stop|die signal
79-
for {
80-
select {
81-
case <-p.stopCh:
82-
return nil
83-
case event := <-p.queue:
84-
switch event.Type {
85-
case api.ContainerEventExited, api.ContainerEventStopped, api.ContainerEventRecreated, api.ContainerEventRestarted:
86-
p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
87-
if event.Type == api.ContainerEventRecreated {
88-
p.consumer.Status(event.Source, "has been recreated")
89-
}
90-
case api.ContainerEventLog, api.HookEventLog:
91-
p.consumer.Log(event.Source, event.Line)
92-
case api.ContainerEventErr:
93-
p.consumer.Err(event.Source, event.Line)
94-
}
95-
}
43+
switch event.Type {
44+
case api.ContainerEventExited:
45+
p.consumer.Status(event.Source, fmt.Sprintf("exited with code %d", event.ExitCode))
46+
case api.ContainerEventRecreated:
47+
p.consumer.Status(event.Source, "has been recreated")
48+
case api.ContainerEventLog, api.HookEventLog:
49+
p.consumer.Log(event.Source, event.Line)
50+
case api.ContainerEventErr:
51+
p.consumer.Err(event.Source, event.Line)
9652
}
9753
}

0 commit comments

Comments
 (0)