diff --git a/dpdispatcher/contexts/ssh_context.py b/dpdispatcher/contexts/ssh_context.py
index 66440928..0094c278 100644
--- a/dpdispatcher/contexts/ssh_context.py
+++ b/dpdispatcher/contexts/ssh_context.py
@@ -178,7 +178,7 @@ def _setup_ssh(self):
                     key = pkey_class.from_private_key_file(
                         key_path, self.passphrase
                     )
-                except paramiko.SSHException as e:
+                except paramiko.SSHException:
                     pass
                 if key is not None:
                     break
@@ -200,7 +200,7 @@ def _setup_ssh(self):
             for pkey_class, filename in keyfiles:
                 try:
                     key = pkey_class.from_private_key_file(filename, self.passphrase)
-                except paramiko.SSHException as e:
+                except paramiko.SSHException:
                     pass
                 if key is not None:
                     break
diff --git a/dpdispatcher/submission.py b/dpdispatcher/submission.py
index 59376430..1a394807 100644
--- a/dpdispatcher/submission.py
+++ b/dpdispatcher/submission.py
@@ -167,16 +167,14 @@ def serialize(self, if_static=False):
     def register_task(self, task):
         if self.belonging_jobs:
             raise RuntimeError(
-                "Not allowed to register tasks after generating jobs. "
-                f"submission hash error {self}"
+                f"Not allowed to register tasks after generating jobs. submission hash error {self}"
             )
         self.belonging_tasks.append(task)
 
     def register_task_list(self, task_list):
         if self.belonging_jobs:
             raise RuntimeError(
-                "Not allowed to register tasks after generating jobs. "
-                f"submission hash error {self}"
+                f"Not allowed to register tasks after generating jobs. submission hash error {self}"
             )
         self.belonging_tasks.extend(task_list)
 
@@ -219,9 +217,9 @@ def run_submission(
         self.try_recover_from_json()
         self.update_submission_state()
         if self.check_all_finished():
-            dlog.info("info:check_all_finished: True")
+            dlog.info("check_all_finished: True")
         else:
-            dlog.info("info:check_all_finished: False")
+            dlog.info("check_all_finished: False")
         self.upload_jobs()
         if dry_run is True:
             dlog.info(f"submission succeeded: {self.submission_hash}")
@@ -342,7 +340,7 @@ def update_submission_state(self):
                 continue
             job.get_job_state()
             dlog.debug(
-                f"debug:update_submission_state: job: {job.job_hash}, {job.job_id}, {job.job_state}"
+                f"update_submission_state: job: {job.job_hash}, {job.job_id}, {job.job_state}"
             )
 
     def handle_unexpected_submission_state(self):
@@ -818,7 +816,7 @@ def get_job_state(self):
         this method will not submit or resubmit the jobs if the job is unsubmitted.
         """
         dlog.debug(
-            f"debug:query database; self.job_hash:{self.job_hash}; self.job_id:{self.job_id}"
+            f"query database; self.job_hash:{self.job_hash}; self.job_id:{self.job_id}"
         )
         assert self.machine is not None
         job_state = self.machine.check_status(self)
@@ -838,7 +836,7 @@ def handle_unexpected_job_state(self):
         if job_state == JobStatus.terminated:
             self.fail_count += 1
             dlog.info(
-                f"job: {self.job_hash} {self.job_id} terminated; "
+                f"job {self.job_hash} {self.job_id} terminated; "
                 f"fail_cout is {self.fail_count}; resubmitting job"
             )
             retry_count = 3
@@ -848,7 +846,7 @@ def handle_unexpected_job_state(self):
             if (self.fail_count) > 0 and (self.fail_count % retry_count == 0):
                 last_error_message = self.get_last_error_message()
                 err_msg = (
-                    f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times."
+                    f"job {self.job_hash} {self.job_id} failed {self.fail_count} times."
                 )
                 if last_error_message is not None:
                     err_msg += f"\nPossible remote error message: {last_error_message}"
@@ -856,24 +854,24 @@ def handle_unexpected_job_state(self):
             self.submit_job()
             if self.job_state != JobStatus.unsubmitted:
                 dlog.info(
-                    f"job:{self.job_hash} re-submit after terminated; new job_id is {self.job_id}"
+                    f"job {self.job_hash} re-submit after terminated; new job_id is {self.job_id}"
                 )
                 time.sleep(0.2)
                 self.get_job_state()
                 dlog.info(
-                    f"job:{self.job_hash} job_id:{self.job_id} after re-submitting; the state now is {repr(self.job_state)}"
+                    f"job {self.job_hash} job_id:{self.job_id} after re-submitting; the state now is {repr(self.job_state)}"
                 )
                 self.handle_unexpected_job_state()
             if self.resources.wait_time != 0:
                 time.sleep(self.resources.wait_time)
 
         if job_state == JobStatus.unsubmitted:
-            dlog.debug(f"job: {self.job_hash} unsubmitted; submit it")
+            dlog.debug(f"job {self.job_hash} unsubmitted; submit it")
             # if self.fail_count > 3:
             #     raise RuntimeError("job:job {job} failed 3 times".format(job=self))
             self.submit_job()
             if self.job_state != JobStatus.unsubmitted:
-                dlog.info(f"job: {self.job_hash} submit; job_id is {self.job_id}")
+                dlog.info(f"job {self.job_hash} was submitted; job_id is {self.job_id}")
             if self.resources.wait_time != 0:
                 time.sleep(self.resources.wait_time)
             # self.get_job_state()
diff --git a/dpdispatcher/utils/dpcloudserver/client.py b/dpdispatcher/utils/dpcloudserver/client.py
index 23eab8a0..61818922 100644
--- a/dpdispatcher/utils/dpcloudserver/client.py
+++ b/dpdispatcher/utils/dpcloudserver/client.py
@@ -141,7 +141,7 @@ def _get_oss_bucket(self, endpoint, bucket_name):
         # res = get("/tools/sts_token", {})
         res = self.get("/data/get_sts_token", {})
         # print('debug>>>>>>>>>>>>>', res)
-        dlog.debug(f"debug: _get_oss_bucket: res:{res}")
+        dlog.debug(f"_get_oss_bucket: res:{res}")
         auth = oss2.StsAuth(  # type: ignore[reportPossiblyUnboundVariable]
             res["AccessKeyId"], res["AccessKeySecret"], res["SecurityToken"]
         )
@@ -149,7 +149,7 @@ def _get_oss_bucket(self, endpoint, bucket_name):
 
     def download(self, oss_file, save_file, endpoint, bucket_name):
         bucket = self._get_oss_bucket(endpoint, bucket_name)
-        dlog.debug(f"debug: download: oss_file:{oss_file}; save_file:{save_file}")
+        dlog.debug(f"download: oss_file:{oss_file}; save_file:{save_file}")
         bucket.get_object_to_file(oss_file, save_file)
         return save_file
 
@@ -180,7 +180,7 @@ def download_from_url(self, url, save_file):
 
     def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
         dlog.debug(
-            f"debug: upload: oss_task_zip:{oss_task_zip}; zip_task_file:{zip_task_file}"
+            f"upload: oss_task_zip:{oss_task_zip}; zip_task_file:{zip_task_file}"
         )
         bucket = self._get_oss_bucket(endpoint, bucket_name)
         total_size = os.path.getsize(zip_task_file)
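
Note on the first two hunks: the `as e` binding they drop was never read, because a `paramiko.SSHException` in this loop only signals "this key class does not parse the file" and the loop simply moves on to the next class. A minimal standalone sketch of that fallback pattern is below; the `load_private_key` name and its arguments are illustrative, not part of this repo:

```python
import paramiko


def load_private_key(key_path, passphrase=None):
    """Return the first paramiko key object that parses key_path, else None."""
    key = None
    for pkey_class in (
        paramiko.RSAKey,
        paramiko.DSSKey,
        paramiko.ECDSAKey,
        paramiko.Ed25519Key,
    ):
        try:
            key = pkey_class.from_private_key_file(key_path, passphrase)
        except paramiko.SSHException:
            pass  # wrong key type (or undecryptable); try the next class
        if key is not None:
            break
    return key
```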