Skip to content
This repository was archived by the owner on Feb 21, 2023. It is now read-only.

Adding stream commands #299

Merged
merged 35 commits into from
Jun 18, 2018
Merged

Conversation

adamcharnock
Copy link
Collaborator

@adamcharnock adamcharnock commented Oct 7, 2017

@antirez is in the process of adding streams to Redis. This PR adds initial support for the XADD/XRANGE/XREAD commands in aioredis.

Tests

I've written tests and added the streams Redis branch to the travis config. In theory, the tests will only be run when the streams branch is available. The logic is a bit hacky, but I figured it will only be until streams makes it into 4.0 (now improved as the hacky version didn't actually work).

Style

I'm very open to feedback on my implementation style for the new commands. I've tried to stick to the aioredis style, but I think I will have probably wandered here or there. Happy to make any corrections or alterations.

Reason

This is my first time using aioredis. I'm working on another project in which I would like to experiment with Redis streams, plus I also need asyncio support. I therefore thought I'd contribute this here.

As I said, happy to make any changes etc.

Remaining work

  • Add support for the MAXLEN parameter in xadd()
  • There is a question regarding style in the source for xread()

@codecov
Copy link

codecov bot commented Oct 7, 2017

Codecov Report

Merging #299 into master will decrease coverage by 0.14%.
The diff coverage is 97.35%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #299      +/-   ##
==========================================
- Coverage   96.85%   96.71%   -0.15%     
==========================================
  Files          54       56       +2     
  Lines        7447     7825     +378     
  Branches      525      546      +21     
==========================================
+ Hits         7213     7568     +355     
- Misses        171      185      +14     
- Partials       63       72       +9
Impacted Files Coverage Δ
aioredis/commands/__init__.py 95.18% <100%> (+0.05%) ⬆️
tests/stream_commands_test.py 100% <100%> (ø)
aioredis/commands/streams.py 90.19% <90.19%> (ø)
aioredis/log.py 45.45% <0%> (-54.55%) ⬇️
aioredis/sentinel/pool.py 77.06% <0%> (-3.76%) ⬇️
tests/conftest.py 89.07% <0%> (+0.47%) ⬆️
tests/sentinel_failover_test.py 87.96% <0%> (+0.75%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f6e0658...1707f94. Read the comment docs.

@adamcharnock
Copy link
Collaborator Author

It appears to me that the test_pool_idle_close test is being a little flakey on Windows. AFAIK this is not my fault... I hope.

@popravich popravich self-requested a review October 9, 2017 07:00
@popravich popravich mentioned this pull request Nov 20, 2017
# TODO: Add the MAXLEN parameter
flattened = []
for k, v in fields.items():
flattened += [k, v]
Copy link

@barrachri barrachri Nov 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flattened.extend?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes. Changed.

@barrachri
Copy link

Great @adamcharnock! Do you know where I can find the implementation details related to Redis?

Did you use this: https://github.com/redis/redis-rcp/blob/master/RCP11.md?

@adamcharnock
Copy link
Collaborator Author

Hi @barrachri,

I think that is a different kind of stream, one more to do with how the redis protocol encodes data on the wire.

I used the following:

http://antirez.com/news/114

The branch can be found here:

https://github.com/antirez/redis/tree/streams

@barrachri
Copy link

Thanks.

Is using OrderedDict really important?

I was trying to think about a use case where the order of the key=value might be important after you used fields_to_dict.

@adamcharnock
Copy link
Collaborator Author

I used an OrderedDict simply because the ordering of the key/value pairs is explicitly respected and maintained by Redis Streams. I therefore wanted to respect this in the implementation. I expect someone will find a use for it, even if I'm not sure what it would be.

Perhaps one reads a stream record then writes it back to another stream. The new record has a different field order. As Redis guarantees order, other clients could legitimately decide that this copied record was different to the original, despite there being no intention to modify it during the copying process.

As a secondary reason, I've occasionally seen hard-to-diagnose bugs crop you due to the non-deterministic ordering of dictionaries. This normally occurs when the developer assumes a regular dict has a particular ordering, which then sporadically changes under different environments and python versions. Given that the key/values do in fact have ordering in this case, it seemed like an easy way to avoid this issue.

Also, I suspect ignoring the ordering provided & documented by Redis would probably cause developers to walk right into this kind of bug.

@barrachri
Copy link

Clarified, thanks!

@barrachri
Copy link

Streams has been merged inside https://github.com/antirez/redis/tree/unstable.

@asvetlov
Copy link
Contributor

asvetlov commented Dec 4, 2017

Should we wait for redis stable with streaming feature?

@adamcharnock
Copy link
Collaborator Author

@barrachri That is great to hear.

@asvetlov Not sure if that was directed at me – but I have no view on the matter.

I'll see about getting MAXLEN implemented soon.

@asvetlov
Copy link
Contributor

asvetlov commented Dec 4, 2017

My question is addressed mostly to @popravich

@adamcharnock
Copy link
Collaborator Author

I've just added support for MAXLEN as well as the new XREVRANGE command.

I based my implementation of that of ZRANGE and ZREVRANGE, in which case it seemed the implementation was just copied. I therefore did the same and likewise duplicated the relevant test.

@adamcharnock
Copy link
Collaborator Author

Update: Redis 5 RC3 is now out. What is the view on merging at this time?

@adamcharnock
Copy link
Collaborator Author

Ok, travis is happy now. The code coverage diff seems to show lack of coverage on sentinel as the sticking point, but as I haven't touched that I'm going to assuming is not my fault.

PY_VER = sys.version_info

if PY_VER < (3, 0):
from itertools import izip as zip
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines don't make sense. We don't support Python 2.x.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, not sure what I was thinking here


def parse_lists_to_dicts(lists):
""" Convert [[a, 1, b, 2], ...] into [{a:1, b: 2}, ...]"""
def _list_to_dict(list_):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is almost the same as fields_to_dict except return type,
It is better to make a single function and pass OrderedDict/dict` types as argument.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will do

fut = self.execute(b'XREAD', *args)
return wait_convert(fut, parse_messages_by_stream)

def xread_group(self, group_name, consumer_name, streams, timeout=0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a docstring

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...and to 12 more methods below

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now

consumer=None):
# Returns: total pel messages, min id, max id, count
ssc = [start, stop, count]
if any(ssc) and not all(ssc):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check may trigger false-positive exceptions in case when start=0.
Please add test for this case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Will add test and fix

"unstable branch")
async def test_xadd_maxlen_exact(redis, server_bin):
message_id1 = await redis.xadd('test_stream', {'f1': 'v1'}) # noqa
sleep(0.001) # Ensure the millisecond-based message ID increments
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need a thread-blocking sleep?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, I certainly don't :-)

@popravich popravich self-requested a review June 18, 2018 11:19
@popravich popravich self-assigned this Jun 18, 2018
@adamcharnock
Copy link
Collaborator Author

Thank you for the comments @popravich, I'll take a look at making the changes now/shortly.

@adamcharnock
Copy link
Collaborator Author

I've just pushed changes for everything bar the doc strings, which I'll take a look at now.

def fields_to_dict(fields, type_=OrderedDict):
"""Convert a flat list of key/values into an OrderedDict"""
fields_iterator = iter(fields)
return OrderedDict(zip(fields_iterator, fields_iterator))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type_ argument is not used

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urgh, thanks. Fixed.

@adamcharnock
Copy link
Collaborator Author

AFAIK I've now responded to all your change requests. Do let me know if there is anything else though 👍

@popravich
Copy link
Contributor

Lets wait for CI tasks to complete.

@popravich popravich merged commit 050e4dd into aio-libs-abandoned:master Jun 18, 2018
@popravich
Copy link
Contributor

Merged! Thanks

@adamcharnock
Copy link
Collaborator Author

That's great to hear, thank you very much for taking the time to review this. I'll be happy to revisit this if this API changes before Redis 5.0 stable.

Once this sees a release I'll update lightbus to point to the particular aioredis version.

@barrachri barrachri mentioned this pull request Aug 11, 2018
asgoel pushed a commit to ramjet-labs/aioredis that referenced this pull request Aug 31, 2018
* Adding XADD/XRANGE/XREAD stream commands

* Fixing flake errors. Mostly long lines.

* Work on CI detection of streams availability

* Fixing test error caused by undefined ordering of fields

* Removing stray print statement

* Removing stray print statement

* Making using of .extend() rather than list += [], as per @barrachri's suggestion.

* Streams PR: Fixing use of asyncio in tests for recent changes in aioredis project

* Adding MAXLEN support to XADD command

* Steams PR: Adding XREVRANGE command

* Streams: Updating travis config to match that on master. Building for unstable branch. Improving stream test skipping logic.

* Flake fixes

* Adding message_id parameter to XADD, plus test

* Streams: Moving default value for xadd's message_id parameter onto method signature (as per @hoh's suggestion)

* Initial untested implementation of consumer group commands (tests coming)

* Work on tests from streams consumer group commands

* Adding tests for remaining consumer group commands

* Fixing flake errors

* Testing use of xgroup_create() when group already exists

* Adding test for xpending() using start/stop/count args

* Fixing flake error

* Fixing flake error (again)

* Removing unnecessary python 2 compatability

* Refactoring parse_lists_to_dicts() at @popravich's request

* Fixing error in xpending sanity check

* Replacing blocking sleep with asyncio sleep

* Adding docstrings

* Enabling testing of xgroup_setid, xgroup_destroy, xinfo_consumers (as these are now present in redis)

* Fixing error in fields_to_dict() refactoring (type_ was not used)

* Fixing flake errors
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants