IAGOS.apps.workflow.jobs

Overview

Jobs & Tasks

The PI has the opportunity to define jobs that should be executed periodically at a fixed time. Therefore the PI has to set the frequency and the interval between the executions. The system supports hourly, daily, and weekly jobs. If the job should run every second hour, then the PI has to set the frequency to hourly and the interval to two. Each job has several TaskManagers. A manager is dedicated to handling one kind of task. For example, a job could have two task managers—the first manager imports the metadata (e.g., instruments data), and the second the series. To resolve dependencies, the PI can order the jobs and the managers. During the execution of a job, the task manager will first collect all tasks and store them in a queue. After it, the tasks will be executed by the managers.

../_images/jobs.svg

Execution of a task

  1. Request data that should be imported (TransferHandler)

  2. Read the data and parse it to an interpretable structure (Reader) After reading import and process the data and perform some basic tests for flagging (Importer)

  3. Perform advanced tests

  4. Inform PI that checks the plausibility of the data. Can change the evaluation method and rerun the data processing

  5. Prepare the export data (Exporter) and write the data to a specific format (Writer)

  6. Export the data to an external system (TransferHandler)

../_images/tasks.svg

Job

class IAGOS.apps.workflow.jobs.models.Job(*args, **kwargs)

Bases: django.db.models.base.Model

This class implements the table job. A job could have several task managers that check for new tasks and execute them. Each job could be performed once or frequently. If the job is run repeatedly, the user has to define a period (e.g., every hour). Furthermore, a responsible user has to be assigned to the job. If there are new events or problems, the application will inform the user.

Parameters
  • name (string) – Name of the job

  • start (datetime) – Start date of the job

  • end (datetime) – End date of the job

  • user (User) – Responsible user of the job

  • frequency (string) – Frequeny of execution

  • interval (integer) – Interval of the execution

  • notification (string) – Notification option (e.g. always get informed)

  • order (integer) – Order of the job

  • rules (ManyToManyField) – Job rules

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

get_tasks()

Checks for new tasks and returns them.

Returns

Tasks

Return type

List(Task)

notify()

This method informs the PI about new events.

Returns

None

Return type

None

property pis

Returns the PI and the CO-PIs in a list.

Returns

All PIs of the Job

Return type

List(User)

run()

Marks that the job is running.

Returns

None

Return type

None

property running

Returns if the job is running.

Returns

True if job is running, False otherwise.

Return type

bool

stop()

Marks that the jobs is not running.

Returns

None

Return type

None

JobRule

class IAGOS.apps.workflow.jobs.models.JobRule(*args, **kwargs)

Bases: django.db.models.base.Model

This class implements the table job_rule. A rule can be executed after the execution of tasks or manually. Since the tasks are isolated, they can not recognize relations between each other. For example, if you process the NOx data and want to perform the nighttime correction, you have to import and process the NO data of a complete deployment first. After it, you can calculate the nighttime offset and reprocess the executed tasks again with the offset. To solve this kind of problem in an automatic way, you can implement rules for it.

Parameters
  • name (string) – Name of the rule

  • description (string) – Description of the rule

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

handle_deployment(deployment_id)

This method handles the task of the given deployment.

Parameters

deployment_id (integer) – ID of the deployment

Returns

None

Return type

None

handle_job_execution(tasks)

This methods handle the tasks of a job exection.

Parameters

tasks (List(Task)) – Tasks

Returns

None

Return type

None

load_instance()

Loads the reader with the name. If the rule is already loaded, the method do not load the instance again.

Returns

None

Return type

None

Task

class IAGOS.apps.workflow.jobs.models.Task(*args, **kwargs)

Bases: django.db.models.base.Model

This class implements the table task. An instance of the class can perform a task and stores the results. Moreover, a task can be reprocessed.

Parameters
  • name (string) – Name of the Task

  • task_manager (TaskManager) – Task manager that created the task

  • comments (string) – Additional comments

  • addition (string) – Additional informations (JSON)

  • start (datetime) – Start of task

  • end (datetime) – End of the task

  • source (string) – Source of the data (e.g. file, request-cmd)

  • destination (string) – Destionation of the export

  • imported_series (ManyToManyField) – Imported series

  • exported_series (ManyToManyField) – Exported series

  • process_status (models.ManyToManyField) – Several status of the process (e.g. missing data)

  • task_status (Status) – Status of the task (e.g. completed)

  • error (bool) – True if an error occured, False otherwise

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

check_all_exported()

Checks if all exporter exported the data. If so, the status will set to Exporter.

Returns

True if all data is exported, otherwise False

Return type

bool

check_export_info(manager)

Checks if the manager is already assigned to an export information. If not, an instance will be created for it.

Parameters

manager (ExportManager) – Export manager

Returns

ExportInfo

Return type

ExportInfo

classmethod copy(task)

Copies the given task properties.

Parameters

task (Task) – Task that should be copied.

Returns

Copied task

Return type

Task

deny()

Release the data.

Returns

None

Return type

None

export_data(manager, mark_running=True, force_export=False)

Exports the data with the given manager.

Parameters
  • manager (ExportManager) – ExportManager

  • mark_running (bool) – Marks that the task is running (default: True)

get_level()

Returns the level of the data.

Returns

None

Return type

None

release()

Release the data.

Returns

None

Return type

None

reprocess()

Reprocess the data.

Returns

None

Return type

None

run()

This method runs the task. The first step is to import the data and process it. After the data is processed, perform the quality checks. After the quality checks, the method will check if the PI has to release the data or not. If the PI has to confirm the release, the runs stop with the status pending. Otherwise, the data will be exported if configured.

Returns

None

Return type

None

run_export()

Runs the export.

Parameters

level (int) – Data level of the import data

Returns

True if export was successful, otherwise False.

Return type

bool

run_import()

Runs the import.

Returns

True if import faild, False otherwise.

Return type

bool

TaskManager

class IAGOS.apps.workflow.jobs.models.TaskManager(*args, **kwargs)

Bases: django.db.models.base.Model

This class implements the table task_manager. An instance of the class manages tasks. A task manager combines an ImportManager, a QAQCContainer and an ExportManager.

Execution Pipeline:
  1. Transfer the data (TransferHandler)

  2. Parse and import the data (ImportManager)

  3. Perform the QA/QC checks (QAQCContainer)

  4. Export the data (ExportManager)

Parameters
  • job (Job) – Related job

  • name (string) – Name of the task manager

  • import_manager (ImportManager) – Manages the import of the data

  • qaqc_container (QAQCContainer) – QA/QC components

  • export_manager (ExportManager) – Manager to export the data

  • last_execution (datetime) – Timestamp of the last execution

  • order (integer) – Order number

  • execution_stop (BooleanField) – Stop execution of the tasks

exception DoesNotExist

Bases: django.core.exceptions.ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: django.core.exceptions.MultipleObjectsReturned

get_tasks()

This method checks if new tasks exist and return them. Therefore the import manager connects with the external system and checks for new sources that changed after the last execution. If the task manager checks for the first time for new sources, all sources will be considered.

Returns

Tasks

Return type

List(Task)