diff --git a/aredis/commands/streams.py b/aredis/commands/streams.py index 2ba45c00..d35133d0 100644 --- a/aredis/commands/streams.py +++ b/aredis/commands/streams.py @@ -50,7 +50,7 @@ class StreamsCommandMixin: ) async def xadd(self, name: str, entry: dict, - max_len=None, stream_id='*', + max_len=None, min_id=None, stream_id="*", approximate=True) -> str: """ Appends the specified stream entry to the stream at the specified key. @@ -62,9 +62,12 @@ async def xadd(self, name: str, entry: dict, :param name: name of the stream :param entry: key-values to be appended to the stream :param max_len: max length of the stream - length will not be limited max_len is set to None - notice: max_len should be int greater than 0, - if set to 0 or negative, the stream length will not be limited + length will not be limited if max_len is set to None + notice: max_len should be an int greater than 0, + + :param min_id: minimum id of the stream + if set, evicts entries with IDs lower than the one specified + max_len & min_id are mutually exclusive :param stream_id: id of the options appended to the stream. The XADD command will auto-generate a unique id for you @@ -80,17 +83,28 @@ async def xadd(self, name: str, entry: dict, notice: specified id without "-" character will be completed like "id-0" """ pieces = [] + + if max_len is not None and min_id is not None: + raise RedisError("XADD max_len & min_id are mutually exclusive") if max_len is not None: if not isinstance(max_len, int) or max_len < 1: raise RedisError("XADD maxlen must be a positive integer") - pieces.append('MAXLEN') + pieces.append("MAXLEN") if approximate: - pieces.append('~') + pieces.append("~") pieces.append(str(max_len)) + if min_id is not None: + if not isinstance(min_id, str): + raise RedisError("XADD min_id must be a string") + pieces.append("MINID") + if approximate: + pieces.append("~") + pieces.append(min_id) + pieces.append(stream_id) for kv in entry.items(): pieces.extend(list(kv)) - return await self.execute_command('XADD', name, *pieces) + return await self.execute_command("XADD", name, *pieces) async def xlen(self, name: str) -> int: """