Skip to content

Commit 1bc03c2

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
1 parent 362e187 commit 1bc03c2

File tree

3 files changed

+190
-30
lines changed

3 files changed

+190
-30
lines changed

datadog/dogstatsd/base.py

+77-7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import os
1414
import socket
1515
import errno
16+
import struct
1617
import threading
1718
import time
1819
from threading import Lock, RLock
@@ -49,6 +50,11 @@
4950
DEFAULT_HOST = "localhost"
5051
DEFAULT_PORT = 8125
5152

53+
# Socket prefixes
54+
UNIX_ADDRESS_SCHEME = "unix://"
55+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
56+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
57+
5258
# Buffering-related values (in seconds)
5359
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
5460
MIN_FLUSH_INTERVAL = 0.0001
@@ -489,6 +495,30 @@ def socket_path(self, path):
489495
self._transport = "uds"
490496
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
491497

498+
@property
499+
def socket(self):
500+
return self._socket
501+
502+
@socket.setter
503+
def socket(self, new_socket):
504+
self._socket = new_socket
505+
if new_socket:
506+
self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
507+
else:
508+
self._socket_kind = None
509+
510+
@property
511+
def telemetry_socket(self):
512+
return self._telemetry_socket
513+
514+
@telemetry_socket.setter
515+
def telemetry_socket(self, t_socket):
516+
self._telemetry_socket = t_socket
517+
if t_socket:
518+
self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
519+
else:
520+
self._telemetry_socket_kind = None
521+
492522
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
493523
"""
494524
Use a background thread to communicate with the dogstatsd server.
@@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731761

732762
@classmethod
733763
def _get_uds_socket(cls, socket_path, timeout):
734-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
735-
sock.settimeout(timeout)
736-
cls._ensure_min_send_buffer_size(sock)
737-
sock.connect(socket_path)
738-
return sock
764+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
765+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
766+
valid_socket_kinds = [socket.SOCK_DGRAM]
767+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
768+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
769+
valid_socket_kinds = [socket.SOCK_STREAM]
770+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
771+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
772+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
773+
774+
last_error = ValueError("Invalid socket path")
775+
for socket_kind in valid_socket_kinds:
776+
# py2 stores socket kinds differently than py3, determine the name independently from version
777+
sk_name = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}[socket_kind]
778+
779+
try:
780+
sock = socket.socket(socket.AF_UNIX, socket_kind)
781+
sock.settimeout(timeout)
782+
cls._ensure_min_send_buffer_size(sock)
783+
sock.connect(socket_path)
784+
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
785+
return sock
786+
except Exception as e:
787+
if sock is not None:
788+
sock.close()
789+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
790+
if e.errno == errno.EPROTOTYPE:
791+
last_error = e
792+
continue
793+
raise e
794+
raise last_error
739795

740796
@classmethod
741797
def _get_udp_socket(cls, host, port, timeout):
@@ -1216,14 +1272,22 @@ def _xmit_packet_with_telemetry(self, packet):
12161272
self.packets_dropped_writer += 1
12171273

12181274
def _xmit_packet(self, packet, is_telemetry):
1275+
socket_kind = None
12191276
try:
12201277
if is_telemetry and self._dedicated_telemetry_destination():
12211278
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
1279+
socket_kind = self._telemetry_socket_kind
12221280
else:
12231281
# If set, use socket directly
12241282
mysocket = self.socket or self.get_socket()
1283+
socket_kind = self._socket_kind
12251284

1226-
mysocket.send(packet.encode(self.encoding))
1285+
encoded_packet = packet.encode(self.encoding)
1286+
if socket_kind == socket.SOCK_STREAM:
1287+
mysocket.sendall(struct.pack('<I', len(encoded_packet)))
1288+
mysocket.sendall(encoded_packet)
1289+
else:
1290+
mysocket.send(encoded_packet)
12271291

12281292
if not is_telemetry and self._telemetry:
12291293
self.packets_sent += 1
@@ -1256,13 +1320,19 @@ def _xmit_packet(self, packet, is_telemetry):
12561320
)
12571321
self.close_socket()
12581322
except Exception as exc:
1259-
print("Unexpected error: %s", exc)
1323+
print("Unexpected error: ", exc)
12601324
log.error("Unexpected error: %s", str(exc))
12611325

12621326
if not is_telemetry and self._telemetry:
12631327
self.bytes_dropped_writer += len(packet)
12641328
self.packets_dropped_writer += 1
12651329

1330+
# if in stream mode we need to shut down the socket; we can't recover from a
1331+
# partial send
1332+
if socket_kind == socket.SOCK_STREAM:
1333+
log.debug("Confirming socket closure after error streaming")
1334+
self.close_socket()
1335+
12661336
return False
12671337

12681338
def _send_to_buffer(self, packet):

tests/integration/dogstatsd/test_statsd_sender.py

+47-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import tempfile
37
from threading import Thread
8+
import uuid
49

510
import pytest
611

712
from datadog.dogstatsd.base import DogStatsd
813

914
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
15+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
16+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1217
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
18+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1419
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
20+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1621
statsd = DogStatsd(
1722
telemetry_min_flush_interval=0,
1823
disable_background_sender=disable_background_sender,
@@ -101,3 +106,41 @@ def test_buffering_with_context():
101106
bar.settimeout(5)
102107
msg = bar.recv(8192)
103108
assert msg == b"first:1|c\n"
109+
110+
@pytest.fixture()
111+
def socket_dir():
112+
tempdir = tempfile.mkdtemp()
113+
yield tempdir
114+
shutil.rmtree(tempdir)
115+
116+
@pytest.mark.parametrize(
117+
"socket_prefix, socket_kind, success",
118+
[
119+
("", socket.SOCK_DGRAM, True),
120+
("", socket.SOCK_STREAM, True),
121+
("unix://", socket.SOCK_DGRAM, True),
122+
("unix://", socket.SOCK_STREAM, True),
123+
("unixstream://", socket.SOCK_DGRAM, False),
124+
("unixstream://", socket.SOCK_STREAM, True),
125+
("unixgram://", socket.SOCK_DGRAM, True),
126+
("unixgram://", socket.SOCK_STREAM, False)
127+
]
128+
)
129+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
130+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
131+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
132+
listener_socket.bind(socket_path)
133+
134+
if socket_kind == socket.SOCK_STREAM:
135+
listener_socket.listen(1)
136+
137+
with closing(listener_socket):
138+
statsd = DogStatsd(
139+
socket_path = socket_prefix + socket_path
140+
)
141+
142+
if success:
143+
assert statsd.get_socket() is not None
144+
else:
145+
with pytest.raises(socket.error):
146+
statsd.get_socket()

tests/unit/dogstatsd/test_statsd.py

+66-19
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
# Standard libraries
1111
from collections import deque
1212
from contextlib import closing
13+
import struct
1314
from threading import Thread
1415
import errno
1516
import os
@@ -41,13 +42,17 @@ class FakeSocket(object):
4142

4243
FLUSH_GRACE_PERIOD = 0.2
4344

44-
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
45+
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, socket_kind=socket.SOCK_DGRAM):
4546
self.payloads = deque()
4647

4748
self._flush_interval = flush_interval
4849
self._flush_wait = False
50+
self._socket_kind = socket_kind
4951
self.timeout = () # unit tuple = settimeout was not called
5052

53+
def sendall(self, payload):
54+
self.send(payload)
55+
5156
def send(self, payload):
5257
if is_p3k():
5358
assert isinstance(payload, bytes)
@@ -64,17 +69,29 @@ def recv(self, count=1, reset_wait=False, no_wait=False):
6469
time.sleep(self._flush_interval+self.FLUSH_GRACE_PERIOD)
6570
self._flush_wait = True
6671

67-
if count > len(self.payloads):
72+
payload_len = len(self.payloads)
73+
if self._socket_kind == socket.SOCK_STREAM:
74+
if payload_len % 2 != 0 or count > (payload_len / 2):
75+
return None
76+
elif count > len(self.payloads):
6877
return None
6978

7079
out = []
7180
for _ in range(count):
72-
out.append(self.payloads.popleft().decode('utf-8'))
81+
if self._socket_kind == socket.SOCK_DGRAM:
82+
out.append(self.payloads.popleft().decode('utf-8'))
83+
else:
84+
length = struct.unpack('<I', self.payloads.popleft())[0]
85+
pl = self.payloads.popleft()[:length].decode('utf-8')
86+
out.append(pl)
7387
return '\n'.join(out)
7488

7589
def close(self):
7690
pass
7791

92+
def getsockopt(self, *args):
93+
return self._socket_kind
94+
7895
def __repr__(self):
7996
return str(self.payloads)
8097

@@ -1061,47 +1078,71 @@ def test_batching(self):
10611078
telemetry=telemetry_metrics(metrics=2, bytes_sent=len(expected))
10621079
)
10631080

1064-
def test_flush(self):
1081+
def test_flush_dgram(self):
1082+
self._test_flush(socket.SOCK_DGRAM)
1083+
1084+
def test_flush_stream(self):
1085+
self._test_flush(socket.SOCK_STREAM)
1086+
1087+
def _test_flush(self, socket_kind):
10651088
dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
1066-
fake_socket = FakeSocket()
1089+
fake_socket = FakeSocket(socket_kind=socket_kind)
10671090
dogstatsd.socket = fake_socket
10681091

1069-
dogstatsd.increment('page.views')
1092+
dogstatsd.increment('page.®views®')
10701093
self.assertIsNone(fake_socket.recv(no_wait=True))
10711094
dogstatsd.flush()
1072-
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))
1095+
self.assert_equal_telemetry('page.®views®:1|c\n', fake_socket.recv(2))
1096+
1097+
def test_flush_interval_dgram(self):
1098+
self._test_flush_interval(socket.SOCK_DGRAM)
1099+
1100+
def test_flush_interval_stream(self):
1101+
self._test_flush_interval(socket.SOCK_STREAM)
10731102

1074-
def test_flush_interval(self):
1103+
def _test_flush_interval(self, socket_kind):
10751104
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
1076-
fake_socket = FakeSocket()
1105+
fake_socket = FakeSocket(socket_kind=socket_kind)
10771106
dogstatsd.socket = fake_socket
10781107

1079-
dogstatsd.increment('page.views')
1108+
dogstatsd.increment('page.®views®')
10801109
self.assertIsNone(fake_socket.recv(no_wait=True))
10811110

10821111
time.sleep(0.3)
10831112
self.assertIsNone(fake_socket.recv(no_wait=True))
10841113

10851114
time.sleep(1)
10861115
self.assert_equal_telemetry(
1087-
'page.views:1|c\n',
1116+
'page.®views®:1|c\n',
10881117
fake_socket.recv(2, no_wait=True)
10891118
)
10901119

1091-
def test_aggregation_buffering_simultaneously(self):
1120+
def test_aggregation_buffering_simultaneously_dgram(self):
1121+
self._test_aggregation_buffering_simultaneously(socket.SOCK_DGRAM)
1122+
1123+
def test_aggregation_buffering_simultaneously_stream(self):
1124+
self._test_aggregation_buffering_simultaneously(socket.SOCK_STREAM)
1125+
1126+
def _test_aggregation_buffering_simultaneously(self, socket_kind):
10921127
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, telemetry_min_flush_interval=0)
1093-
fake_socket = FakeSocket()
1128+
fake_socket = FakeSocket(socket_kind=socket_kind)
10941129
dogstatsd.socket = fake_socket
10951130
for _ in range(10):
1096-
dogstatsd.increment('test.aggregation_and_buffering')
1131+
dogstatsd.increment('test.ÀggregÀtion_and_buffering')
10971132
self.assertIsNone(fake_socket.recv(no_wait=True))
10981133
dogstatsd.flush_aggregated_metrics()
10991134
dogstatsd.flush()
1100-
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))
1135+
self.assert_equal_telemetry('test.ÀggregÀtion_and_buffering:10|c\n', fake_socket.recv(2))
1136+
1137+
def test_aggregation_buffering_simultaneously_with_interval_dgram(self):
1138+
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_DGRAM)
11011139

1102-
def test_aggregation_buffering_simultaneously_with_interval(self):
1140+
def test_aggregation_buffering_simultaneously_with_interval_stream(self):
1141+
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_STREAM)
1142+
1143+
def _test_aggregation_buffering_simultaneously_with_interval(self, socket_kind):
11031144
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, flush_interval=1, telemetry_min_flush_interval=0)
1104-
fake_socket = FakeSocket()
1145+
fake_socket = FakeSocket(socket_kind=socket_kind)
11051146
dogstatsd.socket = fake_socket
11061147
for _ in range(10):
11071148
dogstatsd.increment('test.aggregation_and_buffering_with_interval')
@@ -1185,12 +1226,18 @@ def test_batching_sequential(self):
11851226
)
11861227
)
11871228

1188-
def test_batching_runtime_changes(self):
1229+
def test_batching_runtime_changes_dgram(self):
1230+
self._test_batching_runtime_changes(socket.SOCK_DGRAM)
1231+
1232+
def test_batching_runtime_changes_stream(self):
1233+
self._test_batching_runtime_changes(socket.SOCK_STREAM)
1234+
1235+
def _test_batching_runtime_changes(self, socket_kind):
11891236
dogstatsd = DogStatsd(
11901237
disable_buffering=True,
11911238
telemetry_min_flush_interval=0
11921239
)
1193-
dogstatsd.socket = FakeSocket()
1240+
dogstatsd.socket = FakeSocket(socket_kind=socket_kind)
11941241

11951242
# Send some unbuffered metrics and verify we got it immediately
11961243
last_telemetry_size = self.send_and_assert(

0 commit comments

Comments
 (0)