Skip to content

Commit 3105f24

Browse files
committed
APM: wait to shutdown trace-agent components until all payloads are processed to avoid lost data and panics
1 parent 87af075 commit 3105f24

File tree

3 files changed

+63
-2
lines changed

3 files changed

+63
-2
lines changed

pkg/trace/agent/agent.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ type Agent struct {
128128
ctx context.Context
129129

130130
firstSpanMap sync.Map
131+
132+
processWg *sync.WaitGroup
131133
}
132134

133135
// SpanModifier is an interface that allows to modify spans while they are
@@ -168,6 +170,7 @@ func NewAgent(ctx context.Context, conf *config.AgentConfig, telemetryCollector
168170
DebugServer: api.NewDebugServer(conf),
169171
Statsd: statsd,
170172
Timing: timing,
173+
processWg: &sync.WaitGroup{},
171174
}
172175
agnt.SamplerMetrics.Add(agnt.PrioritySampler, agnt.ErrorsSampler, agnt.NoPrioritySampler, agnt.RareSampler)
173176
agnt.Receiver = api.NewHTTPReceiver(conf, dynConf, in, agnt, telemetryCollector, statsd, timing)
@@ -203,6 +206,7 @@ func (a *Agent) Run() {
203206
workers := max(runtime.GOMAXPROCS(0), 1)
204207

205208
log.Infof("Processing Pipeline configured with %d workers", workers)
209+
a.processWg.Add(workers)
206210
for i := 0; i < workers; i++ {
207211
go a.work()
208212
}
@@ -241,24 +245,29 @@ func (a *Agent) UpdateAPIKey(oldKey, newKey string) {
241245
}
242246

243247
func (a *Agent) work() {
248+
defer a.processWg.Done()
244249
for {
245250
p, ok := <-a.In
246251
if !ok {
247252
return
248253
}
249254
a.Process(p)
250255
}
251-
252256
}
253257

254258
func (a *Agent) loop() {
255259
<-a.ctx.Done()
256260
log.Info("Exiting...")
257261

258-
a.OTLPReceiver.Stop() // Stop OTLPReceiver before Receiver to avoid sending to closed channel
262+
// Stop the receiver first so we do not receive more payloads and start to shut down the workers
259263
if err := a.Receiver.Stop(); err != nil {
260264
log.Error(err)
261265
}
266+
267+
//Wait to process any leftover payloads in flight before closing components that might be needed
268+
a.processWg.Wait()
269+
270+
a.OTLPReceiver.Stop() // Stop OTLPReceiver before Receiver to avoid sending to closed channel
262271
for _, stopper := range []interface{ Stop() }{
263272
a.Concentrator,
264273
a.ClientStatsAggregator,
@@ -383,6 +392,7 @@ func (a *Agent) Process(p *api.Payload) {
383392
if a.SpanModifier != nil {
384393
a.SpanModifier.ModifySpan(chunk, span)
385394
}
395+
time.Sleep(100 * time.Millisecond)
386396
a.obfuscateSpan(span)
387397
a.Truncate(span)
388398
if p.ClientComputedTopLevel {

pkg/trace/agent/agent_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,46 @@ func TestFormatTrace(t *testing.T) {
129129
assert.Contains(result.Meta["sql.query"], "SELECT name FROM people WHERE age = ?")
130130
}
131131

132+
func TestStopWaits(t *testing.T) {
133+
cfg := config.New()
134+
cfg.Endpoints[0].APIKey = "test"
135+
cfg.Obfuscation.Cache.Enabled = true
136+
cfg.Obfuscation.Cache.MaxSize = 1_000
137+
ctx, cancel := context.WithCancel(context.Background())
138+
agnt := NewTestAgent(ctx, cfg, telemetry.NewNoopCollector())
139+
140+
wg := &sync.WaitGroup{}
141+
wg.Add(1)
142+
go func() {
143+
defer wg.Done()
144+
agnt.Run()
145+
}()
146+
147+
now := time.Now()
148+
span := &pb.Span{
149+
TraceID: 1,
150+
SpanID: 1,
151+
Resource: "SELECT name FROM people WHERE age = 42 AND extra = 55",
152+
Type: "sql",
153+
Start: now.Add(-time.Second).UnixNano(),
154+
Duration: (500 * time.Millisecond).Nanoseconds(),
155+
}
156+
157+
agnt.In <- &api.Payload{
158+
TracerPayload: testutil.TracerPayloadWithChunk(testutil.TraceChunkWithSpan(span)),
159+
Source: info.NewReceiverStats().GetTagStats(info.Tags{}),
160+
}
161+
162+
cancel()
163+
wg.Wait() // Wait for agent to completely exit
164+
165+
mtw := agnt.TraceWriter.(*mockTraceWriter)
166+
167+
assert := assert.New(t)
168+
assert.Len(mtw.payloads, 1)
169+
assert.Equal("SELECT name FROM people WHERE age = ? AND extra = ?", mtw.payloads[0].TracerPayload.Chunks[0].Spans[0].Meta["sql.query"])
170+
}
171+
132172
func TestProcess(t *testing.T) {
133173
t.Run("Replacer", func(t *testing.T) {
134174
// Ensures that for "sql" type spans:
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Each section from every release note are combined when the
2+
# CHANGELOG.rst is rendered. So the text needs to be worded so that
3+
# it does not depend on any information only available in another
4+
# section. This may mean repeating some details, but each section
5+
# must be readable independently of the other.
6+
#
7+
# Each section note must be formatted as reStructuredText.
8+
---
9+
fixes:
10+
- |
11+
APM: Fix an issue where the trace-agent could panic during shutdown trying to obfuscate a sql payload.

0 commit comments

Comments
 (0)