Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dtable_events/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dtable_events.workflow.workflow_schedules_scanner import WorkflowSchedulesScanner
from dtable_events.convert_page.manager import conver_page_to_pdf_manager
from dtable_events.api_calls.api_calls_counter import APICallsCounter
from dtable_events.app.mq_handler import EventsHandler, init_message_handlers


class App(object):
Expand Down Expand Up @@ -62,6 +63,9 @@ def __init__(self, config, task_mode):
# convert pdf manager
conver_page_to_pdf_manager.init(config)

init_message_handlers()
self._events_handler = EventsHandler(config)

def serve_forever(self):

if self._enable_foreground_tasks:
Expand Down Expand Up @@ -93,3 +97,5 @@ def serve_forever(self):
self._license_expiring_notices_sender.start() # always True
# convert pdf manager
conver_page_to_pdf_manager.start() # always True

self._events_handler.start()
101 changes: 101 additions & 0 deletions dtable_events/app/mq_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import time
import logging
from threading import Thread

from seaserv import seafile_api

import dtable_events.events.handlers as events_handlers
from dtable_events.db import init_db_session_class

logger = logging.getLogger(__name__)

__all__ = [
'EventsHandler',
'init_message_handlers'
]


class MessageHandler(object):
def __init__(self):
# A (channel, List<handler>) map. For a given channel, there may be
# multiple handlers
self._handlers = {}

def add_handler(self, msg_type, func):
if msg_type in self._handlers:
funcs = self._handlers[msg_type]
else:
funcs = []
self._handlers[msg_type] = funcs

if func not in funcs:
funcs.append(func)

def handle_message(self, config, session, channel, msg):
pos = msg['content'].find('\t')
if pos == -1:
logger.warning("invalid message format: %s", msg)
return

msg_type = channel + ':' + msg['content'][:pos]
if msg_type not in self._handlers:
return

if msg_type not in self._handlers:
return

funcs = self._handlers.get(msg_type)
for func in funcs:
try:
func(config, session, msg)
except Exception as e:
logger.exception("error when handle msg: %s", e)

def get_channels(self):
channels = set()
for msg_type in self._handlers:
pos = msg_type.find(':')
channels.add(msg_type[:pos])

return channels


message_handler = MessageHandler()


def init_message_handlers():
events_handlers.register_handlers(message_handler)


class EventsHandler(object):

def __init__(self, config):
self._config = config
self._db_session_class = init_db_session_class(config)

def handle_event(self, channel):
config = self._config
session = self._db_session_class()
while 1:
try:
msg = seafile_api.pop_event(channel)
except Exception as e:
logger.error('Failed to get event: %s' % e)
time.sleep(3)
continue
if msg:
try:
message_handler.handle_message(config, session, channel, msg)
except Exception as e:
logger.error(e)
finally:
session.close()
else:
time.sleep(0.5)

def start(self):
channels = message_handler.get_channels()
logger.info('Subscribe to channels: %s', channels)
for channel in channels:
event_handler = Thread(target=self.handle_event, args=(channel, ))
event_handler.start()
16 changes: 16 additions & 0 deletions dtable_events/events/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import logging
import datetime

from .models import FileAudit

logger = logging.getLogger(__name__)


def save_file_audit_event(session, timestamp, etype, user, ip, device, org_id, dtable_uuid, file_path):
if timestamp is None:
timestamp = datetime.datetime.utcnow()

file_audit = FileAudit(timestamp, etype, user, ip, device, org_id, dtable_uuid, file_path)

session.add(file_audit)
session.commit()
35 changes: 35 additions & 0 deletions dtable_events/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# coding: utf-8
import logging
import logging.handlers
import datetime

from dtable_events.events.db import save_file_audit_event


def FileAuditEventHandler(config, session, msg):
elements = msg['content'].split('\t')
if len(elements) != 7:
logging.warning("got bad message: %s", elements)
return

logging.warning('FileAuditEventHandler elements:%s', elements)

timestamp = datetime.datetime.utcfromtimestamp(msg['ctime'])
msg_type = elements[0]
user_name = elements[1]
ip = elements[2]
user_agent = elements[3]
org_id = elements[4]
dtable_uuid = elements[5]
file_path = elements[6]
if not file_path.startswith('/'):
file_path = '/' + file_path

save_file_audit_event(session, timestamp, msg_type, user_name, ip,
user_agent, org_id, dtable_uuid, file_path)


def register_handlers(handlers):
handlers.add_handler('seahub.audit:file-download-web', FileAuditEventHandler)
handlers.add_handler('seahub.audit:file-download-api', FileAuditEventHandler)
handlers.add_handler('seahub.audit:file-download-share-link', FileAuditEventHandler)
41 changes: 41 additions & 0 deletions dtable_events/events/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# coding: utf-8
from sqlalchemy.orm import mapped_column
from sqlalchemy.sql.sqltypes import Integer, String, DateTime, Text, BigInteger

from dtable_events.db import Base


class FileAudit(Base):
__tablename__ = 'FileAudit'

eid = mapped_column(BigInteger, primary_key=True, autoincrement=True)
timestamp = mapped_column(DateTime, nullable=False, index=True)
etype = mapped_column(String(length=128), nullable=False)
user = mapped_column(String(length=255), nullable=False, index=True)
ip = mapped_column(String(length=45), nullable=False)
device = mapped_column(Text, nullable=False)
org_id = mapped_column(Integer, nullable=False)
dtable_uuid = mapped_column(String(length=36), nullable=False, index=True)
file_path = mapped_column(Text, nullable=False)

def __init__(self, timestamp, etype, user, ip, device,
org_id, dtable_uuid, file_path):
super().__init__()
self.timestamp = timestamp
self.etype = etype
self.user = user
self.ip = ip
self.device = device
self.org_id = org_id
self.dtable_uuid = dtable_uuid
self.file_path = file_path

def __str__(self):
if self.org_id > 0:
return "FileAudit<EventType = %s, User = %s, IP = %s, Device = %s, DtableUuid = %s, \
OrgID = %s, FilePath = %s>" % \
(self.etype, self.user, self.ip, self.device,
self.org_id, self.dtable_uuid, self.file_path)
else:
return "FileAudit<EventType = %s, User = %s, IP = %s, Device = %s, DtableUuid = %s, FilePath = %s>" % \
(self.etype, self.user, self.ip, self.device, self.dtable_uuid, self.file_path)