Skip to content
This repository was archived by the owner on Feb 21, 2023. It is now read-only.

Adding stream commands #299

Merged
merged 35 commits into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
75db0e7
Adding XADD/XRANGE/XREAD stream commands
adamcharnock Oct 7, 2017
25b7f62
Fixing flake errors. Mostly long lines.
adamcharnock Oct 7, 2017
04e903e
Work on CI detection of streams availability
adamcharnock Oct 7, 2017
5073fd0
Fixing test error caused by undefined ordering of fields
adamcharnock Oct 7, 2017
b69d376
Removing stray print statement
adamcharnock Oct 8, 2017
aa0acfa
Removing stray print statement
adamcharnock Oct 18, 2017
dbf4fcd
Making using of .extend() rather than list += [], as per @barrachri's…
adamcharnock Nov 20, 2017
c4312e0
Merge branch 'master' into streams
adamcharnock Dec 4, 2017
3bce1d9
Merge branch 'streams' of github.com:adamcharnock/aioredis into streams
adamcharnock Dec 4, 2017
bec4410
Streams PR: Fixing use of asyncio in tests for recent changes in aior…
adamcharnock Dec 4, 2017
56ccd66
Adding MAXLEN support to XADD command
adamcharnock Dec 4, 2017
187a911
Steams PR: Adding XREVRANGE command
adamcharnock Dec 4, 2017
d9778b0
Streams: Updating travis config to match that on master. Building for…
adamcharnock Dec 19, 2017
71ddb99
Merge branch 'master' into streams
adamcharnock Dec 19, 2017
47dc71b
Flake fixes
adamcharnock Dec 19, 2017
51943be
Adding message_id parameter to XADD, plus test
adamcharnock Jan 14, 2018
f3c2e9c
Merge branch 'master' into streams
adamcharnock Mar 8, 2018
bbf707a
Streams: Moving default value for xadd's message_id parameter onto me…
adamcharnock Mar 8, 2018
2a30d2f
Initial untested implementation of consumer group commands (tests com…
adamcharnock Apr 8, 2018
7c0bcaf
Work on tests from streams consumer group commands
adamcharnock Apr 8, 2018
9d9efb3
Adding tests for remaining consumer group commands
adamcharnock Apr 8, 2018
dbaa814
Merge branch 'master' into streams
adamcharnock Apr 8, 2018
c526cc5
Fixing flake errors
adamcharnock Apr 8, 2018
1ff2619
Testing use of xgroup_create() when group already exists
adamcharnock Apr 17, 2018
aabbed0
Adding test for xpending() using start/stop/count args
adamcharnock Apr 17, 2018
c383085
Fixing flake error
adamcharnock Jun 14, 2018
98f6a5e
Fixing flake error (again)
adamcharnock Jun 17, 2018
275f25d
Removing unnecessary python 2 compatability
adamcharnock Jun 18, 2018
b5cd6fa
Refactoring parse_lists_to_dicts() at @popravich's request
adamcharnock Jun 18, 2018
29e9089
Fixing error in xpending sanity check
adamcharnock Jun 18, 2018
6437743
Replacing blocking sleep with asyncio sleep
adamcharnock Jun 18, 2018
7ca8704
Adding docstrings
adamcharnock Jun 18, 2018
28eea0b
Enabling testing of xgroup_setid, xgroup_destroy, xinfo_consumers (as…
adamcharnock Jun 18, 2018
ddc1856
Fixing error in fields_to_dict() refactoring (type_ was not used)
adamcharnock Jun 18, 2018
1707f94
Fixing flake errors
adamcharnock Jun 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ addons:

env:
global:
- REDIS_TAGS="2.6.17 2.8.22 3.0.7 3.2.8 4.0-rc2" INSTALL_DIR=$HOME/redis PYPY_RELEASE="pypy-c-jit-91658-97ca3ac43c30-linux64"
- REDIS_TAGS="2.6.17 2.8.22 3.0.7 3.2.8 4.0-rc2 streams" INSTALL_DIR=$HOME/redis PYPY_RELEASE="pypy-c-jit-91658-97ca3ac43c30-linux64"

python:
- "3.3"
Expand Down
3 changes: 2 additions & 1 deletion aioredis/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .pubsub import PubSubCommandsMixin
from .cluster import ClusterCommandsMixin
from .geo import GeoCommandsMixin, GeoPoint, GeoMember
from .streams import StreamCommandsMixin

__all__ = [
'create_redis',
Expand All @@ -35,7 +36,7 @@ class Redis(GenericCommandsMixin, StringCommandsMixin,
SortedSetCommandsMixin, ListCommandsMixin,
ScriptingCommandsMixin, ServerCommandsMixin,
PubSubCommandsMixin, ClusterCommandsMixin,
GeoCommandsMixin):
GeoCommandsMixin, StreamCommandsMixin):
"""High-level Redis interface.

Gathers in one place Redis commands implemented in mixins.
Expand Down
120 changes: 120 additions & 0 deletions aioredis/commands/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import sys
from collections import OrderedDict

from aioredis.util import wait_convert

PY_VER = sys.version_info

if PY_VER < (3, 0):
from itertools import izip as zip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines don't make sense. We don't support Python 2.x.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, not sure what I was thinking here



def fields_to_dict(fields):
"""Convert a flat list of key/values into an OrderedDict"""
fields_iterator = iter(fields)
return OrderedDict(zip(fields_iterator, fields_iterator))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type_ argument is not used

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urgh, thanks. Fixed.



def parse_messages(messages):
""" Parse messages as returned by Redis into something useful

Messages returned by XRANGE arrive in the form:

[
[message_id, [key1, value1, key2, value2, ...]],
...
]

Here we parse this into:

[
[message_id, OrderedDict(
(key1, value1),
(key2, value2),
...
)],
...
]

"""
if messages is None:
return []
return [(mid, fields_to_dict(values)) for mid, values in messages]


def parse_messages_by_stream(messages_by_stream):
""" Parse messages returned by stream

Messages returned by XREAD arrive in the form:
[stream_name,
[
[message_id, [key1, value1, key2, value2, ...]],
...
],
...
]

Here we parse this into (with the help of the above parse_messages()
function):

[
[stream_name, message_id, OrderedDict(
(key1, value1),
(key2, value2),.
...
)],
...
]

"""
if messages_by_stream is None:
return []

parsed = []
for stream, messages in messages_by_stream:
for message_id, fields in parse_messages(messages):
parsed.append((stream, message_id, fields))
return parsed


class StreamCommandsMixin:
"""Stream commands mixin

Streams are under development in Redis and
not currently released.
"""

def xadd(self, stream, fields=None):
""" Add a message to the specified stream
"""
# TODO: Add the MAXLEN parameter
flattened = []
for k, v in fields.items():
flattened += [k, v]
Copy link

@barrachri barrachri Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flattened.extend?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes. Changed.

return self.execute(b'XADD', stream, '*', *flattened)

def xrange(self, stream, start='-', stop='+', count=None):
"""Retrieve stream data"""
if count is not None:
extra = ['COUNT', count]
else:
extra = []
fut = self.execute(b'XRANGE', stream, start, stop, *extra)
return wait_convert(fut, parse_messages)

def xread(self, streams, timeout=0, count=None, latest_ids=None):
"""Perform a blocking read on the given stream"""
# QUESTION: Should we combine streams & starting_ids
# into a single parameter?
if latest_ids is None:
latest_ids = ['$'] * len(streams)
if len(streams) != len(latest_ids):
raise ValueError(
'The streams and latest_ids parameters must be of the '
'same length'
)

count_args = [b'COUNT', count] if count else []
args = count_args + [b'STREAMS'] + streams + latest_ids
fut = self.execute(b'XREAD', b'BLOCK', timeout, *args)
return wait_convert(fut, parse_messages_by_stream)
184 changes: 184 additions & 0 deletions tests/stream_commands_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from collections import OrderedDict

import os

import pytest
import asyncio


def skip_if_streams_not_present(server_bin):
if os.environ.get('STREAMS_AVAILABLE'):
return
if '/streams/' in server_bin:
return

pytest.skip(
"Streams testing is disabled as streams are not yet available "
"in Redis 4.0. Set STREAMS_AVAILABLE=1 in your environment "
"if you have compiled the Redis 'streams' branch. You will "
"probably need specify the --redis-server=path/to/redis-server "
" to py.test."
)


@asyncio.coroutine
def add_message_with_sleep(redis, loop, stream, fields):
yield from asyncio.sleep(0.2, loop=loop)
result = yield from redis.xadd(stream, fields)
return result


@pytest.mark.run_loop
def test_xadd(redis, server_bin):
skip_if_streams_not_present(server_bin)

fields = OrderedDict((
(b'field1', b'value1'),
(b'field2', b'value2'),
))
message_id = yield from redis.xadd('test_stream', fields)

# Check the result is in the expected format (i.e: 1507400517949-0)
assert b'-' in message_id
timestamp, sequence = message_id.split(b'-')
assert timestamp.isdigit()
assert sequence.isdigit()

# Read it back
messages = yield from redis.xrange('test_stream')
assert len(messages) == 1
message = messages[0]
assert message[0] == message_id
assert message[1] == OrderedDict([
(b'field1', b'value1'),
(b'field2', b'value2')]
)


@pytest.mark.run_loop
def test_xrange(redis, server_bin):
skip_if_streams_not_present(server_bin)

stream = 'test_stream'
fields = OrderedDict((
(b'field1', b'value1'),
(b'field2', b'value2'),
))
message_id1 = yield from redis.xadd(stream, fields)
message_id2 = yield from redis.xadd(stream, fields)
message_id3 = yield from redis.xadd(stream, fields) # noqa

# Test no parameters
messages = yield from redis.xrange(stream)
assert len(messages) == 3
message = messages[0]
assert message[0] == message_id1
assert message[1] == OrderedDict([
(b'field1', b'value1'),
(b'field2', b'value2')]
)

# Test start
messages = yield from redis.xrange(stream, start=message_id2)
assert len(messages) == 2

messages = yield from redis.xrange(stream, start='9900000000000-0')
assert len(messages) == 0

# Test stop
messages = yield from redis.xrange(stream, stop='0000000000000-0')
assert len(messages) == 0

messages = yield from redis.xrange(stream, stop=message_id2)
assert len(messages) == 2

messages = yield from redis.xrange(stream, stop='9900000000000-0')
assert len(messages) == 3

# Test start & stop
messages = yield from redis.xrange(stream,
start=message_id1,
stop=message_id2)
assert len(messages) == 2

messages = yield from redis.xrange(stream,
start='0000000000000-0',
stop='9900000000000-0')
assert len(messages) == 3

# Test count
messages = yield from redis.xrange(stream, count=2)
assert len(messages) == 2


@pytest.mark.run_loop
def test_xread_selection(redis, server_bin):
"""Test use of counts and starting IDs"""
skip_if_streams_not_present(server_bin)

stream = 'test_stream'
fields = OrderedDict((
(b'field1', b'value1'),
(b'field2', b'value2'),
))
message_id1 = yield from redis.xadd(stream, fields)
message_id2 = yield from redis.xadd(stream, fields) # noqa
message_id3 = yield from redis.xadd(stream, fields)

messages = yield from redis.xread([stream],
timeout=1,
latest_ids=['0000000000000-0'])
assert len(messages) == 3

messages = yield from redis.xread([stream],
timeout=1,
latest_ids=[message_id1])
assert len(messages) == 2

messages = yield from redis.xread([stream],
timeout=1,
latest_ids=[message_id3])
assert len(messages) == 0

messages = yield from redis.xread([stream],
timeout=1,
latest_ids=['0000000000000-0'], count=2)
assert len(messages) == 2


@pytest.mark.run_loop
def test_xread_blocking(redis, create_redis, loop, server, server_bin):
"""Test the blocking read features"""
skip_if_streams_not_present(server_bin)

fields = OrderedDict((
(b'field1', b'value1'),
(b'field2', b'value2'),
))
other_redis = yield from create_redis(
server.tcp_address, loop=loop)

# create blocking task in separate connection
consumer = other_redis.xread(['test_stream'], timeout=1000)

producer_task = asyncio.Task(
add_message_with_sleep(redis, loop, 'test_stream', fields), loop=loop)
results = yield from asyncio.gather(
consumer, producer_task, loop=loop)

received_messages, sent_message_id = results
assert len(received_messages) == 1
assert sent_message_id

received_stream, received_message_id, received_fields \
= received_messages[0]

assert received_stream == b'test_stream'
assert sent_message_id == received_message_id
assert fields == received_fields

# Test that we get nothing back from an empty stream
results = yield from redis.xread(['another_stream'], timeout=100)
assert results == []

other_redis.close()