3 changes: 3 additions & 0 deletions dtable_events/app/app.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from dtable_events.activities.handlers import MessageHandler
from dtable_events.big_data.big_data_auto_archive_scanner import DTableAutoArchiveTaskScanner
from dtable_events.statistics.counter import UserActivityCounter
from dtable_events.dtable_io.dtable_io_server import DTableIOServer
from dtable_events.tasks.instant_notices_sender import InstantNoticeSender
@@ -51,6 +52,7 @@ def __init__(self, config, task_mode):
        self._data_syncr = DataSyncer(config)
        self._workflow_schedule_scanner = WorkflowSchedulesScanner(config)
        self._dtable_asset_trash_cleaner = DTableAssetTrashCleaner(config)
        self._dtable_auto_archive_scanner = DTableAutoArchiveTaskScanner(config)

    def serve_forever(self):
        if self._enable_foreground_tasks:
@@ -78,3 +80,4 @@ def serve_forever(self):
        self._data_syncr.start() # default True
        self._workflow_schedule_scanner.start() # default True
        self._dtable_asset_trash_cleaner.start() # always True
        self._dtable_auto_archive_scanner.start() # default True
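The three added lines wire the new scanner into the app the same way the existing background components are wired: each component is constructed with the parsed config in __init__ and its start() method is called from serve_forever(). A minimal sketch of that contract follows; the class name is hypothetical, and DTableAutoArchiveTaskScanner (added below) implements the same shape with an is_enabled() guard inside start().

class ExampleBackgroundWorker(object):  # hypothetical name, for illustration only

    def __init__(self, config):
        # read any options for this worker from the shared config object
        self._enabled = True

    def is_enabled(self):
        return self._enabled

    def start(self):
        # typically spawns a Thread or scheduler; does nothing when disabled
        if not self.is_enabled():
            return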
1 change: 1 addition & 0 deletions dtable_events/big_data/__init__.py
@@ -0,0 +1 @@
# -*- coding: utf-8 -*-
120 changes: 120 additions & 0 deletions dtable_events/big_data/auto_archive_utils.py
@@ -0,0 +1,120 @@
import logging
import json
from datetime import datetime
from dtable_events.utils import get_inner_dtable_server_url, uuid_str_to_36_chars
from dtable_events.utils.dtable_server_api import DTableServerAPI
from dtable_events.utils.sql_generator import BaseSQLGenerator

logger = logging.getLogger(__name__)

PER_DAY = 'per_day'
PER_WEEK = 'per_week'
PER_MONTH = 'per_month'

VALID_RUN_CONDITIONS = [
    PER_DAY,
    PER_WEEK,
    PER_MONTH
]

def update_last_run_time(task_id, db_session):

    cmd = "UPDATE dtable_auto_archive_task SET last_run_time=:new_time WHERE id=:task_id"
    db_session.execute(cmd, {'new_time': datetime.now(), 'task_id': task_id})

def set_invalid(task_id, db_session):
    sql = "UPDATE dtable_auto_archive_task SET is_valid=:is_valid WHERE id=:task_id"
    try:
        db_session.execute(sql, {'is_valid': 0, 'task_id': task_id})
    except Exception as e:
        logger.error(e)

def meet_condition(run_condition, details):
    cur_datetime = datetime.now()
    cur_hour = int(cur_datetime.hour)
    cur_week_day = cur_datetime.isoweekday()
    cur_month_day = cur_datetime.day
    if run_condition == PER_DAY:
        run_hour = details.get('run_hour', None)
        try:
            if int(run_hour) == cur_hour:
                return True
        except:
            return False

    if run_condition == PER_WEEK:
        run_week_day = details.get('run_week_day', None)
        run_week_hour = details.get('run_week_hour', None)
        try:
            if (int(run_week_hour) == cur_hour) and (int(run_week_day) == cur_week_day):
                return True
        except:
            return False

    if run_condition == PER_MONTH:
        run_month_day = details.get('run_month_day', None)
        run_month_hour = details.get('run_month_hour', None)
        try:
            if (int(run_month_hour) == cur_hour) and (int(run_month_day) == cur_month_day):
                return True
        except:
            return False


    return False


def run_dtable_auto_archive_task(task, db_session):

    task_id = task[0]
    run_condition = task[1]
    table_id = task[2]
    view_id = task[3]
    last_run_time = task[4]
    dtable_uuid = task[5]
    details = task[6]
    creator = task[7]
    try:
        details = json.loads(details)
        if not meet_condition(run_condition, details):
            return
        dtable_uuid = uuid_str_to_36_chars(dtable_uuid)
        dtable_server_url = get_inner_dtable_server_url()
        seatable = DTableServerAPI(creator, dtable_uuid, dtable_server_url)
        current_table, current_view = None, None
        metadata = seatable.get_metadata()
        for table in metadata['tables']:
            if table.get('_id') == table_id:
                current_table = table
                break

        if not current_table:
            set_invalid(task_id, db_session)
            return

        for view in current_table['views']:
            if view.get('_id') == view_id:
                current_view = view
                break

        if not current_view:
            set_invalid(task_id, db_session)
            return

        table_name = current_table.get('name')
        filter_conditions = {
            "filters": current_view.get('filters') or [],
            "filter_conjunction": current_view.get('filter_conjunction') or 'And',
        }

        columns = seatable.list_columns(table_name)
        sql_generator = BaseSQLGenerator(table_name, columns, filter_conditions=filter_conditions)
        where_clause = sql_generator.get_where_clause()
        seatable.archive_view(table_name, where_clause)

    except Exception as e:
        logger.error(e)
        set_invalid(task_id, db_session)
        return

    update_last_run_time(task_id, db_session)
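A hedged usage sketch (not part of the diff) of how meet_condition() reads the per-condition details payloads: a task fires only when the current hour, and for weekly or monthly tasks the ISO weekday or day of month, matches the stored schedule. The concrete values below are illustrative.

from dtable_events.big_data.auto_archive_utils import meet_condition, PER_DAY, PER_WEEK, PER_MONTH

# per_day: True only during the configured hour of any day
meet_condition(PER_DAY, {'run_hour': 2})

# per_week: isoweekday() runs 1 (Monday) .. 7 (Sunday), so this is Wednesday during the 14:00 hour
meet_condition(PER_WEEK, {'run_week_day': 3, 'run_week_hour': 14})

# per_month: the 1st of the month during the 00:00 hour
meet_condition(PER_MONTH, {'run_month_day': 1, 'run_month_hour': 0})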
101 changes: 101 additions & 0 deletions dtable_events/big_data/big_data_auto_archive_scanner.py
@@ -0,0 +1,101 @@
import logging
from datetime import datetime, timedelta
from threading import Thread

from apscheduler.schedulers.blocking import BlockingScheduler
from dtable_events.big_data.auto_archive_utils import run_dtable_auto_archive_task
from dtable_events.db import init_db_session_class
from dtable_events.utils import get_opt_from_conf_or_env, parse_bool


__all__ = [
    'DTableAutoArchiveTaskScanner',
]


class DTableAutoArchiveTaskScanner(object):

    def __init__(self, config):
        self._enabled = True
        self._parse_config(config)
        self._db_session_class = init_db_session_class(config)

    def _parse_config(self, config):
"""parse send email related options from config file
"""
        section_name = 'AUTOARCHIVE-SCANNER'
        key_enabled = 'enabled'

        if not config.has_section(section_name):
            section_name = 'AUTOARCHIVE SCANNER'
            if not config.has_section(section_name):
                return

        # enabled
        enabled = get_opt_from_conf_or_env(config, section_name, key_enabled, default=True)
        enabled = parse_bool(enabled)
        self._enabled = enabled

    def start(self):
        if not self.is_enabled():
            logging.warning('Can not start big data auto archive scanner: it is not enabled!')
            return

        logging.info('Start big data auto archive scanner')

        DTableAutoArchiveTaskScannerTimer(self._db_session_class).start()

    def is_enabled(self):
        return self._enabled


def scan_auto_archive_tasks(db_session):
    sql = '''
        SELECT `bdar`.`id`, `run_condition`, `table_id`, `view_id`, `last_run_time`, `dtable_uuid`, `details`, bdar.`creator` FROM dtable_auto_archive_task bdar
        JOIN dtables d ON bdar.dtable_uuid=d.uuid
        WHERE ((run_condition='per_day' AND (last_run_time<:per_day_check_time OR last_run_time IS NULL))
        OR (run_condition='per_week' AND (last_run_time<:per_week_check_time OR last_run_time IS NULL))
        OR (run_condition='per_month' AND (last_run_time<:per_month_check_time OR last_run_time IS NULL)))
        AND bdar.is_valid=1 AND d.deleted=0
    '''
    per_day_check_time = datetime.now() - timedelta(hours=23)
    per_week_check_time = datetime.now() - timedelta(days=6)
    per_month_check_time = datetime.now() - timedelta(days=29)

    tasks = db_session.execute(sql, {
        'per_day_check_time': per_day_check_time,
        'per_week_check_time': per_week_check_time,
        'per_month_check_time': per_month_check_time
    })

    for task in tasks:
        try:
            run_dtable_auto_archive_task(task, db_session)
        except Exception as e:
            logging.exception(e)
            logging.error(f'check task failed. {task}, error: {e}')
    db_session.commit()


class DTableAutoArchiveTaskScannerTimer(Thread):

    def __init__(self, db_session_class):
        super(DTableAutoArchiveTaskScannerTimer, self).__init__()
        self.db_session_class = db_session_class

    def run(self):
        sched = BlockingScheduler()
        # fire at every hour in every day of week
        @sched.scheduled_job('cron', day_of_week='*', hour='*')
        def timed_job():
            logging.info('Starts to auto archive...')

            db_session = self.db_session_class()
            try:
                scan_auto_archive_tasks(db_session)
            except Exception as e:
                logging.exception('error when scanning big data auto archives: %s', e)
            finally:
                db_session.close()

        sched.start()
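The scanner is toggled from the events config file; a short sketch of the section lookup that _parse_config() performs is below. The section names ('AUTOARCHIVE-SCANNER', with 'AUTOARCHIVE SCANNER' as a fallback) and the 'enabled' key come from the code above, while the INI content itself is illustrative. If neither section exists, the scanner stays enabled because self._enabled defaults to True.

import configparser

config = configparser.ConfigParser()
config.read_string("""
[AUTOARCHIVE-SCANNER]
enabled = true
""")

section_name = 'AUTOARCHIVE-SCANNER'
if not config.has_section(section_name):
    section_name = 'AUTOARCHIVE SCANNER'   # alternative spelling also accepted

# _parse_config() reads this value through get_opt_from_conf_or_env() and parse_bool()
print(config.get(section_name, 'enabled'))   # -> 'true'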
9 changes: 9 additions & 0 deletions dtable_events/utils/dtable_server_api.py
@@ -411,3 +411,12 @@ def batch_send_notification(self, user_msg_list):
        }
        response = requests.post(url, json=body, headers=self.headers)
        return parse_response(response)

    def archive_view(self, table_name, where_clause=""):
        url = self.dtable_server_url + '/api/v1/dtables/' + self.dtable_uuid + '/archive-view/?from=dtable_events'
        json_data = {
            'table_name': table_name,
            'where': where_clause,
        }
        response = requests.post(url, json=json_data, headers=self.headers, timeout=self.timeout)
        return parse_response(response)
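A hedged usage sketch of the new archive_view() helper; the account, dtable UUID, table name, and filter below are placeholders. This mirrors what run_dtable_auto_archive_task() does after building the where clause from a view's filters.

from dtable_events.utils import get_inner_dtable_server_url
from dtable_events.utils.dtable_server_api import DTableServerAPI

dtable_server_url = get_inner_dtable_server_url()
# username and dtable UUID are placeholders
seatable = DTableServerAPI('archiver@example.com', '00000000-0000-0000-0000-000000000000', dtable_server_url)

# asks dtable-server to archive the rows of 'Orders' matched by the where clause
seatable.archive_view('Orders', where_clause="`Status` = 'Done'")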
8 changes: 7 additions & 1 deletion dtable_events/utils/sql_generator.py
@@ -1655,7 +1655,7 @@ def _groupfilter2sql(self):
            group_conjunction_split.join(group_string_list)
        )

    def _filter2sql(self):
    def _filter2sql(self, include_where_string=True):
        filter_conditions = self.filter_conditions
        filters = filter_conditions.get('filters', [])
        filter_conjunction = filter_conditions.get('filter_conjunction', 'And')
@@ -1692,6 +1692,9 @@ def _filter2sql(self):
            )
        else:
            return ''

        if not include_where_string:
            return "%s" % filter_content
        return "%s%s" % (
            filter_header,
            filter_content
@@ -1733,6 +1736,9 @@ def to_sql(self, by_group=False):
            sql = "%s %s" % (sql, limit_clause)
        return sql

    def get_where_clause(self, include_where_string=False):
        return self._filter2sql(include_where_string=include_where_string)


class LinkRecordsSQLGenerator(object):

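Finally, a rough sketch of the sql_generator change in isolation. The column metadata and filter payload below are guesses at the expected shapes, included only to show the call pattern: with the default include_where_string=False, get_where_clause() returns just the condition text (no leading WHERE), which the archive task then hands to archive_view() as the where payload.

from dtable_events.utils.sql_generator import BaseSQLGenerator

# hypothetical column metadata and view-style filter conditions
columns = [{'key': '0000', 'name': 'Status', 'type': 'text'}]
filter_conditions = {
    'filters': [{'column_key': '0000', 'filter_predicate': 'is', 'filter_term': 'Done'}],
    'filter_conjunction': 'And',
}

generator = BaseSQLGenerator('Orders', columns, filter_conditions=filter_conditions)
where_clause = generator.get_where_clause()   # condition only; no WHERE prefix by default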