Skip to content

Add missing bytes field to backend response log #494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 9, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Unreleased changes are available as `avenga/couper:edge` container.
* Log malformed duration settings ([#487](https://github.com/avenga/couper/pull/487))
* `url` attribute could make use of our wildcard pattern `/**` and relative urls in combination with a backend reference ([#480](https://github.com/avenga/couper/pull/480))
* Error handling for `backend`, `backend_openapi_validation` and `backend_timeout` error types ([#490](https://github.com/avenga/couper/pull/490))
* `response.bytes` log-field to backend logs if read from body, fallback is the `Content-Length` header ([#494](https://github.com/avenga/couper/pull/494))

* **Changed**
* Permission handling: ([#477](https://github.com/avenga/couper/pull/477))
Expand Down
1 change: 1 addition & 0 deletions config/request/context_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
ContextType ContextKey = iota
APIName
AccessControls
BackendBytes
BackendName
BackendParams
BackendTokenRequest
Expand Down
175 changes: 88 additions & 87 deletions docs/LOGS.md

Large diffs are not rendered by default.

18 changes: 15 additions & 3 deletions eval/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"sync"

"github.com/zclconf/go-cty/cty"

"github.com/avenga/couper/config/request"
)

const TokenRequestPrefix = "_tr_"

type SyncedVariables struct {
items sync.Map
}
Expand All @@ -20,11 +24,18 @@ func NewSyncedVariables() *SyncedVariables {
type syncPair struct {
name string
bereq, beresp cty.Value
tokenRequest bool
}

// Set finalized cty req/resp pair.
func (sv *SyncedVariables) Set(beresp *http.Response) {
name, bereqV, berespV := newBerespValues(beresp.Request.Context(), true, beresp)
ctx := beresp.Request.Context()
name, bereqV, berespV := newBerespValues(ctx, true, beresp)

if tr, ok := ctx.Value(request.TokenRequest).(string); ok && tr != "" {
name = TokenRequestPrefix + name
}

sv.items.Store(name, &syncPair{
name: name,
bereq: bereqV,
Expand All @@ -46,12 +57,13 @@ func (sv *SyncedVariables) Sync(variables map[string]cty.Value) {
if bereqs == nil {
bereqs = make(map[string]cty.Value)
}
bereqs[p.name] = p.bereq
name := key.(string)
bereqs[name] = p.bereq

if beresps == nil {
beresps = make(map[string]cty.Value)
}
beresps[p.name] = p.beresp
beresps[name] = p.beresp

return true
})
Expand Down
6 changes: 4 additions & 2 deletions handler/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (e *Endpoint) produce(req *http.Request) (producer.ResultMap, error) {
close(tripCh)

for resultCh := range tripCh {
e.readResults(resultCh, results)
e.readResults(req.Context(), resultCh, results)
}

var err error // TODO: prefer default resp err
Expand All @@ -240,10 +240,12 @@ func (e *Endpoint) produce(req *http.Request) (producer.ResultMap, error) {
return results, err
}

func (e *Endpoint) readResults(requestResults producer.Results, beresps producer.ResultMap) {
func (e *Endpoint) readResults(ctx context.Context, requestResults producer.Results, beresps producer.ResultMap) {
i := 0
for {
select {
case <-ctx.Done():
return
case r, more := <-requestResults:
if !more {
return
Expand Down
23 changes: 13 additions & 10 deletions handler/transport/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (b *Backend) RoundTrip(req *http.Request) (*http.Response, error) {
}

if !eval.IsUpgradeResponse(req, beresp) {
beresp.Body = logging.NewBytesCountReader(beresp)
if err = setGzipReader(beresp); err != nil {
b.upstreamLog.LogEntry().WithContext(req.Context()).WithError(err).Error()
}
Expand Down Expand Up @@ -373,18 +374,20 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {
return
}

go func(done <-chan struct{}) {
go func(c context.Context, timeoutCh chan time.Time) {
ttfbTimer.Reset(conf.TTFBTimeout)
select {
case <-done:
if !ttfbTimer.Stop() {
<-ttfbTimer.C
case <-c.Done():
ttfbTimer.Stop()
select {
case <-ttfbTimer.C:
default:
}
return
case ttfbTimeout <- <-ttfbTimer.C:
case t := <-ttfbTimer.C:
// buffered, no select done required
timeoutCh <- t
}

}(req.Context().Done())
}(ctx, ttfbTimeout)
},
GotFirstResponseByte: func() {
if downstreamTrace != nil && downstreamTrace.GotFirstResponseByte != nil {
Expand All @@ -396,7 +399,7 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {

*req = *req.WithContext(httptrace.WithClientTrace(ctx, ctxTrace))

go func(cancelFn func(), c context.Context, ec chan error) {
go func(c context.Context, cancelFn func(), ec chan error) {
defer cancelFn()
deadline := make(<-chan time.Time)
if timeout > 0 {
Expand All @@ -415,7 +418,7 @@ func (b *Backend) withTimeout(req *http.Request, conf *Config) <-chan error {
case <-c.Done():
return
}
}(cancel, ctx, errCh)
}(ctx, cancel, errCh)
return errCh
}

Expand Down
40 changes: 40 additions & 0 deletions logging/bytes_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package logging

import (
"context"
"io"
"net/http"
"sync/atomic"

"github.com/avenga/couper/config/request"
)

var _ io.ReadCloser = &BytesCountReader{}

type BytesCountReader struct {
c context.Context
n int64
r io.ReadCloser
}

// NewBytesCountReader just counts the raw read bytes from given response body for logging purposes.
func NewBytesCountReader(beresp *http.Response) io.ReadCloser {
return &BytesCountReader{
c: beresp.Request.Context(),
r: beresp.Body,
}
}

func (b *BytesCountReader) Read(p []byte) (n int, err error) {
n, err = b.r.Read(p)
b.n += int64(n)
return n, err
}

func (b *BytesCountReader) Close() error {
bytesPtr, ok := b.c.Value(request.BackendBytes).(*int64)
if ok {
atomic.StoreInt64(bytesPtr, b.n)
}
return b.r.Close()
}
14 changes: 14 additions & 0 deletions logging/hooks/context.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package hooks

import (
"sync/atomic"

"github.com/sirupsen/logrus"

"github.com/avenga/couper/config/request"
"github.com/avenga/couper/logging"
)

var _ logrus.Hook = &Context{}
Expand All @@ -21,5 +24,16 @@ func (c *Context) Fire(entry *logrus.Entry) error {
entry.Data["uid"] = uid
}
}

if field, ok := entry.Data["type"]; ok && field == beTypeField {
if bytes, i := entry.Context.Value(request.BackendBytes).(*int64); i {
response, r := entry.Data["response"].(logging.Fields)
b := atomic.LoadInt64(bytes)
if r && b > 0 {
response["bytes"] = b
}
}
}

return nil
}
19 changes: 17 additions & 2 deletions logging/hooks/custom_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,21 @@ func fireUpstream(entry *logrus.Entry) {
// syncedUpstreamContext prepares the local backend variable.
func syncedUpstreamContext(evalCtx *eval.Context, entry *logrus.Entry) *hcl.EvalContext {
ctx := evalCtx.HCLContextSync()

tr, _ := entry.Context.Value(request.TokenRequest).(string)
rtName, _ := entry.Context.Value(request.RoundTripName).(string)
isTr := tr != ""

if rtName == "" {
return ctx
}

if _, ok := ctx.Variables[eval.BackendRequests]; ok {
for k, v := range ctx.Variables[eval.BackendRequests].AsValueMap() {
if k == entry.Context.Value(request.RoundTripName) {
if isTr && k == eval.TokenRequestPrefix+rtName {
ctx.Variables[eval.BackendRequest] = v
break
} else if k == rtName {
ctx.Variables[eval.BackendRequest] = v
break
}
Expand All @@ -113,7 +125,10 @@ func syncedUpstreamContext(evalCtx *eval.Context, entry *logrus.Entry) *hcl.Eval

if _, ok := ctx.Variables[eval.BackendResponses]; ok {
for k, v := range ctx.Variables[eval.BackendResponses].AsValueMap() {
if k == entry.Context.Value(request.RoundTripName) {
if isTr && k == eval.TokenRequestPrefix+rtName {
ctx.Variables[eval.BackendResponse] = v
break
} else if k == rtName {
ctx.Variables[eval.BackendResponse] = v
break
}
Expand Down
55 changes: 55 additions & 0 deletions logging/stack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package logging

import (
"context"
"sync"

"github.com/sirupsen/logrus"
)

type entry struct {
logEntry *logrus.Entry
}

func (e *entry) Level(lvl logrus.Level) {
e.logEntry.Level = lvl
}

type Level interface {
Level(level logrus.Level)
}

type Stack struct {
entries []*entry
mu sync.Mutex
}

const logStack = "logStack"

func NewStack(ctx context.Context) (context.Context, *Stack) {
s := &Stack{}
return context.WithValue(ctx, logStack, s), s
}

func (s *Stack) Push(e *logrus.Entry) Level {
s.mu.Lock()
defer s.mu.Unlock()

item := &entry{logEntry: e}
s.entries = append(s.entries, item)
return item
}

func (s *Stack) Fire() {
s.mu.Lock()
defer s.mu.Unlock()

for _, item := range s.entries {
item.logEntry.Log(item.logEntry.Level)
}
}

func FromContext(ctx context.Context) (*Stack, bool) {
s, exist := ctx.Value(logStack).(*Stack)
return s, exist
}
30 changes: 25 additions & 5 deletions logging/upstream_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {

fields["request"] = requestFields

berespBytes := int64(0)
logCtxCh := make(chan hcl.Body, 10)
outctx := context.WithValue(req.Context(), request.LogCustomUpstream, logCtxCh)
outctx = context.WithValue(outctx, request.BackendBytes, &berespBytes)
oCtx, openAPIContext := validation.NewWithContext(outctx)
outreq := req.WithContext(httptrace.WithClientTrace(oCtx, clientTrace))

Expand Down Expand Up @@ -116,15 +118,20 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {
if tr, ok := outreq.Context().Value(request.TokenRequest).(string); ok && tr != "" {
fields["token_request"] = tr

if retries, ok := outreq.Context().Value(request.TokenRequestRetries).(uint8); ok && retries > 0 {
if retries, exist := outreq.Context().Value(request.TokenRequestRetries).(uint8); exist && retries > 0 {
fields["token_request_retry"] = retries
}
}

fields["status"] = 0
if beresp != nil {
fields["status"] = beresp.StatusCode
cl := int64(0)
if beresp.ContentLength > 0 {
cl = beresp.ContentLength
}
responseFields := Fields{
"bytes": cl,
"headers": filterHeader(u.config.ResponseHeaders, beresp.Header),
"status": beresp.StatusCode,
}
Expand All @@ -146,16 +153,29 @@ func (u *UpstreamLog) RoundTrip(req *http.Request) (*http.Response, error) {
fields["timings"] = timingResults
//timings["ttlb"] = roundMS(rtDone.Sub(timeTTFB)) // TODO: depends on stream or buffer

entry := u.log.WithFields(logrus.Fields(fields)).WithContext(outreq.Context())
entry.Time = startTime
entry := u.log.
WithFields(logrus.Fields(fields)).
WithContext(outreq.Context()).
WithTime(startTime)

stack, stacked := FromContext(outreq.Context())

if err != nil {
if _, ok := err.(errors.GoError); !ok {
err = errors.Backend.With(err)
}
entry.WithError(err).Error()
entry = entry.WithError(err)
if stacked {
stack.Push(entry).Level(logrus.ErrorLevel)
} else {
entry.Error()
}
} else {
entry.Info()
if stacked {
stack.Push(entry).Level(logrus.InfoLevel)
} else {
entry.Info()
}
}

return beresp, err
Expand Down
Loading