Skip to content

Commit 8bfb7a0

Browse files
committed
move application logs to their own endpoint
1 parent 540a4a0 commit 8bfb7a0

File tree

6 files changed

+303
-155
lines changed

6 files changed

+303
-155
lines changed

pkg/skaffold/event/v2/application_logs_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ func TestHandleApplicationLogEvent(t *testing.T) {
4242
})
4343
}
4444
wait(t, func() bool {
45-
testHandler.logLock.Lock()
46-
logLen := len(testHandler.eventLog)
47-
testHandler.logLock.Unlock()
45+
testHandler.applicationLogsLock.Lock()
46+
logLen := len(testHandler.applicationLogs)
47+
testHandler.applicationLogsLock.Unlock()
4848
return logLen == len(messages)
4949
})
5050
}

pkg/skaffold/event/v2/event.go

+45-19
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,18 @@ func newHandler() *eventHandler {
6666
}
6767

6868
type eventHandler struct {
69-
eventLog []proto.Event
70-
logLock sync.Mutex
71-
cfg event.Config
69+
eventLog []proto.Event
70+
logLock sync.Mutex
71+
applicationLogs []proto.Event
72+
applicationLogsLock sync.Mutex
73+
cfg event.Config
7274

73-
iteration int
74-
state proto.State
75-
stateLock sync.Mutex
76-
eventChan chan *proto.Event
77-
listeners []*listener
75+
iteration int
76+
state proto.State
77+
stateLock sync.Mutex
78+
eventChan chan *proto.Event
79+
eventListeners []*listener
80+
applicationLogListeners []*listener
7881
}
7982

8083
type listener struct {
@@ -96,6 +99,10 @@ func ForEachEvent(callback func(*proto.Event) error) error {
9699
return handler.forEachEvent(callback)
97100
}
98101

102+
func ForEachApplicationLog(callback func(*proto.Event) error) error {
103+
return handler.forEachApplicationLog(callback)
104+
}
105+
99106
func Handle(event *proto.Event) error {
100107
if event != nil {
101108
handler.handle(event)
@@ -115,10 +122,10 @@ func (ev *eventHandler) getState() proto.State {
115122
return state
116123
}
117124

118-
func (ev *eventHandler) logEvent(event *proto.Event) {
119-
ev.logLock.Lock()
125+
func (ev *eventHandler) log(event *proto.Event, listeners *[]*listener, log *[]proto.Event, lock sync.Locker) {
126+
lock.Lock()
120127

121-
for _, listener := range ev.listeners {
128+
for _, listener := range *listeners {
122129
if listener.closed {
123130
continue
124131
}
@@ -128,24 +135,32 @@ func (ev *eventHandler) logEvent(event *proto.Event) {
128135
listener.closed = true
129136
}
130137
}
131-
ev.eventLog = append(ev.eventLog, *event)
138+
*log = append(*log, *event)
132139

133-
ev.logLock.Unlock()
140+
lock.Unlock()
134141
}
135142

136-
func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
143+
func (ev *eventHandler) logEvent(event *proto.Event) {
144+
ev.log(event, &ev.eventListeners, &ev.eventLog, &ev.logLock)
145+
}
146+
147+
func (ev *eventHandler) logApplicationLog(event *proto.Event) {
148+
ev.log(event, &ev.applicationLogListeners, &ev.applicationLogs, &ev.applicationLogsLock)
149+
}
150+
151+
func (ev *eventHandler) forEach(listeners *[]*listener, log *[]proto.Event, lock sync.Locker, callback func(*proto.Event) error) error {
137152
listener := &listener{
138153
callback: callback,
139154
errors: make(chan error),
140155
}
141156

142-
ev.logLock.Lock()
157+
lock.Lock()
143158

144-
oldEvents := make([]proto.Event, len(ev.eventLog))
145-
copy(oldEvents, ev.eventLog)
146-
ev.listeners = append(ev.listeners, listener)
159+
oldEvents := make([]proto.Event, len(*log))
160+
copy(oldEvents, *log)
161+
*listeners = append(*listeners, listener)
147162

148-
ev.logLock.Unlock()
163+
lock.Unlock()
149164

150165
for i := range oldEvents {
151166
if err := callback(&oldEvents[i]); err != nil {
@@ -157,6 +172,14 @@ func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
157172
return <-listener.errors
158173
}
159174

175+
func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
176+
return ev.forEach(&ev.eventListeners, &ev.eventLog, &ev.logLock, callback)
177+
}
178+
179+
func (ev *eventHandler) forEachApplicationLog(callback func(*proto.Event) error) error {
180+
return ev.forEach(&ev.applicationLogListeners, &ev.applicationLogs, &ev.applicationLogsLock, callback)
181+
}
182+
160183
func emptyState(cfg event.Config) proto.State {
161184
builds := map[string]string{}
162185
for _, p := range cfg.GetPipelines() {
@@ -365,6 +388,9 @@ func (ev *eventHandler) handleBuildSubtaskEvent(e *proto.BuildSubtaskEvent) {
365388

366389
func (ev *eventHandler) handleExec(event *proto.Event) {
367390
switch e := event.GetEventType().(type) {
391+
case *proto.Event_ApplicationLogEvent:
392+
ev.logApplicationLog(event)
393+
return
368394
case *proto.Event_BuildSubtaskEvent:
369395
be := e.BuildSubtaskEvent
370396
ev.stateLock.Lock()

pkg/skaffold/server/v2/endpoints.go

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ func (s *Server) Events(_ *empty.Empty, stream proto.SkaffoldV2Service_EventsSer
4242
return event.ForEachEvent(stream.Send)
4343
}
4444

45+
func (s *Server) ApplicationLogs(_ *empty.Empty, stream proto.SkaffoldV2Service_ApplicationLogsServer) error {
46+
return event.ForEachApplicationLog(stream.Send)
47+
}
48+
4549
func (s *Server) Handle(ctx context.Context, e *proto.Event) (*empty.Empty, error) {
4650
return &empty.Empty{}, event.Handle(e)
4751
}

0 commit comments

Comments
 (0)