From 09a52bbac1572173241e122ca154747d3c582385 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=98JoinTyang=E2=80=99?=
Date: Mon, 30 Sep 2024 10:52:41 +0800
Subject: [PATCH] add file access log

---
Review notes (ignored by git am):
- Line breaks and indentation were reconstructed from a whitespace-collapsed
  copy of this patch. The indentation of the app.py context lines is an
  assumption; use `git apply --ignore-whitespace` if the context does not match.
- The blob hashes on the `index` lines predate the review fixes below.

 dtable_events/app/app.py         |   6 ++
 dtable_events/app/mq_handler.py  | 102 +++++++++++++++++++++++++++++++
 dtable_events/events/db.py       |  20 ++++++
 dtable_events/events/handlers.py |  44 +++++++++++++
 dtable_events/events/models.py   |  45 ++++++++++++++
 5 files changed, 217 insertions(+)
 create mode 100644 dtable_events/app/mq_handler.py
 create mode 100644 dtable_events/events/db.py
 create mode 100644 dtable_events/events/handlers.py
 create mode 100644 dtable_events/events/models.py

diff --git a/dtable_events/app/app.py b/dtable_events/app/app.py
index 520d9bff..13a9b5ce 100644
--- a/dtable_events/app/app.py
+++ b/dtable_events/app/app.py
@@ -23,6 +23,7 @@
 from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
 from dtable_events.convert_page.manager import conver_page_to_pdf_manager
 from dtable_events.api_calls.api_calls_counter import APICallsCounter
+from dtable_events.app.mq_handler import EventsHandler, init_message_handlers
 
 
 class App(object):
@@ -62,6 +63,9 @@ def __init__(self, config, task_mode):
             # convert pdf manager
             conver_page_to_pdf_manager.init(config)
 
+            init_message_handlers()
+            self._events_handler = EventsHandler(config)
+
     def serve_forever(self):
 
         if self._enable_foreground_tasks:
@@ -93,3 +97,5 @@ def serve_forever(self):
             self._license_expiring_notices_sender.start()  # always True
             # convert pdf manager
             conver_page_to_pdf_manager.start()  # always True
+
+            self._events_handler.start()
diff --git a/dtable_events/app/mq_handler.py b/dtable_events/app/mq_handler.py
new file mode 100644
index 00000000..1afb8cbf
--- /dev/null
+++ b/dtable_events/app/mq_handler.py
@@ -0,0 +1,102 @@
+import time
+import logging
+from threading import Thread
+
+from seaserv import seafile_api
+
+import dtable_events.events.handlers as events_handlers
+from dtable_events.db import init_db_session_class
+
+logger = logging.getLogger(__name__)
+
+__all__ = [
+    'EventsHandler',
+    'init_message_handlers'
+]
+
+
+class MessageHandler(object):
+    """Dispatch popped events to the handlers registered for their type."""
+
+    def __init__(self):
+        # A (msg_type, list) map, where msg_type is 'channel:event'.
+        # For a given message type there may be multiple handlers.
+        self._handlers = {}
+
+    def add_handler(self, msg_type, func):
+        """Register func for msg_type; duplicate registrations are ignored."""
+        funcs = self._handlers.setdefault(msg_type, [])
+        if func not in funcs:
+            funcs.append(func)
+
+    def handle_message(self, config, session, channel, msg):
+        """Run every handler registered for the message's type.
+
+        The event name is the text before the first tab of msg['content'].
+        A failing handler is logged and does not stop the remaining ones.
+        """
+        event, sep, _ = msg['content'].partition('\t')
+        if not sep:
+            logger.warning("invalid message format: %s", msg)
+            return
+
+        msg_type = channel + ':' + event
+        for func in self._handlers.get(msg_type, ()):
+            try:
+                func(config, session, msg)
+            except Exception as e:
+                logger.exception("error when handle msg: %s", e)
+
+    def get_channels(self):
+        """Return the set of channels that have at least one handler."""
+        # partition() keeps the whole key when it contains no ':', whereas
+        # slicing with find()'s -1 would silently drop the last character.
+        return {msg_type.partition(':')[0] for msg_type in self._handlers}
+
+
+message_handler = MessageHandler()
+
+
+def init_message_handlers():
+    """Register the event handlers on the module-level dispatcher."""
+    events_handlers.register_handlers(message_handler)
+
+
+class EventsHandler(object):
+    """Poll each subscribed channel in its own thread and dispatch events."""
+
+    def __init__(self, config):
+        self._config = config
+        self._db_session_class = init_db_session_class(config)
+
+    def handle_event(self, channel):
+        """Pop events from channel forever and hand them to the dispatcher."""
+        config = self._config
+        session = self._db_session_class()
+        while True:
+            try:
+                msg = seafile_api.pop_event(channel)
+            except Exception as e:
+                logger.error('Failed to get event: %s', e)
+                time.sleep(3)
+                continue
+            if not msg:
+                time.sleep(0.5)
+                continue
+            try:
+                message_handler.handle_message(config, session, channel, msg)
+            except Exception as e:
+                logger.exception(e)
+            finally:
+                # Close after every event; the same session object is
+                # reused for the next message.
+                session.close()
+
+    def start(self):
+        """Start one polling thread per channel that has handlers."""
+        channels = message_handler.get_channels()
+        logger.info('Subscribe to channels: %s', channels)
+
+        for channel in channels:
+            event_handler = Thread(target=self.handle_event, args=(channel, ))
+            event_handler.start()
diff --git a/dtable_events/events/db.py b/dtable_events/events/db.py
new file mode 100644
index 00000000..1e12a12a
--- /dev/null
+++ b/dtable_events/events/db.py
@@ -0,0 +1,20 @@
+import logging
+import datetime
+
+from .models import FileAudit
+
+logger = logging.getLogger(__name__)
+
+
+def save_file_audit_event(session, timestamp, etype, user, ip, device, org_id, dtable_uuid, file_path):
+    """Insert one FileAudit row and commit it.
+
+    A timestamp of None is replaced by the current naive UTC time.
+    """
+    if timestamp is None:
+        timestamp = datetime.datetime.utcnow()
+
+    file_audit = FileAudit(timestamp, etype, user, ip, device, org_id, dtable_uuid, file_path)
+
+    session.add(file_audit)
+    session.commit()
diff --git a/dtable_events/events/handlers.py b/dtable_events/events/handlers.py
new file mode 100644
index 00000000..4c5099de
--- /dev/null
+++ b/dtable_events/events/handlers.py
@@ -0,0 +1,44 @@
+# coding: utf-8
+import logging
+import logging.handlers
+import datetime
+
+from dtable_events.events.db import save_file_audit_event
+
+
+def FileAuditEventHandler(config, session, msg):
+    """Store a file-download audit event as a FileAudit row.
+
+    msg['content'] must hold seven tab-separated fields: event type, user,
+    ip, user agent, org id, dtable uuid and file path. msg['ctime'] is read
+    as a POSIX timestamp and stored as naive UTC.
+    """
+    elements = msg['content'].split('\t')
+    if len(elements) != 7:
+        logging.warning("got bad message: %s", elements)
+        return
+
+    # This runs for every accepted event, so the dump belongs at debug level.
+    logging.debug('FileAuditEventHandler elements:%s', elements)
+
+    msg_type, user_name, ip, user_agent, org_id, dtable_uuid, file_path = elements
+    try:
+        # FileAudit.org_id is an Integer column and is compared with 0.
+        org_id = int(org_id)
+    except ValueError:
+        logging.warning("got bad org_id in message: %s", elements)
+        return
+
+    timestamp = datetime.datetime.utcfromtimestamp(msg['ctime'])
+    if not file_path.startswith('/'):
+        file_path = '/' + file_path
+
+    save_file_audit_event(session, timestamp, msg_type, user_name, ip,
+                          user_agent, org_id, dtable_uuid, file_path)
+
+
+def register_handlers(handlers):
+    """Register the file audit handler for every file-download event type."""
+    handlers.add_handler('seahub.audit:file-download-web', FileAuditEventHandler)
+    handlers.add_handler('seahub.audit:file-download-api', FileAuditEventHandler)
+    handlers.add_handler('seahub.audit:file-download-share-link', FileAuditEventHandler)
diff --git a/dtable_events/events/models.py b/dtable_events/events/models.py
new file mode 100644
index 00000000..565607f7
--- /dev/null
+++ b/dtable_events/events/models.py
@@ -0,0 +1,45 @@
+# coding: utf-8
+from sqlalchemy.orm import mapped_column
+from sqlalchemy.sql.sqltypes import Integer, String, DateTime, Text, BigInteger
+
+from dtable_events.db import Base
+
+
+class FileAudit(Base):
+    """One audited file access event, stored in the FileAudit table."""
+
+    __tablename__ = 'FileAudit'
+
+    eid = mapped_column(BigInteger, primary_key=True, autoincrement=True)
+    timestamp = mapped_column(DateTime, nullable=False, index=True)
+    etype = mapped_column(String(length=128), nullable=False)
+    user = mapped_column(String(length=255), nullable=False, index=True)
+    ip = mapped_column(String(length=45), nullable=False)
+    device = mapped_column(Text, nullable=False)
+    org_id = mapped_column(Integer, nullable=False)
+    dtable_uuid = mapped_column(String(length=36), nullable=False, index=True)
+    file_path = mapped_column(Text, nullable=False)
+
+    def __init__(self, timestamp, etype, user, ip, device,
+                 org_id, dtable_uuid, file_path):
+        super().__init__()
+        self.timestamp = timestamp
+        self.etype = etype
+        self.user = user
+        self.ip = ip
+        self.device = device
+        self.org_id = org_id
+        self.dtable_uuid = dtable_uuid
+        self.file_path = file_path
+
+    def __str__(self):
+        """Return a one-line summary; OrgID is shown only when positive."""
+        if self.org_id > 0:
+            return ("FileAudit[EventType = %s, User = %s, IP = %s, Device = %s, "
+                    "OrgID = %s, DtableUUID = %s, FilePath = %s]" %
+                    (self.etype, self.user, self.ip, self.device,
+                     self.org_id, self.dtable_uuid, self.file_path))
+        return ("FileAudit[EventType = %s, User = %s, IP = %s, Device = %s, "
+                "DtableUUID = %s, FilePath = %s]" %
+                (self.etype, self.user, self.ip, self.device,
+                 self.dtable_uuid, self.file_path))