1
1
import threading
2
2
import kombu
3
3
from unittest import mock
4
- from urllib .parse import quote
5
4
6
5
import pytest
7
6
from celery import Celery , VERSION
8
7
from celery .bin import worker
8
+ from celery .app .task import Task
9
+ from opentelemetry import trace as otel_trace , context
9
10
10
11
import sentry_sdk
11
12
from sentry_sdk import get_current_span
@@ -221,11 +222,13 @@ def dummy_task(x, y):
221
222
assert execution_event ["contexts" ]["trace" ]["status" ] == "ok"
222
223
223
224
assert len (execution_event ["spans" ]) == 1
224
- assert execution_event ["spans" ][0 ] == ApproxDict ({
225
- "trace_id" : str (root_span .trace_id ),
226
- "op" : "queue.process" ,
227
- "description" : "dummy_task" ,
228
- })
225
+ assert execution_event ["spans" ][0 ] == ApproxDict (
226
+ {
227
+ "trace_id" : str (root_span .trace_id ),
228
+ "op" : "queue.process" ,
229
+ "description" : "dummy_task" ,
230
+ }
231
+ )
229
232
assert submission_event ["spans" ] == [
230
233
{
231
234
"data" : ApproxDict (),
@@ -240,7 +243,7 @@ def dummy_task(x, y):
240
243
"status" : "ok" ,
241
244
"tags" : {
242
245
"status" : "ok" ,
243
- }
246
+ },
244
247
}
245
248
]
246
249
@@ -537,6 +540,20 @@ def test_sentry_propagate_traces_override(init_celery):
537
540
propagate_traces = True , traces_sample_rate = 1.0 , release = "abcdef"
538
541
)
539
542
543
+ # Since we're applying the task inline eagerly,
544
+ # we need to cleanup the otel context for this test.
545
+ # and since we patch build_tracer, we need to do this before that runs...
546
+ # TODO: the right way is to not test this inline
547
+ original_apply = Task .apply
548
+
549
+ def cleaned_apply (* args , ** kwargs ):
550
+ token = context .attach (otel_trace .set_span_in_context (otel_trace .INVALID_SPAN ))
551
+ rv = original_apply (* args , ** kwargs )
552
+ context .detach (token )
553
+ return rv
554
+
555
+ Task .apply = cleaned_apply
556
+
540
557
@celery .task (name = "dummy_task" , bind = True )
541
558
def dummy_task (self , message ):
542
559
trace_id = get_current_span ().trace_id
@@ -558,6 +575,8 @@ def dummy_task(self, message):
558
575
).get ()
559
576
assert root_span_trace_id != task_trace_id , "Trace should NOT be propagated"
560
577
578
+ Task .apply = original_apply
579
+
561
580
562
581
def test_apply_async_manually_span (sentry_init ):
563
582
sentry_init (
0 commit comments