Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions EosPayload/lib/orcheostrator/device_container.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, unique
from multiprocessing import Process
Expand All @@ -17,13 +18,24 @@ class Status(Enum):
INITIALIZED = 5


@dataclass
class StatusUpdate:
    """A (possibly partial) status report for a single driver.

    Every field defaults to ``None``, meaning "not reported". ``update`` skips
    ``None`` fields when merging, so a partial report never erases a value that
    is already known.

    The constructor is the one generated by ``@dataclass`` so that callers may
    pass any subset of the fields by keyword (e.g. without ``driver_id``).
    """

    driver_id: Device = None    # id of the driver this report is about
    status: Status = None       # reported status value
    thread_count: int = None    # thread count reported alongside the status
    reporter: Device = None     # device that produced the report
    effective: datetime = None  # time from which the report applies

    def update(self, other):
        """Merge ``other`` into this record if it is not older than this record.

        ``other`` is ignored when it carries no ``effective`` timestamp or when
        its timestamp is earlier than the one currently held. Otherwise each
        non-``None`` field of ``other`` overwrites the matching field here and
        ``effective`` advances to ``other.effective``.

        :param other: the StatusUpdate to merge in
        """
        if other.effective is None:
            return
        # a record that has no timestamp yet accepts any timestamped update
        if self.effective is not None and other.effective < self.effective:
            return

        if other.driver_id is not None:
            self.driver_id = other.driver_id
        if other.status is not None:
            self.status = other.status
        if other.thread_count is not None:
            self.thread_count = other.thread_count
        if other.reporter is not None:
            self.reporter = other.reporter
        # advance the timestamp; without this, staleness checks and the
        # ordering guard above would keep comparing against the creation time
        self.effective = other.effective


class DeviceContainer:
Expand All @@ -32,18 +44,19 @@ def __init__(self, driver: DriverBase, process: Process = None, config: dict = N
self.driver = driver
self.process = process
self.config = config
self.status = Status.NONE
self.thread_count = 0
self.status_reporter = Device.NO_DEVICE
self.status_since = datetime.now()

def update_status(self, status: Status, thread_count: int = 0, reporter: Device = Device.ORCHEOSTRATOR,
effective: datetime = None):
if effective is None:
effective = datetime.now()

if effective >= self.status_since:
self.status = status
self.thread_count = thread_count
self.status_reporter = reporter
self.status_since = effective
self.status = StatusUpdate(
status=Status.NONE,
thread_count=0,
reporter=Device.NO_DEVICE,
effective=datetime.now()
)

def update_status(self, status_update: StatusUpdate = None, status: Status = None, thread_count: int = None,
                  reporter: Device = Device.ORCHEOSTRATOR, effective: datetime = None):
    """Merge a status report into this container's current status.

    :param status_update: a ready-made StatusUpdate; when given, it is merged
        as-is and the remaining arguments are ignored
    :param status: status to report when no ``status_update`` is supplied
    :param thread_count: thread count to report when no ``status_update`` is supplied
    :param reporter: device to record as the source of the report
    :param effective: time the report applies from; the current time is used
        when omitted
    """
    if status_update is not None:
        self.status.update(status_update)
        return

    # no pre-built update: assemble one from the individual arguments
    timestamp = datetime.now() if effective is None else effective
    self.status.update(StatusUpdate(
        status=status,
        thread_count=thread_count,
        reporter=reporter,
        effective=timestamp,
    ))
100 changes: 100 additions & 0 deletions EosPayload/lib/orcheostrator/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import datetime, timedelta
from queue import Queue
import logging
import threading
import traceback

from EosLib.device import Device
from EosLib.packet.packet import Packet

from EosPayload.lib.mqtt.client import Client
from EosPayload.lib.orcheostrator.device_container import DeviceContainer, Status, StatusUpdate


class Health:
    """Static helpers for collecting driver heartbeats and reporting driver health."""

    @staticmethod
    def health_monitor(_client, user_data, message):
        """Heartbeat subscriber callback: turn a health packet into a queued StatusUpdate.

        :param _client: unused
        :param user_data: dict providing ``'logger'``, ``'queue'`` (receives the
            StatusUpdate objects) and ``'device_id'`` (packets sent by this id
            are ignored)
        :param message: received message; ``message.payload`` is decoded with
            ``Packet.decode``. The packet body is ASCII text of the form
            ``is_healthy,thread_count[,anything else]``.
        """
        try:
            try:
                packet = Packet.decode(message.payload)
            except Exception as e:
                user_data['logger'].error(f"failed to decode health packet: {e}")
                return

            sender = packet.data_header.sender
            user_data['logger'].info(f"received health packet from device id={sender}")

            if sender == user_data['device_id']:
                # our own heartbeat -- nothing to record
                return

            # extracts just the first two fields (is_healthy, thread_count), ignores the rest;
            # the starred target also accepts a body that has no further fields
            is_healthy, thread_count, *_ = packet.body.decode('ascii').split(',', 2)

            user_data['queue'].put(StatusUpdate(
                driver_id=sender,
                status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY,
                # convert here so a malformed count is rejected (and logged) now
                # instead of breaking report generation later
                thread_count=int(thread_count),
                reporter=sender,
                effective=packet.data_header.generate_time,
            ))
        except Exception as e:
            # this is needed b/c apparently an exception in a callback kills the mqtt thread
            user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}"
                                      f"\n{traceback.format_exc()}")

    @staticmethod
    def health_check(driver_list: dict[Device|str, DeviceContainer], update_queue: Queue,
                     logger: logging.Logger) -> None:
        """Apply queued status updates, auto-correct stale statuses and log a report.

        Never raises: any exception is logged at critical level instead.

        :param driver_list: containers keyed by device id
        :param update_queue: queue of StatusUpdate objects to apply
        :param logger: logger that receives progress messages and the report
        """
        try:
            logger.info("Starting Health Check")
            while not update_queue.empty():
                status_update = update_queue.get()
                container = driver_list.get(status_update.driver_id)
                if container is None:
                    # one stray heartbeat must not abort the whole health check
                    logger.warning(f"ignoring health update for unknown driver id={status_update.driver_id}")
                    continue
                container.update_status(status_update)

            for key, driver in driver_list.items():
                # auto set terminated if it died
                # NOTE(review): the pre-refactor check also covered Status.INITIALIZED -- confirm
                # that dropping it was intended
                if driver.status.status in [Status.HEALTHY, Status.UNHEALTHY]\
                        and (driver.process is None or not driver.process.is_alive()):
                    logger.critical(f"process for driver {key} is no longer running -- marking terminated")
                    # must be passed by keyword: the first positional parameter is a StatusUpdate
                    driver.update_status(status=Status.TERMINATED)

                # auto set unhealthy if we haven't had a ping in the last 30s from this device
                if driver.status.status == Status.HEALTHY\
                        and driver.status.effective < (datetime.now() - timedelta(seconds=30)):
                    logger.critical(f"haven't received a health ping from driver {key} in 30s -- marking unhealthy")
                    driver.update_status(status=Status.UNHEALTHY)

            logger.info(Health.generate_report(driver_list))

            logger.info("Done Checking Health")
        except Exception as e:
            logger.critical("An exception occurred when attempting to perform health check:"
                            f" {e}\n{traceback.format_exc()}")

    @staticmethod
    def generate_report(driver_list: dict[Device|str, DeviceContainer]) -> str:
        """Build a multi-line report of all drivers grouped by status.

        :param driver_list: containers keyed by device id
        :return: the report text, with one section per Status member
        """
        report = {status: [] for status in Status}

        num_threads = threading.active_count()
        for key, driver in driver_list.items():
            # driver.status is a StatusUpdate record; the Status value lives in its .status field
            current = driver.status
            the_key = key if current.status in [Status.NONE, Status.INVALID] else driver.config.get("pretty_id")
            report[current.status].append(f"{the_key} ({current.thread_count} threads)"
                                          f" as of {current.effective} (reported by {current.reporter}"
                                          f" [{Device(current.reporter).name}])")
            num_threads += int(current.thread_count)

        lines = [
            "Health Report: ",
            f"{len(report[Status.HEALTHY])} drivers running",
            f"{num_threads} total threads in use ({threading.active_count()} by OrchEOStrator)",
        ]
        for status, reports in report.items():
            lines.append(f"\t{status}:")
            lines.extend(f"\t\t{item}" for item in reports)

        return "\n".join(lines)

#@staticmethod
#def publish_health_update(mqtt: Client, logger: logging.Logger, device_id: Device, ):
# pass
102 changes: 18 additions & 84 deletions EosPayload/lib/orcheostrator/orcheostrator.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from datetime import datetime, timedelta
from multiprocessing import Process
from queue import Queue
import logging
import os
import threading
import time
import traceback

from EosLib.device import Device
from EosLib.packet.packet import Packet

from EosPayload.lib.config import OrcheostratorConfigParser
from EosPayload.lib.logger import init_logging
from EosPayload.lib.mqtt import MQTT_HOST
from EosPayload.lib.mqtt.client import Client, Topic
from EosPayload.lib.orcheostrator.device_container import Status, StatusUpdate
from EosPayload.lib.config import OrcheostratorConfigParser
from EosPayload.lib.orcheostrator.device_container import Status
from EosPayload.lib.orcheostrator.health import Health


class OrchEOStrator:

Expand Down Expand Up @@ -45,16 +44,20 @@ def __init__(self, output_directory: str, config_filepath: str):

try:
self._mqtt = Client(MQTT_HOST)
self._mqtt.user_data_set({'logger': self._logger, 'queue': self._health_queue})
self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, self.health_monitor)
self._mqtt.user_data_set({
'device_id': Device.ORCHEOSTRATOR,
'logger': self._logger,
'queue': self._health_queue
})
self._mqtt.register_subscriber(Topic.HEALTH_HEARTBEAT, Health.health_monitor)
except Exception as e:
self._logger.critical(f"Failed to setup MQTT: {e}\n{traceback.format_exc()}")

def __del__(self):
    """ Destructor. Terminates all drivers, runs one final health check, then shuts logging down. """
    self._logger.info("shutting down")
    self.terminate()
    # single final health check via the shared Health helper (the duplicate
    # call to the superseded self._health_check() is dropped)
    Health.health_check(self._drivers, self._health_queue, self._logger)
    logging.shutdown()

#
Expand All @@ -64,7 +67,7 @@ def __del__(self):
def run(self) -> None:
self._spawn_drivers()
while True:
self._health_check()
Health.health_check(self._drivers, self._health_queue, self._logger)
time.sleep(10)
# future: anything else OrchEOStrator is responsible for doing. Perhaps handling "force terminate" commands
# or MQTT things
Expand All @@ -75,7 +78,7 @@ def terminate(self) -> None:
if driver.status in [Status.HEALTHY, Status.UNHEALTHY]:
self._logger.info(f"terminating process for device id {device_id}")
driver.process.terminate()
driver.update_status(Status.TERMINATED)
driver.update_status(status=Status.TERMINATED)

#
# PROTECTED HELPER METHODS
Expand All @@ -92,7 +95,11 @@ def _spawn_drivers(self) -> None:
try:
self._logger.info(f"spawning process for device '{driver_config.get('pretty_id')}' from"
f" class '{driver.__name__}'")
proc = Process(target=self._driver_runner, args=(driver, self.output_directory, driver_config), daemon=True)
proc = Process(
target=self._driver_runner,
args=(driver, self.output_directory, driver_config),
daemon=True
)
container.process = proc
proc.start()
self._drivers[driver_config.get("device_id")] = container
Expand All @@ -112,79 +119,6 @@ def _driver_runner(cls, output_directory: str, config: dict) -> None:
"""
cls(output_directory, config).run()

@staticmethod
def health_monitor(_client, user_data, message):
    """Heartbeat subscriber callback: turn a health packet into a queued StatusUpdate.

    :param _client: unused
    :param user_data: dict providing ``'logger'`` and ``'queue'`` (receives the
        StatusUpdate objects)
    :param message: received message; ``message.payload`` is decoded with
        ``Packet.decode``. The packet body is ASCII text of the form
        ``is_healthy,thread_count[,anything else]``.
    """
    try:
        try:
            packet = Packet.decode(message.payload)
        except Exception as e:
            user_data['logger'].error(f"failed to decode health packet: {e}")
            return

        sender = packet.data_header.sender
        user_data['logger'].info(f"received health packet from device id={sender}")

        # only the first two fields are used; the starred target also accepts
        # a body that has no further fields
        is_healthy, thread_count, *_ = packet.body.decode('ascii').split(',', 2)

        user_data['queue'].put(StatusUpdate(
            driver_id=sender,
            status=Status.HEALTHY if int(is_healthy) else Status.UNHEALTHY,
            # convert here so a malformed count is rejected (and logged) now
            # instead of breaking the health report later
            thread_count=int(thread_count),
            reporter=sender,
            effective=packet.data_header.generate_time,
        ))
    except Exception as e:
        # this is needed b/c apparently an exception in a callback kills the mqtt thread
        user_data['logger'].error(f"an unhandled exception occurred while processing health_monitor: {e}"
                                  f"\n{traceback.format_exc()}")

def _health_check(self) -> None:
    """Apply queued status updates, auto-correct stale statuses and log a health report.

    Never raises: any exception is logged at critical level instead.
    """
    try:
        self._logger.info("Starting Health Check")

        # drain the heartbeat queue into the matching driver containers
        while not self._health_queue.empty():
            pending = self._health_queue.get()
            self._drivers[pending.driver_id].update_status(
                pending.status,
                pending.thread_count,
                pending.reporter,
                pending.effective,
            )

        total_threads = threading.active_count()
        by_status = {state: [] for state in Status}
        for device_key, container in self._drivers.items():
            # a driver we believe is up but whose process is gone gets marked terminated
            if container.status in (Status.HEALTHY, Status.UNHEALTHY, Status.INITIALIZED):
                if container.process is None or not container.process.is_alive():
                    self._logger.critical(
                        f"process for driver {device_key} is no longer running -- marking terminated")
                    container.update_status(Status.TERMINATED)

            # a healthy driver that has been silent for more than 30s gets marked unhealthy
            if container.status == Status.HEALTHY:
                cutoff = datetime.now() - timedelta(seconds=30)
                if container.status_since < cutoff:
                    self._logger.critical(
                        f"haven't received a health ping from driver {device_key} in 30s -- marking unhealthy")
                    container.update_status(Status.UNHEALTHY)

            if container.status in (Status.NONE, Status.INVALID):
                label = device_key
            else:
                label = container.config.get("pretty_id")
            by_status[container.status].append(
                f"{label} ({container.thread_count} threads)"
                f" as of {container.status_since} (reported by {container.status_reporter}"
                f" [{Device(container.status_reporter).name}])")
            total_threads += int(container.thread_count)

        lines = [
            "Health Report: ",
            f"{len(by_status[Status.HEALTHY])} drivers running",
            f"{total_threads} total threads in use ({threading.active_count()} by OrchEOStrator)",
        ]
        for state, entries in by_status.items():
            lines.append(f"\t{state}:")
            for entry in entries:
                lines.append(f"\t\t{entry}")
        self._logger.info("\n".join(lines))

        self._logger.info("Done Checking Health")
    except Exception as e:
        self._logger.critical("An exception occurred when attempting to perform health check:"
                              f" {e}\n{traceback.format_exc()}")

@staticmethod
def _spin() -> None:
""" Spins to keep the software alive. Never returns. """
Expand Down