Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions aredis/commands/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class StreamsCommandMixin:
)

async def xadd(self, name: str, entry: dict,
max_len=None, stream_id='*',
max_len=None, min_id=None, stream_id="*",
approximate=True) -> str:
"""
Appends the specified stream entry to the stream at the specified key.
Expand All @@ -62,9 +62,12 @@ async def xadd(self, name: str, entry: dict,
:param name: name of the stream
:param entry: key-values to be appended to the stream
:param max_len: max length of the stream
length will not be limited max_len is set to None
notice: max_len should be int greater than 0,
if set to 0 or negative, the stream length will not be limited
length will not be limited if max_len is set to None
notice: max_len should be an int greater than 0,

:param min_id: minimum id of the stream
if set, evicts entries with IDs lower than the one specified
max_len & min_id are mutually exclusive

:param stream_id: id of the options appended to the stream.
The XADD command will auto-generate a unique id for you
Expand All @@ -80,17 +83,28 @@ async def xadd(self, name: str, entry: dict,
notice: specified id without "-" character will be completed like "id-0"
"""
pieces = []

if max_len is not None and min_id is not None:
raise RedisError("XADD max_len & min_id are mutually exclusive")
if max_len is not None:
if not isinstance(max_len, int) or max_len < 1:
raise RedisError("XADD maxlen must be a positive integer")
pieces.append('MAXLEN')
pieces.append("MAXLEN")
if approximate:
pieces.append('~')
pieces.append("~")
pieces.append(str(max_len))
if min_id is not None:
if not isinstance(min_id, str):
raise RedisError("XADD min_id must be a string")
pieces.append("MINID")
if approximate:
pieces.append("~")
pieces.append(min_id)

pieces.append(stream_id)
for kv in entry.items():
pieces.extend(list(kv))
return await self.execute_command('XADD', name, *pieces)
return await self.execute_command("XADD", name, *pieces)

async def xlen(self, name: str) -> int:
"""
Expand Down