dtable_events/common_dataset/common_dataset_sync_utils.py (17 changes: 12 additions, 5 deletions)

```diff
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+import json
 import logging
 import re
 from copy import deepcopy
@@ -233,7 +234,7 @@ def generate_synced_columns(src_columns, dst_columns=None):
             to_be_updated_columns.append(dst_col)
         else:
             if dst_column_name_dict.get(col.get('name')):
-                return None, None, 'Column %s exists' % (col.get('name'),)
+                return None, None, {'error_msg': 'Column %s exists' % (col.get('name'),), 'error_detail': {'column_name': col.get('name')}}
             to_be_appended_columns.append(col)
     return to_be_updated_columns, to_be_appended_columns, None
 
@@ -697,7 +698,8 @@ def import_sync_CDS(context):
         return {
             'dst_table_id': None,
             'error_type': 'generate_synced_columns_error',
-            'error_msg': str(error), # generally, this error is caused by client
+            'error_msg': str(error.get('error_msg')), # generally, this error is caused by client
+            'error_detail': error.get('error_detail'),
             'task_status_code': 400
         }
     final_columns = (to_be_updated_columns or []) + (to_be_appended_columns or [])
@@ -875,10 +877,15 @@ def set_common_dataset_invalid(dataset_id, db_session):
         logger.error('set state of common dataset: %s error: %s', dataset_id, e)
 
 
-def set_common_dataset_sync_invalid(dataset_sync_id, db_session):
-    sql = "UPDATE dtable_common_dataset_sync SET is_valid=0 WHERE id=:dataset_sync_id"
+def set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail=None):
+    if invalid_detail:
+        sql = "UPDATE dtable_common_dataset_sync SET is_valid=0, invalid_detail=:invalid_detail WHERE id=:dataset_sync_id"
+        params = {'dataset_sync_id': dataset_sync_id, 'invalid_detail': json.dumps(invalid_detail)}
+    else:
+        sql = "UPDATE dtable_common_dataset_sync SET is_valid=0 WHERE id=:dataset_sync_id"
+        params = {'dataset_sync_id': dataset_sync_id}
     try:
-        db_session.execute(sql, {'dataset_sync_id': dataset_sync_id})
+        db_session.execute(sql, params)
         db_session.commit()
     except Exception as e:
         logger.error('set state of common dataset sync: %s error: %s', dataset_sync_id, e)
```
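For context, a minimal sketch of how the new error contract is consumed end to end. The names that do not appear in this diff (`src_columns`, `dst_columns`, `sync_id`, `db_session`) are hypothetical placeholders for the surrounding application code:

```python
# Hypothetical caller of the two helpers changed above.
to_be_updated, to_be_appended, error = generate_synced_columns(src_columns, dst_columns)
if error:
    # `error` is now a dict instead of a plain string, e.g.:
    # {'error_msg': 'Column Name exists', 'error_detail': {'column_name': 'Name'}}
    set_common_dataset_sync_invalid(sync_id, db_session, invalid_detail={
        'error_type': 'generate_synced_columns_error',
        'error_msg': error.get('error_msg'),
        'error_detail': error.get('error_detail'),
    })
```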
dtable_events/common_dataset/common_dataset_syncer.py (43 changes: 32 additions, 11 deletions)

```diff
@@ -77,15 +77,21 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
             src_table = table
             break
     if not src_table:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'source_table_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Source table not found.')
         return None
     for view in src_table.get('views', []):
         if view['_id'] == src_view_id:
             src_view = view
             break
     if not src_view:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'source_view_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Source view not found.')
         return None
 
@@ -101,7 +107,10 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
             dst_table = table
             break
     if not dst_table:
-        set_common_dataset_sync_invalid(dataset_sync_id, db_session)
+        invalid_detail = {
+            'error_type': 'destination_table_not_found'
+        }
+        set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail)
         logging.error('Destination table not found.')
         return None
 
@@ -178,22 +187,34 @@ def check_common_dataset(session_class):
                 continue
             else:
                 if result.get('error_msg'):
-                    logging.error(result['error_msg'])
+                    if result.get('error_type') == 'generate_synced_columns_error':
+                        logging.warning('src_dtable_uuid: %s src_table_id: %s src_view_id: %s dst_dtable_uuid: %s dst_table_id: %s generate sync-columns error: %s',
+                                        src_dtable_uuid, src_table_id, src_view_id, dst_dtable_uuid, dst_table_id, result)
+                        invalid_detail = {
+                            'error_type': 'generate_synced_columns_error',
+                            'error_msg': result.get('error_msg'),
+                            'error_detail': result.get('error_detail'),
+                        }
+                        with session_class() as db_session:
+                            set_common_dataset_sync_invalid(dataset_sync_id, db_session, invalid_detail=invalid_detail)
                 else:
                     logging.error('src_dtable_uuid: %s src_table_id: %s src_view_id: %s dst_dtable_uuid: %s dst_table_id: %s error: %s',
                                   src_dtable_uuid, src_table_id, src_view_id, dst_dtable_uuid, dst_table_id, result)
                 continue
         sql = '''
             UPDATE dtable_common_dataset_sync SET last_sync_time=:last_sync_time, src_version=:src_version
             WHERE id=:id
         '''
         with session_class() as db_session:
-            db_session.execute(sql, {
-                'last_sync_time': datetime.now(),
-                'src_version': assets.get('src_version'),
-                'id': dataset_sync_id
-            })
-            db_session.commit()
+            try:
+                db_session.execute(sql, {
+                    'last_sync_time': datetime.now(),
+                    'src_version': assets.get('src_version'),
+                    'id': dataset_sync_id
+                })
+                db_session.commit()
+            except Exception as e:
+                logging.exception('update sync: %s last_sync_time error: %s', dataset_sync_id, e)
 
 
 class CommonDatasetSyncerTimer(Thread):
@@ -204,7 +225,7 @@ def __init__(self, db_session_class):
     def run(self):
         sched = BlockingScheduler()
         # fire at every hour in every day of week
-        @sched.scheduled_job('cron', day_of_week='*', hour='*')
+        @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='31')
         def timed_job():
             logging.info('Starts to scan common dataset syncs...')
             try:
```
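The syncer now records why a sync was marked invalid. A sketch of the payload shapes this diff writes into `dtable_common_dataset_sync.invalid_detail` (serialized with `json.dumps` in `set_common_dataset_sync_invalid`); the message and detail values in the last entry are illustrative, not fixed strings:

```python
# error_type values written by gen_src_dst_assets and check_common_dataset;
# error_msg/error_detail below are example values, not fixed strings.
invalid_detail_shapes = [
    {'error_type': 'source_table_not_found'},
    {'error_type': 'source_view_not_found'},
    {'error_type': 'destination_table_not_found'},
    {
        'error_type': 'generate_synced_columns_error',
        'error_msg': 'Column Name exists',
        'error_detail': {'column_name': 'Name'},
    },
]
```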
dtable_events/dtable_io/import_sync_common_dataset.py (15 changes: 14 additions, 1 deletion)

```diff
@@ -68,6 +68,19 @@ def sync_common_dataset(context, config):
     sync_dataset = list(sync_dataset)
     if sync_dataset:
         dtable_io_logger.debug('sync_dataset: %s', sync_dataset[0])
+        sql = '''
+            UPDATE dtable_common_dataset_sync SET is_valid=1, invalid_detail=NULL
+            WHERE dataset_id=:dataset_id AND dst_dtable_uuid=:dst_dtable_uuid AND dst_table_id=:dst_table_id
+        '''
+        try:
+            db_session.execute(sql, {
+                'dst_dtable_uuid': uuid_str_to_32_chars(dst_dtable_uuid),
+                'dst_table_id': dst_table_id,
+                'dataset_id': dataset_id
+            })
+            db_session.commit()
+        except Exception as e:
+            dtable_io_logger.error('reset dataset sync is_valid, invalid_detail error: %s', e)
         return
 
     try:
@@ -107,7 +120,7 @@ def sync_common_dataset(context, config):
 
         sql = '''
             UPDATE dtable_common_dataset_sync SET
-                last_sync_time=:last_sync_time, src_version=:last_src_version
+                last_sync_time=:last_sync_time, src_version=:last_src_version, is_valid=1, invalid_detail=NULL
             WHERE dataset_id=:dataset_id AND dst_dtable_uuid=:dst_dtable_uuid AND dst_table_id=:dst_table_id
         '''
         try:
```
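With this last change, a successful sync clears the invalid state that the syncer set, completing the round trip. A rough sketch under two assumptions: the `invalid_detail` column already exists on `dtable_common_dataset_sync`, and `db_session`/`sync_id` are placeholders from the surrounding code (raw SQL strings are passed to `execute` in the pre-2.0 SQLAlchemy style the diff itself uses):

```python
import json

# 1) The syncer marks a sync invalid and records why
#    (see common_dataset_syncer.py above).
db_session.execute(
    "UPDATE dtable_common_dataset_sync SET is_valid=0, invalid_detail=:detail WHERE id=:id",
    {'detail': json.dumps({'error_type': 'source_table_not_found'}), 'id': sync_id},
)

# 2) A later successful run resets the flag and clears the detail
#    (see sync_common_dataset above).
db_session.execute(
    "UPDATE dtable_common_dataset_sync SET is_valid=1, invalid_detail=NULL WHERE id=:id",
    {'id': sync_id},
)
db_session.commit()
```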