578 changes: 331 additions & 247 deletions dtable_events/common_dataset/common_dataset_sync_utils.py

Large diffs are not rendered by default.

181 changes: 80 additions & 101 deletions dtable_events/common_dataset/common_dataset_syncer.py
@@ -9,10 +9,9 @@

from dtable_events import init_db_session_class
from dtable_events.app.config import DTABLE_PRIVATE_KEY
from dtable_events.common_dataset.common_dataset_sync_utils import import_or_sync, set_common_dataset_invalid, set_common_dataset_sync_invalid
from dtable_events.common_dataset.common_dataset_sync_utils import import_sync_CDS, set_common_dataset_invalid, set_common_dataset_sync_invalid
from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars, get_inner_dtable_server_url

logger = logging.getLogger(__name__)
from dtable_events.utils.dtable_server_api import DTableServerAPI

class CommonDatasetSyncer(object):

@@ -54,105 +53,91 @@ def get_dtable_server_header(dtable_uuid):
algorithm='HS256'
)
except Exception as e:
logger.error(e)
logging.error(e)
return
return {'Authorization': 'Token ' + access_token}
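
Aside (not part of the diff): only the tail of get_dtable_server_header is visible in the hunk above. Below is a minimal sketch of what the full helper plausibly looks like, assuming a PyJWT payload keyed by dtable_uuid with a short expiry; the payload fields and token lifetime are assumptions for illustration, not taken from this PR.

import time
import logging

import jwt

from dtable_events.app.config import DTABLE_PRIVATE_KEY


def get_dtable_server_header(dtable_uuid):
    # Hypothetical reconstruction; only the try/except tail appears in the hunk above.
    try:
        access_token = jwt.encode(
            {'dtable_uuid': dtable_uuid, 'exp': int(time.time()) + 60},  # payload fields are assumed
            DTABLE_PRIVATE_KEY,
            algorithm='HS256'
        )
    except Exception as e:
        logging.error(e)
        return
    return {'Authorization': 'Token ' + access_token}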


def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, dataset_sync_id, dataset_id, db_session):
def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, to_archive, dataset_sync_id, dataset_id, db_session):
"""
return assets -> dict
"""
dst_headers = get_dtable_server_header(dst_dtable_uuid)
src_headers = get_dtable_server_header(src_dtable_uuid)

# request src_dtable
dtable_server_url = get_inner_dtable_server_url()
url = dtable_server_url.strip('/') + '/dtables/' + src_dtable_uuid + '?from=dtable_events'

src_dtable_server_api = DTableServerAPI('dtable-events', src_dtable_uuid, dtable_server_url)
dst_dtable_server_api = DTableServerAPI('dtable-events', dst_dtable_uuid, dtable_server_url)
try:
resp = requests.get(url, headers=src_headers)
src_dtable_json = resp.json()
src_dtable_metadata = src_dtable_server_api.get_metadata()
dst_dtable_metadata = dst_dtable_server_api.get_metadata()
except Exception as e:
logger.error('request src dtable: %s error: %s', src_dtable_uuid, e)
return
logging.error('request src/dst dtable metadata: %s, %s error: %s', src_dtable_uuid, dst_dtable_uuid, e)
return None

# check src_table src_view
src_table = None
for table in src_dtable_json.get('tables', []):
if table.get('_id') == src_table_id:
dst_enable_archive = (dst_dtable_metadata.get('settings') or {}).get('enable_archive', False)
if to_archive and not dst_enable_archive:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Sync to archive requested but destination dtable has archive disabled')
return None

src_table, src_view = None, None
for table in src_dtable_metadata.get('tables', []):
if table['_id'] == src_table_id:
src_table = table
break
if not src_table:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source table not found.')
return
return None
for view in src_table.get('views', []):
if view['_id'] == src_view_id:
src_view = view
break
if not src_view:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source view not found.')
return None

src_view = None
if src_view_id:
for view in src_table.get('views', []):
if view.get('_id') == src_view_id:
src_view = view
break
if not src_view:
src_columns = [col for col in src_table.get('columns', []) if col['key'] not in src_view.get('hidden_columns', [])]

src_enable_archive = (src_dtable_metadata.get('settings') or {}).get('enable_archive', False)
src_version = src_dtable_metadata.get('version')

dst_table = None
if dst_table_id:
dst_enable_archive = dst_dtable_metadata.get('settings', {}).get('enable_archive', False)
if to_archive and not dst_enable_archive:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source view not found.')
return
else:
views = src_table.get('views', [])
if not views or not isinstance(views, list):
logging.error('Destination dtable has archive disabled')
return None
for table in dst_dtable_metadata.get('tables', []):
if table['_id'] == dst_table_id:
dst_table = table
break
if not dst_table:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('No views found.')
return
src_view = views[0]

# get src columns
src_view_hidden_columns = src_view.get('hidden_columns', [])
if not src_view_hidden_columns:
src_columns = src_table.get('columns', [])
else:
src_columns = [col for col in src_table.get('columns', []) if col.get('key') not in src_view_hidden_columns]

# request dst_dtable
url = dtable_server_url.strip('/') + '/dtables/' + dst_dtable_uuid + '?from=dtable_events'
try:
resp = requests.get(url, headers=dst_headers)
dst_dtable_json = resp.json()
except Exception as e:
logging.error('request dst dtable: %s error: %s', dst_dtable_uuid, e)
return

# check dst_table
dst_table = None
for table in dst_dtable_json.get('tables', []):
if table.get('_id') == dst_table_id:
dst_table = table
break
if not dst_table:
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.warning('Destination table: %s not found.' % dst_table_id)
return
logging.error('Destination table not found.')
return None

return {
'dst_headers': dst_headers,
'src_headers': src_headers,
'src_table': src_table,
'src_view': src_view,
'src_table_name': src_table['name'],
'src_view_name': src_view['name'],
'src_view_type': src_view.get('type', 'table'),
'src_columns': src_columns,
'dst_columns': dst_table.get('columns'),
'dst_rows': dst_table.get('rows'),
'dst_table_name': dst_table.get('name'),
'src_version': src_dtable_json.get('version')
'src_enable_archive': src_enable_archive,
'src_version': src_version,
'dst_table_name': dst_table['name'] if dst_table else None,
'dst_columns': dst_table['columns'] if dst_table else None
}


def list_pending_common_dataset_syncs(db_session):
sql = '''
SELECT dcds.dst_dtable_uuid, dcds.dst_table_id, dcd.table_id AS src_table_id, dcd.view_id AS src_view_id,
dcd.dtable_uuid AS src_dtable_uuid, dcds.id AS sync_id, dcds.src_version, dcd.id
dcd.dtable_uuid AS src_dtable_uuid, dcds.id AS sync_id, dcds.src_version, dcd.id, dcds.is_to_archive
FROM dtable_common_dataset dcd
INNER JOIN dtable_common_dataset_sync dcds ON dcds.dataset_id=dcd.id
INNER JOIN dtables d_src ON dcd.dtable_uuid=d_src.uuid AND d_src.deleted=0
Expand Down Expand Up @@ -201,67 +186,61 @@ def check_common_dataset(db_session):
dataset_sync_id = dataset_sync[5]
last_src_version = dataset_sync[6]
dataset_id = dataset_sync[7]
is_to_archive = dataset_sync[8]

assets = gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, dataset_sync_id, dataset_id, db_session)
# assets = gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, dataset_sync_id, dataset_id, db_session)
assets = gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_id, dst_table_id, is_to_archive, dataset_sync_id, dataset_id, db_session)

if not assets:
continue

dst_headers = assets.get('dst_headers')
src_table = assets.get('src_table')
src_view = assets.get('src_view')
src_columns = assets.get('src_columns')
src_headers = assets.get('src_headers')
dst_columns = assets.get('dst_columns')
dst_rows = assets.get('dst_rows')
dst_table_name = assets.get('dst_table_name')
dtable_src_version = assets.get('src_version')

if dtable_src_version == last_src_version:
if assets.get('src_version') == last_src_version:
continue

try:
result = import_or_sync({
'dst_dtable_uuid': dst_dtable_uuid,
result = import_sync_CDS({
'src_dtable_uuid': src_dtable_uuid,
'src_rows': src_table.get('rows', []),
'src_columns': src_columns,
'src_table_name': src_table.get('name'),
'src_view_name': src_view.get('name'),
'src_headers': src_headers,
'dst_dtable_uuid': dst_dtable_uuid,
'src_table_name': assets.get('src_table_name'),
'src_view_name': assets.get('src_view_name'),
'src_view_type': assets.get('src_view_type'),
'src_columns': assets.get('src_columns'),
'src_enable_archive': assets.get('src_enable_archive'),
'src_version': assets.get('src_version'),
'dst_table_id': dst_table_id,
'dst_table_name': dst_table_name,
'dst_headers': dst_headers,
'dst_columns': dst_columns,
'dst_rows': dst_rows,
'lang': 'en' # TODO: lang
'dst_table_name': assets.get('dst_table_name'),
'dst_columns': assets.get('dst_columns'),
'operator': 'dtable-events',
'lang': 'en', # TODO: lang
'dataset_id': dataset_id,
'to_archive': is_to_archive
})
except Exception as e:
logger.error('sync common dataset error: %s', e)
logging.error('sync common dataset error: %s', e)
continue
else:
if result.get('error_msg'):
logger.error(result['error_msg'])
logging.error(result['error_msg'])
if result.get('error_type') == 'generate_synced_columns_error':
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
continue

dataset_update_map[dataset_sync_id] = dtable_src_version
dataset_update_map[dataset_sync_id] = assets.get('src_version')
sync_count += 1

if sync_count == 1000:
try:
update_sync_time_and_version(db_session, dataset_update_map)
except Exception as e:
logger.error(f'update sync time and src_version failed, error: {e}')
logging.error(f'update sync time and src_version failed, error: {e}')
dataset_update_map = {}
sync_count = 0

if dataset_update_map:
try:
update_sync_time_and_version(db_session, dataset_update_map)
except Exception as e:
logger.error(f'update sync time and src_version failed, error: {e}')
logging.error(f'update sync time and src_version failed, error: {e}')


class CommonDatasetSyncerTimer(Thread):
@@ -279,7 +258,7 @@ def timed_job():
try:
check_common_dataset(db_session)
except Exception as e:
logger.exception('check periodcal common dataset syncs error: %s', e)
logging.exception('check periodic common dataset syncs error: %s', e)
finally:
db_session.close()
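
Aside (not part of the diff): update_sync_time_and_version is called from check_common_dataset but its body lies outside the rendered hunks. A rough sketch of what such a batched flush could look like, assuming a dtable_common_dataset_sync table with last_sync_time and src_version columns; the table and column names are assumptions for illustration only.

from datetime import datetime

from sqlalchemy import text


def update_sync_time_and_version(db_session, dataset_update_map):
    # Hypothetical sketch: flush the accumulated {sync_id: src_version} map in one pass.
    sql = text(
        'UPDATE dtable_common_dataset_sync '
        'SET last_sync_time=:sync_time, src_version=:src_version '
        'WHERE id=:sync_id'
    )
    now = datetime.utcnow()
    for sync_id, src_version in dataset_update_map.items():
        db_session.execute(sql, {'sync_time': now, 'src_version': src_version, 'sync_id': sync_id})
    db_session.commit()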
