Jobs#

LSTOSA creates and submits jobs using SLURM. Several functions handle the job processing levels, keeping a history of all the steps that have been run.

Reference/API#

Functions to handle the interaction with the job scheduler.

osa.job.are_all_jobs_correctly_finished(sequence_list)#

Check if all jobs are correctly finished by looking at the history file.

Parameters:
sequence_list: list

List of sequence objects

Returns:
flag: bool

osa.job.calibration_sequence_job_template(sequence)#

Build the job template with the instructions to be submitted to the job scheduler.

Parameters:
sequence: sequence object
Returns:
job_template: string

osa.job.check_history_level(history_file: Path, program_levels: dict)#

Check the history of the calibration sequence.

Parameters:
history_file: pathlib.Path

Path to the history file

program_levels: dict

Dictionary with the program name and the level of the program

Returns:
level: int

Level of the history file

exit_status: int

Exit status of the program according to the history file

osa.job.data_sequence_job_template(sequence)#

Build the job template with the instructions to be submitted to the job scheduler.

Parameters:
sequence: sequence object
Returns:
job_template: string

osa.job.filter_jobs(job_info: DataFrame, sequence_list: Iterable)#

Filter the job info list to get the values of the jobs in the current queue.

osa.job.get_sacct_output(sacct_output: StringIO) -> DataFrame#

Read the information of the jobs in the queue from the sacct SLURM output and store it in a pandas DataFrame.

Returns:
queue_list: pd.DataFrame
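
As a rough illustration of turning sacct text into a table, here is a minimal, dependency-free sketch. It assumes the text was produced with `sacct --parsable2`, which prints pipe-delimited rows with a header line. The helper name `parse_sacct_text`, the chosen columns and the job names are hypothetical; the documented function returns a pandas DataFrame rather than a list of dictionaries.

```python
import csv
from io import StringIO
from typing import Dict, List


def parse_sacct_text(sacct_output: StringIO) -> List[Dict[str, str]]:
    """Parse pipe-delimited sacct text into one dictionary per job.

    Hypothetical helper for illustration only; it is not part of osa.job.
    """
    reader = csv.DictReader(sacct_output, delimiter="|")
    return list(reader)


# Made-up sample in the layout of `sacct --parsable2` output.
example = StringIO(
    "JobID|JobName|State|Elapsed\n"
    "1001|LST1_01234|COMPLETED|00:12:30\n"
    "1002|LST1_01235|FAILED|00:01:02\n"
)
rows = parse_sacct_text(example)
failed_jobs = [row["JobID"] for row in rows if row["State"] == "FAILED"]
```

The same text could also be loaded with `pandas.read_csv(sacct_output, sep="|")`, which accepts file-like objects such as StringIO.
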
osa.job.get_squeue_output(squeue_output: StringIO) -> DataFrame#

Obtain the current job information from squeue output and return a pandas DataFrame.

osa.job.historylevel(history_file: Path, data_type: str)#

Return the level from which the analysis should begin and the return code of the last executable recorded in a given history file.

Parameters:
history_file: pathlib.Path
data_type: str

Type of the sequence, either ‘DATA’ or ‘PEDCALIB’

Returns:
level: int
exit_status: int

Notes

Workflow for PEDCALIB sequences:
  • Creation of DRS4 pedestal file, level 2->1

  • Creation of charge calibration file, level 1->0

  • Sequence completed when reaching level 0

Workflow for DATA sequences:
  • R0->DL1, level 4->3

  • DL1->DL1AB, level 3->2

  • DATACHECK, level 2->1

  • DL1->DL2, level 1->0

  • Sequence completed when reaching level 0
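
The level bookkeeping described in these notes can be read as a countdown over an ordered list of steps. The sketch below is only an illustration of that reading: `WORKFLOW_STEPS`, `starting_level` and `next_step` are hypothetical names that are not part of osa.job, and only the step order and the level numbers are taken from the notes.

```python
from typing import Optional

# Hypothetical step tables mirroring the workflow notes; illustrative only.
WORKFLOW_STEPS = {
    "PEDCALIB": ["DRS4 pedestal", "charge calibration"],
    "DATA": ["R0->DL1", "DL1->DL1AB", "DATACHECK", "DL1->DL2"],
}


def starting_level(data_type: str) -> int:
    """Level of a sequence for which no step has completed yet."""
    return len(WORKFLOW_STEPS[data_type])


def next_step(data_type: str, level: int) -> Optional[str]:
    """Name of the step to run at `level`, or None once level 0 is reached."""
    steps = WORKFLOW_STEPS[data_type]
    if not 0 <= level <= len(steps):
        raise ValueError(f"Invalid level {level} for {data_type}")
    if level == 0:
        return None  # sequence completed
    # Each completed step lowers the level by one, so count from the end.
    return steps[len(steps) - level]
```

For example, a DATA sequence starts at level 4 with R0->DL1 pending, and a sequence at level 0 has nothing left to run.
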

osa.job.job_header_template(sequence)#

Return a string with the job header template, including the SBATCH environment variables for the sequencerXX.py script.

Parameters:
sequence: sequence object
Returns:
header: str

String with job header template
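
To give an idea of what such a header string may look like, the following sketch assembles a few common `#SBATCH` directives. It is not the template generated by osa.job: the function name `sketch_job_header`, its parameters and the default values are assumptions made for illustration, and the real header is built from the sequence object.

```python
def sketch_job_header(
    job_name: str,
    log_dir: str,
    walltime: str = "02:00:00",
    mem: str = "8G",
) -> str:
    """Build an illustrative SLURM job header (hypothetical helper)."""
    lines = [
        "#!/usr/bin/env python3",
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --time={walltime}",
        # %j is replaced by SLURM with the job ID.
        f"#SBATCH --output={log_dir}/{job_name}_%j.out",
        f"#SBATCH --error={log_dir}/{job_name}_%j.err",
        f"#SBATCH --mem={mem}",
    ]
    return "\n".join(lines)


header = sketch_job_header("LST1_01234", "/tmp/logs")
```
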

osa.job.plot_job_statistics(sacct_output: DataFrame, directory: Path)#

Get statistics of the jobs from the sacct output: elapsed time, memory used, and the number of completed, failed, running and queued jobs.

Parameters:
sacct_output: pd.DataFrame
directory: Path

Directory to save the plot.

osa.job.prepare_jobs(sequence_list)#

Prepare job file template for each sequence.

osa.job.run_sacct() -> StringIO#

Run sacct to obtain the job information.

osa.job.run_squeue() -> StringIO#

Run squeue command to get the status of the jobs.

osa.job.save_job_information()#

Write job information from sacct (elapsed time, memory used, number of completed, failed and running jobs in the queue) to a file.

osa.job.scheduler_env_variables(sequence, scheduler='slurm')#

Return the environment variables for the scheduler.

osa.job.sequence_filenames(sequence)#

Build names of the script, veto and history files.

osa.job.set_cache_dirs()#

Export the cache directories for the jobs, provided they are defined in the config file.

Returns:
content: string

String with the command to export the cache directories

osa.job.set_queue_values(sacct_info: DataFrame, squeue_info: DataFrame, sequence_list: Iterable) -> None#

Extract the job information from the sacct and squeue outputs and fill it into the table of sequences.

Parameters:
sacct_info: pd.DataFrame
squeue_info: pd.DataFrame
sequence_list: list[Sequence object]

osa.job.submit_jobs(sequence_list, batch_command='sbatch')#

Submit the jobs to the cluster.

Parameters:
sequence_list: list

List of sequences to submit.

batch_command: str

The batch command to submit the job (Default: sbatch)

Returns:
job_list: list

List of submitted job IDs.
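
The general pattern of submitting scripts and collecting job IDs can be sketched as below. This is a sketch under stated assumptions rather than the osa.job implementation: `submit_scripts` and `parse_job_id` are hypothetical names, the scripts are passed as plain paths instead of sequence objects, and the `--parsable` flag of sbatch is used so that the output is just the job ID, optionally followed by `;` and the cluster name.

```python
import subprocess
from typing import Callable, Iterable, List


def parse_job_id(sbatch_stdout: str) -> str:
    """Extract the job ID from `sbatch --parsable` output."""
    # Output is "jobid" or "jobid;cluster".
    return sbatch_stdout.strip().split(";")[0]


def submit_scripts(
    scripts: Iterable[str],
    batch_command: str = "sbatch",
    run: Callable = subprocess.run,
) -> List[str]:
    """Submit each script and return the list of job IDs (hypothetical helper)."""
    job_ids = []
    for script in scripts:
        result = run(
            [batch_command, "--parsable", script],
            capture_output=True,
            text=True,
            check=True,  # raise if the submission command fails
        )
        job_ids.append(parse_job_id(result.stdout))
    return job_ids
```

The `run` argument is there only so that the submission can be tried without a cluster by passing a stand-in for `subprocess.run`.
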