From 656fde59704e030e874162cb33288aa92f61b49a Mon Sep 17 00:00:00 2001 From: Inquisitor Date: Sun, 16 Nov 2025 12:09:56 +0200 Subject: [PATCH 1/2] Fixed check streams after restart. Stream will not try to create new the same stream --- taskiq_nats/broker.py | 64 ++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index db22c61..0e2d033 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -7,16 +7,15 @@ from nats.errors import TimeoutError as NatsTimeoutError from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, StreamConfig +from nats.js.errors import NotFoundError from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage _T = typing.TypeVar("_T") # (Too short) - JetStreamConsumerType = typing.TypeVar( "JetStreamConsumerType", ) - logger = getLogger("taskiq_nats") @@ -36,13 +35,13 @@ class NatsBroker(AsyncBroker): """ def __init__( - self, - servers: typing.Union[str, typing.List[str]], - subject: str = "taskiq_tasks", - queue: typing.Optional[str] = None, - result_backend: "typing.Optional[AsyncResultBackend[_T]]" = None, - task_id_generator: typing.Optional[typing.Callable[[], str]] = None, - **connection_kwargs: typing.Any, + self, + servers: typing.Union[str, typing.List[str]], + subject: str = "taskiq_tasks", + queue: typing.Optional[str] = None, + result_backend: "typing.Optional[AsyncResultBackend[_T]]" = None, + task_id_generator: typing.Optional[typing.Callable[[], str]] = None, + **connection_kwargs: typing.Any, ) -> None: super().__init__(result_backend, task_id_generator) self.servers = servers @@ -106,17 +105,17 @@ class BaseJetStreamBroker( """ def __init__( - self, - servers: typing.Union[str, typing.List[str]], - subject: str = "taskiq_tasks", - stream_name: str = "taskiq_jetstream", - queue: typing.Optional[str] = None, - durable: str = "taskiq_durable", - stream_config: typing.Optional[StreamConfig] = None, - consumer_config: typing.Optional[ConsumerConfig] = None, - pull_consume_batch: int = 1, - pull_consume_timeout: typing.Optional[float] = None, - **connection_kwargs: typing.Any, + self, + servers: typing.Union[str, typing.List[str]], + subject: str = "taskiq_tasks", + stream_name: str = "taskiq_jetstream", + queue: typing.Optional[str] = None, + durable: str = "taskiq_durable", + stream_config: typing.Optional[StreamConfig] = None, + consumer_config: typing.Optional[ConsumerConfig] = None, + pull_consume_batch: int = 1, + pull_consume_timeout: typing.Optional[float] = None, + **connection_kwargs: typing.Any, ) -> None: super().__init__() self.servers: typing.Final = servers @@ -138,6 +137,23 @@ def __init__( self.consumer: JetStreamConsumerType + async def _ensure_stream_exists(self) -> None: + """Ensure stream exists, create if it doesn't.""" + if self.stream_config.name is None: + self.stream_config.name = self.stream_name + if not self.stream_config.subjects: + self.stream_config.subjects = [self.subject] + + try: + # Check if stream already exists + await self.js.stream_info(self.stream_config.name) + logger.debug("Stream %s already exists", self.stream_config.name) + except NotFoundError: + logger.debug("stream %s does not exist", self.stream_config.name) + # Stream doesn't exist, create it + await self.js.add_stream(config=self.stream_config) + logger.info("Created stream %s", self.stream_config.name) + async def startup(self) -> None: """ Startup event handler. @@ -148,11 +164,9 @@ async def startup(self) -> None: await super().startup() await self.client.connect(self.servers, **self.connection_kwargs) self.js = self.client.jetstream() - if self.stream_config.name is None: - self.stream_config.name = self.stream_name - if not self.stream_config.subjects: - self.stream_config.subjects = [self.subject] - await self.js.add_stream(config=self.stream_config) + + # Ensure stream exists (won't recreate if it exists) + await self._ensure_stream_exists() await self._startup_consumer() async def shutdown(self) -> None: From 3d54388222359bbb258b48560b5e2d41c890f2aa Mon Sep 17 00:00:00 2001 From: Ruslan-Droid Date: Mon, 17 Nov 2025 19:49:20 +0200 Subject: [PATCH 2/2] Removed extra spaces from the code Removed extra spaces from the code --- taskiq_nats/broker.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index 0e2d033..879d444 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -35,13 +35,13 @@ class NatsBroker(AsyncBroker): """ def __init__( - self, - servers: typing.Union[str, typing.List[str]], - subject: str = "taskiq_tasks", - queue: typing.Optional[str] = None, - result_backend: "typing.Optional[AsyncResultBackend[_T]]" = None, - task_id_generator: typing.Optional[typing.Callable[[], str]] = None, - **connection_kwargs: typing.Any, + self, + servers: typing.Union[str, typing.List[str]], + subject: str = "taskiq_tasks", + queue: typing.Optional[str] = None, + result_backend: "typing.Optional[AsyncResultBackend[_T]]" = None, + task_id_generator: typing.Optional[typing.Callable[[], str]] = None, + **connection_kwargs: typing.Any, ) -> None: super().__init__(result_backend, task_id_generator) self.servers = servers @@ -105,17 +105,17 @@ class BaseJetStreamBroker( """ def __init__( - self, - servers: typing.Union[str, typing.List[str]], - subject: str = "taskiq_tasks", - stream_name: str = "taskiq_jetstream", - queue: typing.Optional[str] = None, - durable: str = "taskiq_durable", - stream_config: typing.Optional[StreamConfig] = None, - consumer_config: typing.Optional[ConsumerConfig] = None, - pull_consume_batch: int = 1, - pull_consume_timeout: typing.Optional[float] = None, - **connection_kwargs: typing.Any, + self, + servers: typing.Union[str, typing.List[str]], + subject: str = "taskiq_tasks", + stream_name: str = "taskiq_jetstream", + queue: typing.Optional[str] = None, + durable: str = "taskiq_durable", + stream_config: typing.Optional[StreamConfig] = None, + consumer_config: typing.Optional[ConsumerConfig] = None, + pull_consume_batch: int = 1, + pull_consume_timeout: typing.Optional[float] = None, + **connection_kwargs: typing.Any, ) -> None: super().__init__() self.servers: typing.Final = servers