Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
66bde0f
refactor: update sending notifications for optimization & fault-toler…
niqzart Nov 11, 2025
14d36c9
refactor: class-based senders for easier task generation
niqzart Nov 11, 2025
5b1a46b
feat: sending telegram notifications in notifications_sub
niqzart Nov 11, 2025
33d3819
Merge [4126] Telegram Notifications
niqzart Nov 16, 2025
becf263
refactor: upgrade pochta contract to allow different payloads & multi…
niqzart Nov 9, 2025
36f52ee
feat: support notifications in email messages & pochta
niqzart Nov 9, 2025
5e2a2df
feat: email connections
niqzart Nov 11, 2025
26b32a9
feat: sending email notifications
niqzart Nov 11, 2025
16aabc2
Merge [4125] Email Notifications
niqzart Nov 16, 2025
1408ab1
feat: basic datalake events
niqzart Nov 14, 2025
f832f47
fix: finally add tests for initial socketio connection
niqzart Nov 14, 2025
bbfaff8
feat: recording socketio connection datalake events
niqzart Nov 14, 2025
7c4a022
feat: add created_at to users for metrics & upgrade user tests
niqzart Nov 15, 2025
139bf83
Merge [4127] User Visit Datalake Events
niqzart Nov 16, 2025
e18b7c9
fix: starting conferences in empty classrooms should not send notific…
niqzart Nov 17, 2025
995a169
feat: install, setup & configure sentry w/ logging & users
niqzart Nov 17, 2025
057ff36
feat: better error handling & reporting for external http errors
niqzart Nov 22, 2025
63a2677
fix: cleanup breadcrumbs from spammy redis logs (initial xgroup creat…
niqzart Nov 22, 2025
e153a1b
feat: sentry integration for faststream
niqzart Nov 22, 2025
e496936
fix: cleanup breadcrumbs from telegram initial logs
niqzart Nov 22, 2025
85aa13b
Merge [4128] Sentry Integration
niqzart Dec 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ extend-ignore =
# # weird
PIE803 C101 FNE007 FNE008 N812 ANN101 ANN102 WPS110 WPS111 WPS114 WPS338 WPS407 WPS414 WPS440 VNE001 VNE002 CM001
# too many
WPS200 WPS201 WPS202 WPS203 WPS204 WPS210 WPS211 WPS213 WPS214 WPS217 WPS218 WPS221 WPS224 WPS230 WPS231 WPS234 WPS235 WPS238
WPS200 WPS201 WPS202 WPS203 WPS204 WPS210 WPS211 WPS212 WPS213 WPS214 WPS217 WPS218 WPS221 WPS224 WPS230 WPS231 WPS234 WPS235 WPS238
# "vague" imports
WPS347

Expand Down
62 changes: 62 additions & 0 deletions alembic/versions/052_email_connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""email_connections

Revision ID: 052
Revises: 051
Create Date: 2025-11-11 20:51:09.142765

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "052"
down_revision: Union[str, None] = "051"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create ``email_connections`` and backfill it from existing users.

    Two phases:
    1. DDL: create the table (PK on ``user_id``) plus a non-unique index
       on ``email`` for reverse lookups.
    2. Data: copy ``(id, email)`` from ``users`` into the new table via a
       single server-side INSERT ... FROM SELECT (no rows are pulled into
       Python).
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "email_connections",
        sa.Column("user_id", sa.Integer(), nullable=False),
        sa.Column("email", sa.String(length=100), nullable=False),
        # One email connection per user: user_id alone is the primary key
        sa.PrimaryKeyConstraint("user_id", name=op.f("pk_email_connections")),
        schema="xi_back_2",
    )
    op.create_index(
        op.f("ix_xi_back_2_email_connections_email"),
        "email_connections",
        ["email"],
        unique=False,  # multiple users may share an email address
        schema="xi_back_2",
    )
    # ### end Alembic commands ###

    connection = op.get_bind()
    metadata = sa.MetaData(schema="xi_back_2")

    # Reflect live table definitions instead of importing ORM models, so the
    # migration stays valid even after the application models change
    User = sa.Table("users", metadata, autoload_with=connection)
    EmailConnection = sa.Table("email_connections", metadata, autoload_with=connection)

    # NOTE(review): assumes users.email is NOT NULL for every row — a NULL
    # email would violate email_connections.email's NOT NULL constraint.
    # TODO confirm against the users table definition.
    connection.execute(
        sa.insert(EmailConnection).from_select(
            ["user_id", "email"],
            sa.select(User.c.id, User.c.email),
        )
    )


def downgrade() -> None:
    """Drop the ``email_connections`` table and its email index.

    Backfilled connection rows are discarded; ``users`` is untouched, so a
    later re-upgrade can rebuild the data from it.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse order of upgrade(): index first, then the table itself
    op.drop_index(
        op.f("ix_xi_back_2_email_connections_email"),
        table_name="email_connections",
        schema="xi_back_2",
    )
    op.drop_table("email_connections", schema="xi_back_2")
    # ### end Alembic commands ###
51 changes: 51 additions & 0 deletions alembic/versions/053_basic_datalake_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""basic_datalake_events

Revision ID: 053
Revises: 052
Create Date: 2025-11-14 22:55:32.135968

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "053"
down_revision: Union[str, None] = "052"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``datalake_events`` table for basic event recording.

    Each event is a UUID-keyed row with a free-form ``kind`` tag, the acting
    ``user_id`` and a timezone-aware ``recorded_at`` timestamp; the index on
    ``recorded_at`` supports time-range queries over the event stream.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "datalake_events",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("kind", sa.String(length=100), nullable=False),
        # NOTE(review): no FK to users — presumably intentional so events
        # survive user deletion; confirm with the data-model owners
        sa.Column("user_id", sa.Integer(), nullable=False),
        sa.Column("recorded_at", sa.DateTime(timezone=True), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_datalake_events")),
        schema="xi_back_2",
    )
    op.create_index(
        op.f("ix_xi_back_2_datalake_events_recorded_at"),
        "datalake_events",
        ["recorded_at"],
        unique=False,
        schema="xi_back_2",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the ``datalake_events`` table and its timestamp index.

    Destructive: all recorded events are lost on downgrade.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse order of upgrade(): index first, then the table itself
    op.drop_index(
        op.f("ix_xi_back_2_datalake_events_recorded_at"),
        table_name="datalake_events",
        schema="xi_back_2",
    )
    op.drop_table("datalake_events", schema="xi_back_2")
    # ### end Alembic commands ###
50 changes: 50 additions & 0 deletions alembic/versions/054_users_created_at.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""users_created_at

Revision ID: 054
Revises: 053
Create Date: 2025-11-15 09:15:32.247534

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "054"
down_revision: Union[str, None] = "053"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add a NOT NULL ``created_at`` column to ``users``.

    Three phases so existing rows can be backfilled:
    1. Add the column as nullable.
    2. Backfill every row from ``password_last_changed_at``.
    3. Tighten the column to NOT NULL.
    """
    op.add_column(
        "users",
        # Added nullable first; tightened to NOT NULL after the backfill below
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
        schema="xi_back_2",
    )

    connection = op.get_bind()
    metadata = sa.MetaData(schema="xi_back_2")

    # Reflect the live table so the migration doesn't depend on ORM models
    User = sa.Table("users", metadata, autoload_with=connection)

    # NOTE(review): assumes password_last_changed_at is non-NULL for every
    # user — any NULL would survive the backfill and make the alter_column
    # below fail. TODO confirm that column is NOT NULL in production.
    connection.execute(
        sa.update(User).values(created_at=User.c.password_last_changed_at)
        # Good approximation for most users in production, although not perfect
    )

    op.alter_column(
        "users",
        column_name="created_at",
        nullable=False,
        schema="xi_back_2",
    )


def downgrade() -> None:
    """Remove the ``created_at`` column from ``users``.

    The backfilled timestamps are discarded; re-upgrading re-approximates
    them from ``password_last_changed_at``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("users", "created_at", schema="xi_back_2")
    # ### end Alembic commands ###
6 changes: 4 additions & 2 deletions app/classrooms/routes/classrooms_tutor_rst.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
UserClassroomStatus,
)
from app.classrooms.models.tutorships_db import Tutorship
from app.common.bridges.autocomplete_bdg import SubjectNotFoundException
from app.common.config_bdg import autocomplete_bridge
from app.common.dependencies.authorization_dep import AuthorizationData
from app.common.fastapi_ext import APIRouterExt, Responses
Expand Down Expand Up @@ -58,8 +59,9 @@ async def validate_subject(
if new_subject_id == old_subject_id:
return

subject = await autocomplete_bridge.retrieve_subject(subject_id=new_subject_id)
if subject is None:
try:
await autocomplete_bridge.retrieve_subject(subject_id=new_subject_id)
except SubjectNotFoundException: # noqa: WPS329 # false-positive / broken rule
raise SubjectResponses.SUBJECT_NOT_FOUND


Expand Down
2 changes: 1 addition & 1 deletion app/common/aiogram_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def maybe_initialize_from_config(
) -> None:
if settings.is_testing_mode or bot_settings is None:
if settings.production_mode:
logging.warning(f"Configuration for {bot_name} is missing")
logging.error(f"Configuration for {bot_name} is missing")
return

await self.initialize(
Expand Down
14 changes: 10 additions & 4 deletions app/common/bridges/autocomplete_bdg.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
from httpx import Response
from pydantic import TypeAdapter
from starlette import status

from app.common.bridges.base_bdg import BaseBridge
from app.common.bridges.utils import validate_external_json_response
from app.common.config import settings
from app.common.schemas.autocomplete_sch import SubjectSchema


class SubjectNotFoundException(Exception):
    """Raised when the autocomplete service reports that a subject does not exist."""


class AutocompleteBridge(BaseBridge):
def __init__(self) -> None:
super().__init__(
base_url=f"{settings.bridge_base_url}/internal/autocomplete-service",
headers={"X-Api-Key": settings.api_key},
)

async def retrieve_subject(self, subject_id: int) -> SubjectSchema | None:
@validate_external_json_response(TypeAdapter(SubjectSchema))
async def retrieve_subject(self, subject_id: int) -> Response:
response = await self.client.get(f"/subjects/{subject_id}/")
if (
response.status_code == status.HTTP_404_NOT_FOUND
and response.json()["detail"] == "Subject not found"
):
return None
response.raise_for_status()
return TypeAdapter(SubjectSchema).validate_python(response.json())
raise SubjectNotFoundException
return response
4 changes: 2 additions & 2 deletions app/common/bridges/classrooms_bdg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pydantic import TypeAdapter

from app.common.bridges.base_bdg import BaseBridge
from app.common.bridges.utils import validate_json_response
from app.common.bridges.utils import validate_external_json_response
from app.common.config import settings


Expand All @@ -13,7 +13,7 @@ def __init__(self) -> None:
headers={"X-Api-Key": settings.api_key},
)

@validate_json_response(TypeAdapter(list[int]))
@validate_external_json_response(TypeAdapter(list[int]))
async def list_classroom_student_ids(self, classroom_id: int) -> Response:
return await self.client.get(
f"/classrooms/{classroom_id}/students/",
Expand Down
17 changes: 17 additions & 0 deletions app/common/bridges/datalake_bdg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from app.common.bridges.base_bdg import BaseBridge
from app.common.config import settings
from app.common.schemas.datalake_sch import DatalakeEventInputSchema


class DatalakeBridge(BaseBridge):
    """Bridge to the datalake service for recording analytics events."""

    def __init__(self) -> None:
        # HTTP client configured for the datalake-service internal API,
        # authenticated with the shared inter-service API key
        super().__init__(
            base_url=f"{settings.bridge_base_url}/internal/datalake-service",
            headers={"X-Api-Key": settings.api_key},
        )

    async def record_datalake_event(self, data: DatalakeEventInputSchema) -> None:
        """Publish a datalake event to the record stream (fire-and-forget).

        Serializes the schema to JSON-compatible primitives and publishes it
        via the broker rather than the HTTP client, so recording does not
        block on the datalake service being up.
        """
        # NOTE(review): self.broker is presumably provided by BaseBridge and
        # backed by a Redis stream — confirm against base_bdg.py
        await self.broker.publish(
            message=data.model_dump(mode="json"),
            stream=settings.datalake_events_record_stream_name,
        )
10 changes: 5 additions & 5 deletions app/common/bridges/messenger_bdg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pydantic import BaseModel, TypeAdapter

from app.common.bridges.base_bdg import BaseBridge
from app.common.bridges.utils import validate_json_response
from app.common.bridges.utils import validate_external_json_response
from app.common.config import settings
from app.common.schemas.messenger_sch import ChatAccessKind

Expand All @@ -20,7 +20,7 @@ def __init__(self) -> None:
headers={"X-Api-Key": settings.api_key},
)

@validate_json_response(TypeAdapter(ChatMetaSchema))
@validate_external_json_response(TypeAdapter(ChatMetaSchema))
async def create_chat(
self, access_kind: ChatAccessKind, related_id: int | str
) -> Response:
Expand All @@ -34,6 +34,6 @@ async def create_chat(
response.raise_for_status()
return response

async def delete_chat(self, chat_id: int) -> None:
response = await self.client.delete(f"/chats/{chat_id}/")
response.raise_for_status()
@validate_external_json_response()
async def delete_chat(self, chat_id: int) -> Response:
return await self.client.delete(f"/chats/{chat_id}/")
19 changes: 16 additions & 3 deletions app/common/bridges/notifications_bdg.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pydantic import TypeAdapter

from app.common.bridges.base_bdg import BaseBridge
from app.common.bridges.utils import validate_json_response
from app.common.bridges.utils import validate_external_json_response
from app.common.config import settings
from app.common.schemas.notifications_sch import NotificationInputSchema
from app.common.schemas.user_contacts_sch import UserContactSchema
Expand All @@ -15,15 +15,28 @@ def __init__(self) -> None:
headers={"X-Api-Key": settings.api_key},
)

@validate_json_response(TypeAdapter(list[UserContactSchema]))
@validate_external_json_response(TypeAdapter(list[UserContactSchema]))
async def list_user_contacts(
self, user_id: int, public_only: bool = False
self,
user_id: int,
public_only: bool = False,
) -> Response:
return await self.client.get(
f"/users/{user_id}/contacts/",
params={"public_only": public_only},
)

@validate_external_json_response()
async def create_or_update_email_connection(
self,
user_id: int,
email: str,
) -> Response:
return await self.client.put(
f"/users/{user_id}/email-connection/",
json={"email": email},
)

async def send_notification(self, data: NotificationInputSchema) -> None:
await self.broker.publish(
message=data.model_dump(mode="json"),
Expand Down
15 changes: 9 additions & 6 deletions app/common/bridges/posts_bdg.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from httpx import Response

from app.common.bridges.base_bdg import BaseBridge
from app.common.bridges.utils import validate_external_json_response
from app.common.config import settings


Expand All @@ -9,13 +12,13 @@ def __init__(self) -> None:
headers={"X-Api-Key": settings.api_key},
)

async def create_post_channel(self, channel_id: int, community_id: int) -> None:
response = await self.client.post(
@validate_external_json_response()
async def create_post_channel(self, channel_id: int, community_id: int) -> Response:
return await self.client.post(
f"/post-channels/{channel_id}/",
json={"community_id": community_id},
)
response.raise_for_status()

async def delete_post_channel(self, channel_id: int) -> None:
response = await self.client.delete(f"/post-channels/{channel_id}/")
response.raise_for_status()
@validate_external_json_response()
async def delete_post_channel(self, channel_id: int) -> Response:
return await self.client.delete(f"/post-channels/{channel_id}/")
Loading