diff --git a/dtable_events/common_dataset/common_dataset_sync_utils.py b/dtable_events/common_dataset/common_dataset_sync_utils.py
index 4b5e8380..2db833ec 100644
--- a/dtable_events/common_dataset/common_dataset_sync_utils.py
+++ b/dtable_events/common_dataset/common_dataset_sync_utils.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import json
 import logging
 import re
 from copy import deepcopy
@@ -233,7 +234,7 @@ def generate_synced_columns(src_columns, dst_columns=None):
             to_be_updated_columns.append(dst_col)
         else:
             if dst_column_name_dict.get(col.get('name')):
-                return None, None, 'Column %s exists' % (col.get('name'),)
+                return None, None, {'error_msg': 'Column %s exists' % (col.get('name'),), 'error_detail': {'column_name': col.get('name')}}
             to_be_appended_columns.append(col)
     return to_be_updated_columns, to_be_appended_columns, None

@@ -697,7 +698,8 @@ def import_sync_CDS(context):
         return {
             'dst_table_id': None,
             'error_type': 'generate_synced_columns_error',
-            'error_msg': str(error),  # generally, this error is caused by client
+            'error_msg': str(error.get('error_msg')),  # generally, this error is caused by client
+            'error_detail': error.get('error_detail'),
             'task_status_code': 400
         }
     final_columns = (to_be_updated_columns or []) + (to_be_appended_columns or [])
@@ -875,10 +877,15 @@ def set_common_dataset_invalid(dataset_id, db_session):
         logger.error('set state of common dataset: %s error: %s', dataset_id, e)


-def set_common_dataset_sync_invalid(dataset_sync_id, db_session):
-    sql = "UPDATE dtable_common_dataset_sync SET is_valid=0 WHERE id=:dataset_sync_id"
+def set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail=None):
+    if invalid_detail:
+        sql = "UPDATE dtable_common_dataset_sync SET is_valid=0, invalid_detail=:invalid_detail WHERE id=:dataset_sync_id"
+        params = {'dataset_sync_id': dataset_sync_id, 'invalid_detail': json.dumps(invalid_detail)}
+    else:
+        sql = "UPDATE dtable_common_dataset_sync SET is_valid=0 WHERE id=:dataset_sync_id"
+        params = {'dataset_sync_id': dataset_sync_id}
     try:
-        db_session.execute(sql, {'dataset_sync_id': dataset_sync_id})
+        db_session.execute(sql, params)
         db_session.commit()
     except Exception as e:
         logger.error('set state of common dataset sync: %s error: %s', dataset_sync_id, e)
diff --git a/dtable_events/common_dataset/common_dataset_syncer.py b/dtable_events/common_dataset/common_dataset_syncer.py
index 328845e9..de11c232 100644
--- a/dtable_events/common_dataset/common_dataset_syncer.py
+++ b/dtable_events/common_dataset/common_dataset_syncer.py
@@ -77,7 +77,10 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
             src_table = table
             break
     if not src_table:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'source_table_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Source table not found.')
         return None
     for view in src_table.get('views', []):
@@ -85,7 +88,10 @@
             src_view = view
             break
     if not src_view:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'source_view_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Source view not found.')
         return None

@@ -101,7 +107,10 @@
             dst_table = table
             break
     if not dst_table:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'destination_table_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Destination table not found.')
         return None

@@ -178,22 +187,34 @@ def check_common_dataset(session_class):
             continue
         else:
             if result.get('error_msg'):
-                logging.error(result['error_msg'])
                 if result.get('error_type') == 'generate_synced_columns_error':
                     logging.warning('src_dtable_uuid: %s src_table_id: %s src_view_id: %s dst_dtable_uuid: %s dst_table_id: %s generate sync-columns error: %s',
                                     src_dtable_uuid, src_table_id, src_view_id, dst_dtable_uuid, dst_table_id, result)
+                    invalid_detail = {
+                        'error_type': 'generate_synced_columns_error',
+                        'error_msg': result.get('error_msg'),
+                        'error_detail': result.get('error_detail'),
+                    }
+                    with session_class() as db_session:
+                        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail=invalid_detail)
+                else:
+                    logging.error('src_dtable_uuid: %s src_table_id: %s src_view_id: %s dst_dtable_uuid: %s dst_table_id: %s error: %s',
+                                  src_dtable_uuid, src_table_id, src_view_id, dst_dtable_uuid, dst_table_id, result)
                 continue
         sql = '''
             UPDATE dtable_common_dataset_sync SET last_sync_time=:last_sync_time, src_version=:src_version
             WHERE id=:id
         '''
         with session_class() as db_session:
-            db_session.execute(sql, {
-                'last_sync_time': datetime.now(),
-                'src_version': assets.get('src_version'),
-                'id': dataset_sync_id
-            })
-            db_session.commit()
+            try:
+                db_session.execute(sql, {
+                    'last_sync_time': datetime.now(),
+                    'src_version': assets.get('src_version'),
+                    'id': dataset_sync_id
+                })
+                db_session.commit()
+            except Exception as e:
+                logging.exception('update sync: %s last_sync_time error: %s', dataset_sync_id, e)


 class CommonDatasetSyncerTimer(Thread):
@@ -204,7 +225,7 @@ def __init__(self, db_session_class):
     def run(self):
         sched = BlockingScheduler()
-        # fire at every hour in every day of week
-        @sched.scheduled_job('cron', day_of_week='*', hour='*')
+        # fire at minute 31 of every hour in every day of week
+        @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='31')
         def timed_job():
             logging.info('Starts to scan common dataset syncs...')
             try:
diff --git a/dtable_events/dtable_io/import_sync_common_dataset.py b/dtable_events/dtable_io/import_sync_common_dataset.py
index c9f0a699..42049a15 100644
--- a/dtable_events/dtable_io/import_sync_common_dataset.py
+++ b/dtable_events/dtable_io/import_sync_common_dataset.py
@@ -68,6 +68,19 @@ def sync_common_dataset(context, config):
     sync_dataset = list(sync_dataset)
     if sync_dataset:
         dtable_io_logger.debug('sync_dataset: %s', sync_dataset[0])
+        sql = '''
+            UPDATE dtable_common_dataset_sync SET is_valid=1, invalid_detail=NULL
+            WHERE dataset_id=:dataset_id AND dst_dtable_uuid=:dst_dtable_uuid AND dst_table_id=:dst_table_id
+        '''
+        try:
+            db_session.execute(sql, {
+                'dst_dtable_uuid': uuid_str_to_32_chars(dst_dtable_uuid),
+                'dst_table_id': dst_table_id,
+                'dataset_id': dataset_id
+            })
+            db_session.commit()
+        except Exception as e:
+            dtable_io_logger.error('reset dataset sync is_valid, invalid_detail error: %s', e)
         return

     try:
@@ -107,7 +120,7 @@ def sync_common_dataset(context, config):

     sql = '''
         UPDATE dtable_common_dataset_sync SET
-        last_sync_time=:last_sync_time, src_version=:last_src_version
+        last_sync_time=:last_sync_time, src_version=:last_src_version, is_valid=1, invalid_detail=NULL
         WHERE dataset_id=:dataset_id AND dst_dtable_uuid=:dst_dtable_uuid AND dst_table_id=:dst_table_id
     '''
     try:
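Note: the patch writes JSON into a new invalid_detail column of dtable_common_dataset_sync (see json.dumps(invalid_detail) above and the invalid_detail=NULL resets), but no matching schema change is included in this diff. A minimal migration sketch, assuming a nullable TEXT column and SQLAlchemy as used elsewhere in dtable-events; the function name and DB-URL handling below are hypothetical, not part of this patch:

from sqlalchemy import create_engine, text

def add_invalid_detail_column(db_url):
    # One-off migration: add the nullable column this patch stores JSON
    # error details in. The column name comes from the SQL in the diff;
    # the TEXT type is an assumption (the stored blobs are small dicts).
    engine = create_engine(db_url)
    with engine.begin() as conn:  # begin() commits automatically on success
        conn.execute(text(
            "ALTER TABLE dtable_common_dataset_sync "
            "ADD COLUMN invalid_detail TEXT DEFAULT NULL"
        ))

Once the column exists, set_common_dataset_sync_invalid records why a sync was disabled, and the two "is_valid=1, invalid_detail=NULL" updates in import_sync_common_dataset.py clear the stored reason after the next successful sync.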