Skip to content

Move application logs to their own endpoint for API V2 #5868

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/skaffold/event/v2/application_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func TestHandleApplicationLogEvent(t *testing.T) {
})
}
wait(t, func() bool {
testHandler.logLock.Lock()
logLen := len(testHandler.eventLog)
testHandler.logLock.Unlock()
testHandler.applicationLogsLock.Lock()
logLen := len(testHandler.applicationLogs)
testHandler.applicationLogsLock.Unlock()
return logLen == len(messages)
})
}
64 changes: 45 additions & 19 deletions pkg/skaffold/event/v2/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,18 @@ func newHandler() *eventHandler {
}

type eventHandler struct {
eventLog []proto.Event
logLock sync.Mutex
cfg event.Config
eventLog []proto.Event
logLock sync.Mutex
applicationLogs []proto.Event
applicationLogsLock sync.Mutex
cfg event.Config

iteration int
state proto.State
stateLock sync.Mutex
eventChan chan *proto.Event
listeners []*listener
iteration int
state proto.State
stateLock sync.Mutex
eventChan chan *proto.Event
eventListeners []*listener
applicationLogListeners []*listener
}

type listener struct {
Expand All @@ -96,6 +99,10 @@ func ForEachEvent(callback func(*proto.Event) error) error {
return handler.forEachEvent(callback)
}

func ForEachApplicationLog(callback func(*proto.Event) error) error {
return handler.forEachApplicationLog(callback)
}

func Handle(event *proto.Event) error {
if event != nil {
handler.handle(event)
Expand All @@ -115,10 +122,10 @@ func (ev *eventHandler) getState() proto.State {
return state
}

func (ev *eventHandler) logEvent(event *proto.Event) {
ev.logLock.Lock()
func (ev *eventHandler) log(event *proto.Event, listeners *[]*listener, log *[]proto.Event, lock sync.Locker) {
lock.Lock()

for _, listener := range ev.listeners {
for _, listener := range *listeners {
if listener.closed {
continue
}
Expand All @@ -128,24 +135,32 @@ func (ev *eventHandler) logEvent(event *proto.Event) {
listener.closed = true
}
}
ev.eventLog = append(ev.eventLog, *event)
*log = append(*log, *event)

ev.logLock.Unlock()
lock.Unlock()
}

func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
func (ev *eventHandler) logEvent(event *proto.Event) {
ev.log(event, &ev.eventListeners, &ev.eventLog, &ev.logLock)
}

func (ev *eventHandler) logApplicationLog(event *proto.Event) {
ev.log(event, &ev.applicationLogListeners, &ev.applicationLogs, &ev.applicationLogsLock)
}

func (ev *eventHandler) forEach(listeners *[]*listener, log *[]proto.Event, lock sync.Locker, callback func(*proto.Event) error) error {
listener := &listener{
callback: callback,
errors: make(chan error),
}

ev.logLock.Lock()
lock.Lock()

oldEvents := make([]proto.Event, len(ev.eventLog))
copy(oldEvents, ev.eventLog)
ev.listeners = append(ev.listeners, listener)
oldEvents := make([]proto.Event, len(*log))
copy(oldEvents, *log)
*listeners = append(*listeners, listener)

ev.logLock.Unlock()
lock.Unlock()

for i := range oldEvents {
if err := callback(&oldEvents[i]); err != nil {
Expand All @@ -157,6 +172,14 @@ func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
return <-listener.errors
}

func (ev *eventHandler) forEachEvent(callback func(*proto.Event) error) error {
return ev.forEach(&ev.eventListeners, &ev.eventLog, &ev.logLock, callback)
}

func (ev *eventHandler) forEachApplicationLog(callback func(*proto.Event) error) error {
return ev.forEach(&ev.applicationLogListeners, &ev.applicationLogs, &ev.applicationLogsLock, callback)
}

func emptyState(cfg event.Config) proto.State {
builds := map[string]string{}
for _, p := range cfg.GetPipelines() {
Expand Down Expand Up @@ -365,6 +388,9 @@ func (ev *eventHandler) handleBuildSubtaskEvent(e *proto.BuildSubtaskEvent) {

func (ev *eventHandler) handleExec(event *proto.Event) {
switch e := event.GetEventType().(type) {
case *proto.Event_ApplicationLogEvent:
ev.logApplicationLog(event)
return
case *proto.Event_BuildSubtaskEvent:
be := e.BuildSubtaskEvent
ev.stateLock.Lock()
Expand Down
4 changes: 4 additions & 0 deletions pkg/skaffold/server/v2/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (s *Server) Events(_ *empty.Empty, stream proto.SkaffoldV2Service_EventsSer
return event.ForEachEvent(stream.Send)
}

func (s *Server) ApplicationLogs(_ *empty.Empty, stream proto.SkaffoldV2Service_ApplicationLogsServer) error {
return event.ForEachApplicationLog(stream.Send)
}

func (s *Server) Handle(ctx context.Context, e *proto.Event) (*empty.Empty, error) {
return &empty.Empty{}, event.Handle(e)
}
Expand Down
Loading