Source code for topchef.models.job

"""
Implements a job that maps to the 
"""
import json
from datetime import datetime
from typing import Optional
from uuid import UUID
from topchef.models.interfaces.job import Job as JobInterface
from ..database.models import Job as DatabaseJob
from ..database.models import JobStatus as DatabaseJobStatus
from ..json_type import JSON_TYPE as JSON


[docs]class Job(JobInterface): """ Contains an implementation of the job interface """ _DATABASE_JOB_STATUS_LOOKUP = { DatabaseJobStatus.REGISTERED: JobInterface.JobStatus.REGISTERED, DatabaseJobStatus.COMPLETED: JobInterface.JobStatus.COMPLETED, DatabaseJobStatus.WORKING: JobInterface.JobStatus.WORKING, DatabaseJobStatus.ERROR: JobInterface.JobStatus.ERROR } _MODEL_JOB_STATUS_LOOKUP = { JobInterface.JobStatus.REGISTERED: DatabaseJobStatus.REGISTERED, JobInterface.JobStatus.COMPLETED: DatabaseJobStatus.COMPLETED, JobInterface.JobStatus.WORKING: DatabaseJobStatus.WORKING, JobInterface.JobStatus.ERROR: DatabaseJobStatus.ERROR }
[docs] def __init__(self, database_job: DatabaseJob): """ :param database_job: The database model for the job """ self.db_model = database_job
@property def id(self) -> UUID: """ :return: The job ID """ return self.db_model.id @property def status(self) -> JobInterface.JobStatus: return self._DATABASE_JOB_STATUS_LOOKUP[self.db_model.status] @status.setter def status(self, new_status: JobInterface.JobStatus) -> None: self.db_model.status = self._MODEL_JOB_STATUS_LOOKUP[new_status] @property def parameters(self) -> JSON: return self.db_model.parameters @property def results(self) -> Optional[JSON]: return self.db_model.results @property def date_submitted(self) -> datetime: return self.db_model.date_submitted @results.setter def results(self, new_results: JSON) -> None: self._assert_json(new_results) self.db_model.results = new_results @property def parameter_schema(self) -> dict: return self.db_model.service.job_registration_schema @property def result_schema(self) -> dict: return self.db_model.service.job_result_schema def __hash__(self) -> int: return hash((self.__class__.__name__, self.id)) @staticmethod def _assert_json(json_to_set): try: json.loads(json.dumps(json_to_set)) except json.JSONDecodeError as error: raise ValueError( 'The input is not JSON', error )