Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
142 commits
Select commit Hold shift + click to select a range
6ee8224
uvicorn asgi
jmlago Sep 8, 2025
e1af102
fastapi migration workiing with uvicorn
jmlago Sep 8, 2025
7fbca4f
fastapi endpoints working
jmlago Sep 9, 2025
0e3c61c
add validators working
jmlago Sep 9, 2025
eb8e9ca
update and delete vals fixed
jmlago Sep 9, 2025
cab942f
consensus background tasks working
jmlago Sep 9, 2025
eb388ee
some fixes
jmlago Sep 9, 2025
27a8120
non dead workers and RPC logs showing on frontend
jmlago Sep 9, 2025
d7cf210
gen_call working
jmlago Sep 9, 2025
ba31a27
fix: db sharing session management across endpoints
cristiam86 Sep 10, 2025
f492f97
chore: improved logging
cristiam86 Sep 10, 2025
27935cf
Merge remote-tracking branch 'origin/main' into gunicorn
cristiam86 Sep 10, 2025
35715a9
chore: removed unused code
cristiam86 Sep 10, 2025
8a0e6f9
chore: early return for health ping method and improved db session ma…
cristiam86 Sep 10, 2025
09f4d43
chore: removed unused workflow
cristiam86 Sep 10, 2025
b568f18
Merge remote-tracking branch 'origin/main' into gunicorn
cristiam86 Sep 11, 2025
6a62c03
feat(genvm): update to v0.2
kp2pml30 Sep 11, 2025
9236b62
fix: json rpc response no compliant
cristiam86 Sep 11, 2025
c1d248d
fix: disabled raising error when consensus_service forward transactio…
cristiam86 Sep 11, 2025
1220c4b
fix: remove contract state from transaction data
cristiam86 Sep 11, 2025
4bedf3c
chore: add logs to RPC execution
cristiam86 Sep 11, 2025
e1263b1
wip: fixing validators registry
cristiam86 Sep 11, 2025
3f91499
wip: fixing validators registry
cristiam86 Sep 11, 2025
4f10fc3
fix: singleton websocket
kstroobants Sep 12, 2025
965d9da
fix: disable some logs with env var DISABLE_INFO_LOGS_ENDPOINTS
kstroobants Sep 12, 2025
5aaf83d
fix: success logs are green
kstroobants Sep 12, 2025
2a50687
fix: duplicated finality window log for new browser
kstroobants Sep 12, 2025
486cf90
fix: bug in process round data returns error when appealing
kstroobants Sep 12, 2025
2545d4a
fix: application now shuts down gracefully without errors
kstroobants Sep 12, 2025
52122ab
fix: standarized JSONRPC Errors
cristiam86 Sep 12, 2025
99f7e38
fix: simplify llm and web modules execution
cristiam86 Sep 15, 2025
7d35fa3
chore: removed tx execution semaphore
cristiam86 Sep 15, 2025
bbc3d93
chore: disable hardhat and allow RPC batch calls
cristiam86 Sep 15, 2025
d999c1f
chore: optionally disable hardhat by removing the env
cristiam86 Sep 15, 2025
0d55c30
fix: avoid nullifying the web3 client
cristiam86 Sep 15, 2025
73bbbba
wip: adjusting genvm timeouts
cristiam86 Sep 16, 2025
822b322
wip: revert genvm changes
cristiam86 Sep 16, 2025
b09a728
chore: setup cpus and memory limit for jsonrpc container
cristiam86 Sep 16, 2025
33baa39
chore: setup env vars for cpu and mem limit
cristiam86 Sep 16, 2025
c9e373b
fix: set main mem and cpu env vars same as deploy section
cristiam86 Sep 16, 2025
12de6a8
fix: improved genvm unzip based on system architecture
cristiam86 Sep 16, 2025
2049530
fix: add debugging to find hosted studio issue websocket
kstroobants Sep 18, 2025
0289260
Merge remote-tracking branch 'origin/feat/update-genvm-to-v0.2' into …
cristiam86 Sep 18, 2025
9b0ffb4
fix: simplify debugging
kstroobants Sep 18, 2025
d3f5c25
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 18, 2025
be2c2b2
fix: solve print error
kstroobants Sep 18, 2025
ef4ff66
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 18, 2025
bf1c93d
Revert "feat(genvm): update to v0.2"
cristiam86 Sep 18, 2025
cb56de3
fix: jsonrpc docker stuck in downloading runners
cristiam86 Sep 18, 2025
d6274fc
chore: reverted separated genvm runners download
cristiam86 Sep 18, 2025
f377c88
Merge remote-tracking branch 'origin/main' into gunicorn
cristiam86 Sep 18, 2025
66a27a4
fix: endpoint registry and dockerfile typo
cristiam86 Sep 18, 2025
c7955b1
fix: add ws and health to traefik routing
kstroobants Sep 19, 2025
12ed34f
fix: remove true from backend docker file
kstroobants Sep 19, 2025
aa2333b
fix: remove debugging message to find websocket issue
kstroobants Sep 19, 2025
8955af6
fix: revert changes related to adding prints for websocket issue
kstroobants Sep 19, 2025
07acfa7
docs: added architecture refactor plan
cristiam86 Sep 19, 2025
71687d0
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 19, 2025
e133b48
docs: relocate report
cristiam86 Sep 19, 2025
e4ef649
refactor: generate tx hash similar to consensus
cristiam86 Sep 19, 2025
cf74c54
refactor: restructured fastapi interface and dependencies management
cristiam86 Sep 23, 2025
0a787c5
test: added type to gen_call
cristiam86 Sep 24, 2025
6fff643
fix: get_block_by_hash params
cristiam86 Sep 24, 2025
6637b94
fix: improved tx decoding parsing
cristiam86 Sep 24, 2025
7628db6
fix: improved logging error data
cristiam86 Sep 24, 2025
4fc803f
fix: deleted tests
mpaya5 Sep 25, 2025
8f09cce
fix: fix test_rpc_endpoint_manager.py
mpaya5 Sep 25, 2025
ac4326d
fix: fix storage
mpaya5 Sep 25, 2025
70737b1
feat: finished tests
mpaya5 Sep 25, 2025
27dac6e
Potential fix for code scanning alert no. 137: Use of externally-cont…
mpaya5 Sep 25, 2025
e8a24c5
Potential fix for code scanning alert no. 136: Information exposure t…
mpaya5 Sep 25, 2025
c12194e
feat: pre-commit run
mpaya5 Sep 25, 2025
fe07c77
fix: sample env format
cristiam86 Sep 26, 2025
19f6a94
fix: removed set env variables already in config
cristiam86 Sep 26, 2025
46b82c3
Update frontend/test/unit/hooks/useWebSocketClient.test.ts
mpaya5 Sep 26, 2025
dc42071
fix: reorganice unit tests
mpaya5 Sep 26, 2025
24fe280
fix: added units to default mem values
cristiam86 Sep 26, 2025
481111f
fix: minor fixes on types and checks
cristiam86 Sep 26, 2025
8ce7bea
tests: fixes and improvements
cristiam86 Sep 26, 2025
45836ae
fix: removed channel and improved types and checks
cristiam86 Sep 26, 2025
900a10e
fix: small checks and fixes for types and tests
cristiam86 Sep 26, 2025
533d189
fix: concurrent verify_for_read() can double-restart and kill a fresh…
cristiam86 Sep 26, 2025
dc4cec1
fix: check RPC request ID in the response object
cristiam86 Sep 26, 2025
e12da06
fix: added test assert
cristiam86 Sep 26, 2025
2e39933
fix: deadlock using LLM module
cristiam86 Sep 26, 2025
b872e73
Apply suggestion from @coderabbitai[bot]
mpaya5 Sep 29, 2025
3312115
Apply suggestion from @coderabbitai[bot]
mpaya5 Sep 29, 2025
02134c3
Apply suggestion from @coderabbitai[bot]
mpaya5 Sep 29, 2025
fa140dd
feat: deletted commentted comments
mpaya5 Sep 29, 2025
2986f32
Revert "Apply suggestion from @coderabbitai[bot]"
mpaya5 Sep 29, 2025
e4f7e84
fix: format
cristiam86 Sep 29, 2025
997dade
fix: precommit
cristiam86 Sep 29, 2025
15f47b9
fix: types for execute API request
cristiam86 Sep 29, 2025
4fb5c96
fix: logging standarization
cristiam86 Sep 29, 2025
79cdf1f
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 29, 2025
47a5c43
fix: avoid silently ignoring falsy data values
cristiam86 Sep 29, 2025
f39d8c1
Merge remote-tracking branch 'origin/dxp-682-fix-current-tests' into …
cristiam86 Sep 29, 2025
5dcd493
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 29, 2025
01aadd9
fix: unit tests errors and warnings
cristiam86 Sep 29, 2025
fad236d
Merge branch 'gunicorn' into dxp-683-separate-re-group-unit-tests
mpaya5 Sep 29, 2025
3ebb088
fix: Consensus test helper async execution improvements (#1328)
mpaya5 Sep 29, 2025
31733a7
fix: websockets tests
cristiam86 Sep 29, 2025
99d1d8d
Merge branch 'gunicorn' of github.com:yeagerai/genlayer-studio into g…
cristiam86 Sep 29, 2025
f384d3a
Merge branch 'gunicorn' into dxp-683-separate-re-group-unit-tests
mpaya5 Sep 30, 2025
2b7600e
fix: fix tests
mpaya5 Sep 30, 2025
01a0658
fix: pre-commit run
mpaya5 Sep 30, 2025
0a7c2d9
fix: tests and removed unnecessary print
cristiam86 Sep 30, 2025
71f6790
fix: removed state from test receipt schema
cristiam86 Sep 30, 2025
20d6da1
merge: merged gunicorn
mpaya5 Oct 1, 2025
d45800c
merge: merged main
mpaya5 Oct 1, 2025
f1b71c0
Merge branch 'main' into dxp-683-separate-re-group-unit-tests
kstroobants Oct 9, 2025
a181f2e
fix: use same names for bake and don't build frontend for ci test
kstroobants Oct 9, 2025
6bacf42
fix: ci test job uses one worker because multiple workers do not see …
kstroobants Oct 10, 2025
b0d6157
fix: use validators_manager.registry to trigger do_write interceptor …
kstroobants Oct 10, 2025
c6bce36
fix: session caching issue, fix unit tests
kstroobants Oct 13, 2025
eeb93f5
fix: unit tests
kstroobants Oct 13, 2025
a547987
fix: only run 1 failed test_company_naming
kstroobants Oct 13, 2025
b6b0c31
fix: json dump to string
kstroobants Oct 13, 2025
7cc8c6e
fix: revert json dump; add consumed_gen in mock response
kstroobants Oct 14, 2025
d0ceb9a
fix: enable all integration tests; rabbitai remark
kstroobants Oct 14, 2025
d17c6c4
fix: only run failed test_multi_tenant_storage
kstroobants Oct 14, 2025
b4da41a
fix: change workflow
kstroobants Oct 14, 2025
1cd3709
fix: add compose env vars in integration test
kstroobants Oct 14, 2025
3078f42
fix: run all integration tests
kstroobants Oct 14, 2025
fa3212f
fix: change interval unit test
kstroobants Oct 14, 2025
f6a2e49
Merge branch 'main' into dxp-699-fix-broken-ci-test
kstroobants Oct 14, 2025
aff26ad
Merge branch 'dxp-699-fix-broken-ci-test' into dxp-683-separate-re-gr…
kstroobants Oct 15, 2025
93f5fa9
fix: solve ci tests fails
kstroobants Oct 15, 2025
88c9ec3
fix: solve ci test
kstroobants Oct 15, 2025
a3337f4
Merge branch 'main' into dxp-699-fix-broken-ci-test
kstroobants Oct 20, 2025
35aaf0a
fix: production docker file was removed
kstroobants Oct 20, 2025
7abcd50
fix: pre-commit
kstroobants Oct 20, 2025
b4bb067
fix: only run first failed integration test
kstroobants Oct 20, 2025
16fb2d3
fix: add consensus-worker to test and error handling
kstroobants Oct 20, 2025
8b06388
fix: add consensus_worker
kstroobants Oct 20, 2025
bb631bf
fix: add imagename to consensus-worker for cache
kstroobants Oct 20, 2025
ecd788a
fix: try only one integration test
kstroobants Oct 21, 2025
c833d52
fix: add pub sub for validator change to redis
kstroobants Oct 21, 2025
746638e
fix: redis in app_lifespan.py
kstroobants Oct 21, 2025
5e5eeac
fix: add optional
kstroobants Oct 21, 2025
0a56ce6
fix: enable all integration tests
kstroobants Oct 21, 2025
a85d569
Merge branch 'dxp-699-fix-broken-ci-test' into dxp-683-separate-re-gr…
kstroobants Oct 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/backend_integration_tests_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ jobs:
OPENAIKEY: ${{ secrets.OPENAIKEY }}
run: |
sed -i "s/<add_your_openai_api_key_here>/${OPENAIKEY}/g" .env
sed -i "s/FINALITY_WINDOW =.*/FINALITY_WINDOW = 10/" .env
sed -i "s/VITE_FINALITY_WINDOW=\".*\"/VITE_FINALITY_WINDOW=\"10\"/" .env
sed -i "s/COMPOSE_CPU_LIMIT=\".*\"/COMPOSE_CPU_LIMIT=\"4\"/" .env
sed -i "s/COMPOSE_CPU_RESERVATION=\".*\"/COMPOSE_CPU_RESERVATION=\"1\"/" .env
sed -i "s/COMPOSE_MEM_LIMIT=\".*\"/COMPOSE_MEM_LIMIT=\"6gb\"/" .env
sed -i "s/COMPOSE_MEM_RESERVATION=\".*\"/COMPOSE_MEM_RESERVATION=\"2gb\"/" .env
echo >> .env
if [[ "${{ needs.triggers.outputs.is_pull_request_review_approved }}" == "true" ]]; then
echo "TEST_WITH_MOCK_LLMS=false" >> .env
Expand All @@ -67,18 +71,18 @@ jobs:
with:
files: |
./docker-compose.yml
./docker-compose.production.yml
targets: |
database-migration
jsonrpc
consensus-worker
set: |
*.cache-from=type=gha
*.cache-to=type=gha,mode=max
load: true

# Start services without rebuilding images
- name: Run Docker Compose
run: docker compose up -d --no-build
run: docker compose up -d --no-build jsonrpc consensus-worker



Expand Down
9 changes: 9 additions & 0 deletions backend/consensus/worker_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ def get_session():
validators_manager = validators.Manager(SessionLocal())
await validators_manager.restart()

# Subscribe to validator change events
async def handle_validator_change(event_data):
"""Reload validators when notified of changes."""
logger.info(f"Received validator change event: {event_data}")
await validators_manager.restart()

# Subscribe to validator events channel
await msg_handler.subscribe_to_validator_events(handle_validator_change)

# Create and start the worker
worker = ConsensusWorker(
get_session=get_session,
Expand Down
13 changes: 11 additions & 2 deletions backend/database_handler/llm_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from backend.node.create_nodes.providers import get_default_providers
from .models import LLMProviderDBModel
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError


class LLMProviderRegistry:
Expand All @@ -18,7 +19,11 @@ def reset_defaults(self):
for provider in providers:
self.session.add(_to_db_model(provider, is_default=True))

self.session.commit()
try:
self.session.commit()
except IntegrityError:
# Another worker already seeded the providers, rollback and continue
self.session.rollback()

def update_defaults(self):
"""Update default providers while preserving custom ones."""
Expand Down Expand Up @@ -62,7 +67,11 @@ def update_defaults(self):
}
)

self.session.commit()
try:
self.session.commit()
except IntegrityError:
# Another worker already seeded the providers, rollback and continue
self.session.rollback()

def get_all(self) -> list[LLMProvider]:
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@
Create Date: 2025-09-11 13:18:28.225878

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '96840ab9133a'
down_revision: Union[str, None] = '035ef00c2779'
revision: str = "96840ab9133a"
down_revision: Union[str, None] = "035ef00c2779"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('transactions', sa.Column('blocked_at', sa.DateTime(timezone=True), nullable=True))
op.add_column('transactions', sa.Column('worker_id', sa.String(length=255), nullable=True))
op.add_column(
"transactions",
sa.Column("blocked_at", sa.DateTime(timezone=True), nullable=True),
)
op.add_column(
"transactions", sa.Column("worker_id", sa.String(length=255), nullable=True)
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('transactions', 'worker_id')
op.drop_column('transactions', 'blocked_at')
op.drop_column("transactions", "worker_id")
op.drop_column("transactions", "blocked_at")
# ### end Alembic commands ###
6 changes: 6 additions & 0 deletions backend/database_handler/validators_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ def _get_validator_or_fail(self, validator_address: str) -> Validators:
return validator_data

def count_validators(self) -> int:
# Expire all objects to ensure we get fresh data from the database
self.session.expire_all()
return self.session.query(Validators).count()

def get_all_validators(self, include_private_key: bool = True) -> List[dict]:
# Expire all objects to ensure we get fresh data from the database
self.session.expire_all()
validators_data = self.session.query(Validators).all()
return [
to_dict(validator, include_private_key) for validator in validators_data
Expand All @@ -55,6 +59,8 @@ def get_all_validators(self, include_private_key: bool = True) -> List[dict]:
def get_validator(
self, validator_address: str, include_private_key: bool = True
) -> dict:
# Expire all objects to ensure we get fresh data from the database
self.session.expire_all()
return to_dict(
self._get_validator_or_fail(validator_address), include_private_key
)
Expand Down
23 changes: 21 additions & 2 deletions backend/protocol_rpc/app_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,16 @@ def _verify_database_ready(db_manager: DatabaseSessionManager) -> None:
async def _initialise_validators(
validators_config_json: Optional[str],
db_manager: DatabaseSessionManager,
validators_manager: validators.Manager,
) -> None:
if not validators_config_json:
return

init_session = db_manager.open_session()
try:
await initialize_validators(validators_config_json, init_session)
await initialize_validators(
validators_config_json, init_session, validators_manager
)
init_session.commit()
finally:
init_session.close()
Expand Down Expand Up @@ -201,7 +204,9 @@ async def rpc_app_lifespan(app, settings: RPCAppSettings) -> AsyncIterator[RPCAp
# Delete all validators from the database and create new ones based on env.VAlIDATORS_CONFIG_JSON
if settings.validators_config_json:
logger.info("[STARTUP] Initializing validators from config")
await _initialise_validators(settings.validators_config_json, db_manager)
await _initialise_validators(
settings.validators_config_json, db_manager, validators_manager
)

# Restart web and llm modules, created the validators Snapshot, and registers providers and models to the LLM module
logger.info("[STARTUP] Restarting validators and creating snapshot")
Expand Down Expand Up @@ -311,6 +316,20 @@ def get_session() -> Session:
)
await redis_subscriber.connect()
await redis_subscriber.start()

# Register handler for validator change events
async def handle_validator_change(event_data):
"""Reload validators when they change."""
logger.info(f"RPC worker reloading validators due to change event")
await validators_manager.restart()

redis_subscriber.register_handler("validator_created", handle_validator_change)
redis_subscriber.register_handler("validator_updated", handle_validator_change)
redis_subscriber.register_handler("validator_deleted", handle_validator_change)
redis_subscriber.register_handler(
"all_validators_deleted", handle_validator_change
)

logger.info(
f"[STARTUP] Redis subscriber connected at {redis_url} for worker event broadcasting"
)
Expand Down
25 changes: 10 additions & 15 deletions backend/protocol_rpc/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def delete_provider(session: Session, id: int) -> None:

async def create_validator(
session: Session,
validators_manager: validators.Manager,
stake: int,
provider: str,
model: str,
Expand All @@ -215,11 +216,10 @@ async def create_validator(
validate_provider(llm_provider)

accounts_manager = AccountsManager(session)
validators_registry = ModifiableValidatorsRegistry(session)

account = accounts_manager.create_new_account()

return await validators_registry.create_validator(
return await validators_manager.registry.create_validator(
Validator(
address=account.address,
private_key=account.key,
Expand Down Expand Up @@ -257,7 +257,6 @@ async def create_random_validators(
limit_models: list[str] = None,
) -> list[dict]:
accounts_manager = AccountsManager(session)
validators_registry = ModifiableValidatorsRegistry(session)
llm_provider_registry = LLMProviderRegistry(session)

limit_providers = limit_providers or []
Expand All @@ -276,7 +275,7 @@ async def create_random_validators(
stake = random.randint(min_stake, max_stake)
validator_account = accounts_manager.create_new_account()

validator = await validators_registry.create_validator(
validator = await validators_manager.registry.create_validator(
Validator(
address=validator_account.address,
private_key=validator_account.key,
Expand All @@ -292,6 +291,7 @@ async def create_random_validators(
@check_forbidden_method_in_hosted_studio
async def update_validator(
session: Session,
validators_manager: validators.Manager,
validator_address: str,
stake: int,
provider: str,
Expand Down Expand Up @@ -321,38 +321,33 @@ async def update_validator(
)
validate_provider(llm_provider)

validators_registry = ModifiableValidatorsRegistry(session)

validator = Validator(
address=validator_address,
stake=stake,
llmprovider=llm_provider,
)
return await validators_registry.update_validator(validator)
return await validators_manager.registry.update_validator(validator)


@check_forbidden_method_in_hosted_studio
async def delete_validator(
session: Session,
validators_manager: validators.Manager,
validator_address: str,
) -> str:
# Remove validation while adding migration to update the db address
# if not accounts_manager.is_valid_address(validator_address):
# raise InvalidAddressError(validator_address)

validators_registry = ModifiableValidatorsRegistry(session)

await validators_registry.delete_validator(validator_address)
await validators_manager.registry.delete_validator(validator_address)
return validator_address


@check_forbidden_method_in_hosted_studio
async def delete_all_validators(
session: Session,
validators_manager: validators.Manager,
) -> list:
validators_registry = ModifiableValidatorsRegistry(session)
await validators_registry.delete_all_validators()
return validators_registry.get_all_validators()
await validators_manager.registry.delete_all_validators()
return validators_manager.registry.get_all_validators()


def get_all_validators(validators_registry: ValidatorsRegistry) -> list:
Expand Down
46 changes: 46 additions & 0 deletions backend/protocol_rpc/message_handler/redis_worker_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RedisWorkerMessageHandler(MessageHandler):
CONSENSUS_CHANNEL = "consensus:events"
TRANSACTION_CHANNEL = "transaction:events"
GENERAL_CHANNEL = "general:events"
VALIDATOR_CHANNEL = "validator:events"

def __init__(
self,
Expand Down Expand Up @@ -70,6 +71,51 @@ async def initialize(self):
logger.error(f"Worker {self.worker_id} failed to connect to Redis: {e}")
raise

async def subscribe_to_validator_events(self, callback):
"""
Subscribe to validator change events.
Used by consensus-worker to reload validators when they change.

Args:
callback: Async function to call when validator event received
"""
if not self.redis_client:
await self.initialize()

if not self.redis_client:
logger.error(
"Cannot subscribe to validator events: Redis client not initialized"
)
return

redis_client = self.redis_client

async def listener():
"""Listen for validator events and call callback."""
redis_pubsub = redis_client.pubsub()
await redis_pubsub.subscribe(self.VALIDATOR_CHANNEL)
logger.info(
f"Worker {self.worker_id} subscribed to {self.VALIDATOR_CHANNEL}"
)

try:
async for message in redis_pubsub.listen():
if message["type"] == "message":
try:
event_data = json.loads(message["data"])
await callback(event_data)
except Exception as e:
logger.error(f"Error processing validator event: {e}")
except asyncio.CancelledError:
logger.info(
f"Worker {self.worker_id} validator event listener cancelled"
)
await redis_pubsub.unsubscribe(self.VALIDATOR_CHANNEL)
await redis_pubsub.close()

# Start listener in background
asyncio.create_task(listener())

def _get_channel_for_event(self, log_event: LogEvent) -> str:
"""
Determine the appropriate Redis channel for an event.
Expand Down
Loading
Loading