Skip to content

feat(dsm): context support for sns #13605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions ddtrace/internal/datastreams/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,28 @@ def get_datastreams_context(message):
- message.MessageAttributes._datadog.StringValue (SNS -> SQS)
- message.MessageAttributes._datadog.BinaryValue.decode() (SNS -> SQS, raw)
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
"""
context_json = None
message_body = message
try:
body = message.get("Body")
if body:
message_body = json.loads(body)
except (ValueError, TypeError):
log.debug("Unable to parse message body as JSON, treat as non-json")

message_attributes = message_body.get("MessageAttributes") or message_body.get("messageAttributes")
if "Sns" in message:
message_body = message["Sns"]
else:
try:
body = message.get("Body") or message.get("body")
if body:
message_body = json.loads(body)
except (ValueError, TypeError):
log.debug("Unable to parse message body as JSON, treat as non-json")

message_attributes = (
message_body.get("MessageAttributes")
or message_body.get("messageAttributes")
or message.get("MessageAttributes")
or message.get("messageAttributes")
)
if not message_attributes:
log.debug("DataStreams skipped message: %r", message)
return None
Expand All @@ -170,11 +181,13 @@ def get_datastreams_context(message):
# The message originated from SQS
context_json = json.loads(datadog_attr["StringValue"])
elif "stringValue" in datadog_attr:
# The message originated from Lambda
context_json = json.loads(datadog_attr["stringValue"])
elif "BinaryValue" in datadog_attr:
# Raw message delivery
context_json = json.loads(datadog_attr["BinaryValue"].decode())
elif "binaryValue" in datadog_attr:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it actually valid to expect both this and line 185? Seems like they could be condensed into a single condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can confirm that `binaryValue` is needed for SNS -> SQS -> Lambda, but I am worried about removing the existing `BinaryValue` condition: there are too many configurations in which the other casing could be the one that actually appears. If possible, I would appreciate keeping both conditions.

# Raw message delivery to lambda
context_json = json.loads(base64.b64decode(datadog_attr["binaryValue"]).decode())
else:
log.debug("DataStreams did not handle message: %r", message)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
DSM: Adds support for context extraction for SNS -> Lambda and SNS -> SQS -> Lambda.
219 changes: 213 additions & 6 deletions tests/datastreams/test_botocore.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,101 @@
import base64
import json

from ddtrace.internal.datastreams.botocore import get_datastreams_context


class TestGetDatastreamsContext:
def test_sqs_to_lambda_format_with_datadog_context(self):
"""Test SQS -> Lambda format with _datadog messageAttributes."""
def test_sqs_body_message_attributes_format(self):
"""Test format: message.Body.MessageAttributes._datadog.Value.decode() (SQS)"""
trace_context = {
"x-datadog-trace-id": "123456789",
"x-datadog-parent-id": "987654321",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx": "test-pathway-ctx",
}

binary_data = base64.b64encode(json.dumps(trace_context).encode("utf-8")).decode("utf-8")
message_body = {
"Type": "Notification",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": binary_data,
}
},
}

sqs_message = {
"MessageId": "sqs-message-id",
"Body": json.dumps(message_body),
"ReceiptHandle": "test-receipt-handle",
}

result = get_datastreams_context(sqs_message)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "123456789"
assert result["x-datadog-parent-id"] == "987654321"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_string_value_format(self):
"""Test format: message.MessageAttributes._datadog.StringValue (SNS -> SQS)"""
trace_context = {
"x-datadog-trace-id": "555444333",
"x-datadog-parent-id": "111222333",
"dd-pathway-ctx": "test-pathway-ctx",
}

sqs_message = {
"MessageId": "12345678-1234-1234-1234-123456789012",
"ReceiptHandle": "AQEB...",
"Body": "Hello from SQS!",
"Attributes": {"SentTimestamp": "1673001234567"},
"MessageAttributes": {
"_datadog": {"StringValue": json.dumps(trace_context), "DataType": "String"},
"CustomAttribute": {"StringValue": "custom-value", "DataType": "String"},
},
}

result = get_datastreams_context(sqs_message)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "555444333"
assert result["x-datadog-parent-id"] == "111222333"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_binary_value_format(self):
"""Test format: message.MessageAttributes._datadog.BinaryValue.decode() (SNS -> SQS, raw)"""
trace_context = {
"x-datadog-trace-id": "333222111",
"x-datadog-parent-id": "666555444",
"dd-pathway-ctx": "test-pathway-ctx",
}

sqs_message = {
"MessageId": "binary-message-id",
"ReceiptHandle": "binary-receipt-handle",
"Body": "Binary message content",
"MessageAttributes": {
"_datadog": {"BinaryValue": json.dumps(trace_context).encode("utf-8"), "DataType": "Binary"}
},
}

result = get_datastreams_context(sqs_message)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "333222111"
assert result["x-datadog-parent-id"] == "666555444"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sqs_to_lambda_string_value_format(self):
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "789123456",
"x-datadog-parent-id": "321987654",
"dd-pathway-ctx": "test-pathway-ctx",
}

lambda_record = {
Expand Down Expand Up @@ -46,6 +132,127 @@ def test_sqs_to_lambda_format_with_datadog_context(self):

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "123456789"
assert result["x-datadog-parent-id"] == "987654321"
assert result["x-datadog-sampling-priority"] == "1"
assert result["x-datadog-trace-id"] == "789123456"
assert result["x-datadog-parent-id"] == "321987654"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_lambda_format(self):
"""Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "111111111",
"x-datadog-parent-id": "222222222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(json.dumps(trace_context).encode("utf-8")).decode("utf-8")

sns_lambda_record = {
"EventSource": "aws:sns",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012",
"Sns": {
"Type": "Notification",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
"Subject": "Test Subject",
"Message": "Hello from SNS!",
"Timestamp": "2023-01-01T12:00:00.000Z",
"MessageAttributes": {"_datadog": {"Type": "Binary", "Value": binary_data}},
},
}

result = get_datastreams_context(sns_lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "111111111"
assert result["x-datadog-parent-id"] == "222222222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_binary_value_format(self):
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
trace_context = {
"x-datadog-trace-id": "777666555",
"x-datadog-parent-id": "444333222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(json.dumps(trace_context).encode("utf-8")).decode("utf-8")

lambda_record = {
"messageId": "test-message-id",
"receiptHandle": "test-receipt-handle",
"body": "Test message body",
"messageAttributes": {"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}},
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
}

result = get_datastreams_context(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "777666555"
assert result["x-datadog-parent-id"] == "444333222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_body_format(self):
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "123987456",
"x-datadog-parent-id": "654321987",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx": "test-pathway-ctx",
}

message_body = {
"Type": "Notification",
"MessageId": "test-message-id",
"Message": "Test message from SNS",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(json.dumps(trace_context).encode("utf-8")).decode("utf-8"),
}
},
}

lambda_record = {
"messageId": "lambda-message-id",
"body": json.dumps(message_body),
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
}

result = get_datastreams_context(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "123987456"
assert result["x-datadog-parent-id"] == "654321987"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

# Edge case tests
def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {"messageId": "test-message-id", "body": "Test message without attributes"}

result = get_datastreams_context(message)

assert result is None

def test_no_datadog_attribute(self):
"""Test message with MessageAttributes but no _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message",
"messageAttributes": {"customAttribute": {"stringValue": "custom-value", "dataType": "String"}},
}

result = get_datastreams_context(message)
assert result is None

def test_empty_datadog_attribute(self):
"""Test message with empty _datadog attribute returns None."""
message = {"messageId": "test-message-id", "messageAttributes": {"_datadog": {}}}

result = get_datastreams_context(message)

assert result is None
Loading