Skip to content

Commit 6a51dc0

Browse files
committed
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
1 parent 010d523 commit 6a51dc0

File tree

4 files changed

+245
-33
lines changed

4 files changed

+245
-33
lines changed

datadog/dogstatsd/aggregator.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ def flush_aggregated_sampled_metrics(self):
5555
return metrics
5656

5757
def get_context(self, name, tags):
58-
tags_str = ",".join(tags) if tags is not None else ""
59-
return "{}:{}".format(name, tags_str)
58+
tags_str = u",".join(tags) if tags is not None else ""
59+
return u"{}:{}".format(name, tags_str)
6060

6161
def count(self, name, value, tags, rate, timestamp=0):
6262
return self.add_metric(

datadog/dogstatsd/base.py

+83-7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import os
1414
import socket
1515
import errno
16+
import struct
1617
import threading
1718
import time
1819
from threading import Lock, RLock
@@ -49,6 +50,11 @@
4950
DEFAULT_HOST = "localhost"
5051
DEFAULT_PORT = 8125
5152

53+
# Socket prefixes
54+
UNIX_ADDRESS_SCHEME = "unix://"
55+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
56+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
57+
5258
# Buffering-related values (in seconds)
5359
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
5460
MIN_FLUSH_INTERVAL = 0.0001
@@ -495,6 +501,36 @@ def socket_path(self, path):
495501
self._transport = "uds"
496502
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
497503

504+
@property
505+
def socket(self):
506+
return self._socket
507+
508+
@socket.setter
509+
def socket(self, new_socket):
510+
self._socket = new_socket
511+
if new_socket:
512+
try:
513+
self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
514+
return
515+
except AttributeError: # _socket can't have a type if it doesn't have sockopts
516+
log.info("Unexpected socket provided with no support for getsockopt")
517+
self._socket_kind = None
518+
519+
@property
520+
def telemetry_socket(self):
521+
return self._telemetry_socket
522+
523+
@telemetry_socket.setter
524+
def telemetry_socket(self, t_socket):
525+
self._telemetry_socket = t_socket
526+
if t_socket:
527+
try:
528+
self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
529+
return
530+
except AttributeError: # _telemetry_socket can't have a kind if it doesn't have sockopts
531+
log.info("Unexpected telemetry socket provided with no support for getsockopt")
532+
self._telemetry_socket_kind = None
533+
498534
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
499535
"""
500536
Use a background thread to communicate with the dogstatsd server.
@@ -738,11 +774,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
738774

739775
@classmethod
740776
def _get_uds_socket(cls, socket_path, timeout):
741-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
742-
sock.settimeout(timeout)
743-
cls._ensure_min_send_buffer_size(sock)
744-
sock.connect(socket_path)
745-
return sock
777+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
778+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
779+
valid_socket_kinds = [socket.SOCK_DGRAM]
780+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
781+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
782+
valid_socket_kinds = [socket.SOCK_STREAM]
783+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
784+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
785+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
786+
787+
last_error = ValueError("Invalid socket path")
788+
for socket_kind in valid_socket_kinds:
789+
# py2 stores socket kinds differently than py3, determine the name independently from version
790+
sk_name = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}[socket_kind]
791+
792+
try:
793+
sock = socket.socket(socket.AF_UNIX, socket_kind)
794+
sock.settimeout(timeout)
795+
cls._ensure_min_send_buffer_size(sock)
796+
sock.connect(socket_path)
797+
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
798+
return sock
799+
except Exception as e:
800+
if sock is not None:
801+
sock.close()
802+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
803+
if e.errno == errno.EPROTOTYPE:
804+
last_error = e
805+
continue
806+
raise e
807+
raise last_error
746808

747809
@classmethod
748810
def _get_udp_socket(cls, host, port, timeout):
@@ -1243,14 +1305,22 @@ def _xmit_packet_with_telemetry(self, packet):
12431305
self.packets_dropped_writer += 1
12441306

12451307
def _xmit_packet(self, packet, is_telemetry):
1308+
socket_kind = None
12461309
try:
12471310
if is_telemetry and self._dedicated_telemetry_destination():
12481311
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
1312+
socket_kind = self._telemetry_socket_kind
12491313
else:
12501314
# If set, use socket directly
12511315
mysocket = self.socket or self.get_socket()
1316+
socket_kind = self._socket_kind
12521317

1253-
mysocket.send(packet.encode(self.encoding))
1318+
encoded_packet = packet.encode(self.encoding)
1319+
if socket_kind == socket.SOCK_STREAM:
1320+
mysocket.sendall(struct.pack('<I', len(encoded_packet)))
1321+
mysocket.sendall(encoded_packet)
1322+
else:
1323+
mysocket.send(encoded_packet)
12541324

12551325
if not is_telemetry and self._telemetry:
12561326
self.packets_sent += 1
@@ -1283,13 +1353,19 @@ def _xmit_packet(self, packet, is_telemetry):
12831353
)
12841354
self.close_socket()
12851355
except Exception as exc:
1286-
print("Unexpected error: %s", exc)
1356+
print("Unexpected error: ", exc)
12871357
log.error("Unexpected error: %s", str(exc))
12881358

12891359
if not is_telemetry and self._telemetry:
12901360
self.bytes_dropped_writer += len(packet)
12911361
self.packets_dropped_writer += 1
12921362

1363+
# if in stream mode we need to shut down the socket; we can't recover from a
1364+
# partial send
1365+
if socket_kind == socket.SOCK_STREAM:
1366+
log.debug("Confirming socket closure after error streaming")
1367+
self.close_socket()
1368+
12931369
return False
12941370

12951371
def _send_to_buffer(self, packet):

tests/integration/dogstatsd/test_statsd_sender.py

+72-5
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import struct
7+
import tempfile
38
from threading import Thread
9+
import uuid
410

511
import pytest
612

713
from datadog.dogstatsd.base import DogStatsd
814

915
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
16+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
17+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1218
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
19+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1420
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
21+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1622
statsd = DogStatsd(
1723
telemetry_min_flush_interval=0,
1824
disable_background_sender=disable_background_sender,
@@ -24,7 +30,11 @@ def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pend
2430
statsd._reset_telemetry()
2531

2632
def reader_thread():
27-
msg = bar.recv(8192)
33+
if socket_kind == socket.SOCK_DGRAM:
34+
msg = bar.recv(8192)
35+
else:
36+
size = struct.unpack("<I", bar.recv(4))[0]
37+
msg = bar.recv(size)
2838
assert msg == b"test.metric:1|c\n"
2939

3040
t = Thread(target=reader_thread, name="test_sender_mode/reader_thread")
@@ -49,6 +59,25 @@ def test_set_socket_timeout():
4959
statsd.close_socket()
5060
assert statsd.get_socket().gettimeout() == 1
5161

62+
def test_stream_cleanup():
63+
foo, _ = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
64+
65+
foo.settimeout(0)
66+
statsd = DogStatsd(disable_buffering=True)
67+
statsd.socket = foo
68+
statsd.increment("test", 1)
69+
statsd.increment("test", 1)
70+
statsd.increment("test", 1)
71+
assert statsd.socket is not None
72+
73+
foo.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) # different os's have different mins, e.g. this sets the buffer size to 2304 on certain linux variants
74+
75+
with pytest.raises(socket.error):
76+
foo.sendall(os.urandom(5000)) # pre-emptively clog the buffer
77+
78+
statsd.increment("test", 1)
79+
80+
assert statsd.socket is None
5281

5382
@pytest.mark.parametrize(
5483
"disable_background_sender, disable_buffering",
@@ -101,3 +130,41 @@ def test_buffering_with_context():
101130
bar.settimeout(5)
102131
msg = bar.recv(8192)
103132
assert msg == b"first:1|c\n"
133+
134+
@pytest.fixture()
135+
def socket_dir():
136+
tempdir = tempfile.mkdtemp()
137+
yield tempdir
138+
shutil.rmtree(tempdir)
139+
140+
@pytest.mark.parametrize(
141+
"socket_prefix, socket_kind, success",
142+
[
143+
("", socket.SOCK_DGRAM, True),
144+
("", socket.SOCK_STREAM, True),
145+
("unix://", socket.SOCK_DGRAM, True),
146+
("unix://", socket.SOCK_STREAM, True),
147+
("unixstream://", socket.SOCK_DGRAM, False),
148+
("unixstream://", socket.SOCK_STREAM, True),
149+
("unixgram://", socket.SOCK_DGRAM, True),
150+
("unixgram://", socket.SOCK_STREAM, False)
151+
]
152+
)
153+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
154+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
155+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
156+
listener_socket.bind(socket_path)
157+
158+
if socket_kind == socket.SOCK_STREAM:
159+
listener_socket.listen(1)
160+
161+
with closing(listener_socket):
162+
statsd = DogStatsd(
163+
socket_path = socket_prefix + socket_path
164+
)
165+
166+
if success:
167+
assert statsd.get_socket() is not None
168+
else:
169+
with pytest.raises(socket.error):
170+
statsd.get_socket()

0 commit comments

Comments
 (0)