Jobs#
LSTOSA job creation and submission using SLURM. Several functions handle the job processing levels, keeping a history of all the different steps.
Reference/API#
Functions to handle the interaction with the job scheduler.
- osa.job.are_all_jobs_correctly_finished(sequence_list)#
Check if all jobs are correctly finished by looking at the history file.
- Parameters:
- sequence_list: list
List of sequence objects
- Returns:
- flag: bool
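A minimal sketch of what "correctly finished" could mean here, assuming each sequence has a history file whose lines end with an exit status (the real LSTOSA history format and sequence-object attributes may differ):

```python
from pathlib import Path
import tempfile

def all_jobs_correctly_finished(history_files):
    """Sketch: a sequence counts as finished when every line of its
    history file records exit status 0 (status assumed to be the last
    whitespace-separated field of each line)."""
    for history_file in map(Path, history_files):
        if not history_file.exists():
            return False
        for line in history_file.read_text().splitlines():
            if line.strip() and line.split()[-1] != "0":
                return False
    return True

# Example with a temporary history file
with tempfile.TemporaryDirectory() as tmp:
    hist = Path(tmp) / "sequence_LST1_01234.history"
    hist.write_text("r0_to_dl1 ... 0\ndl1ab ... 0\n")
    print(all_jobs_correctly_finished([hist]))  # True under these assumptions
```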
- osa.job.calibration_sequence_job_template(sequence)#
Build the calibration sequence job file containing the instructions to be submitted to the job scheduler.
- Parameters:
- sequence: sequence object
- Returns:
- job_template: string
- osa.job.check_history_level(history_file: Path, program_levels: dict)#
Check the history of the calibration sequence.
- Parameters:
- history_file: pathlib.Path
Path to the history file
- program_levels: dict
Dictionary with the program name and the level of the program
- Returns:
- level: int
Level of the history file
- exit_status: int
Exit status of the program according to the history file
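The mapping from history entries to a (level, exit status) pair can be sketched as follows, assuming each history line starts with a program name and ends with its exit status, and that `program_levels` maps program names to levels (both assumptions; the real LSTOSA history format may differ):

```python
def check_history_level(history_lines, program_levels):
    """Sketch: return the level of the last recorded program found in
    program_levels, together with that program's exit status."""
    level, exit_status = None, None
    for line in history_lines:
        parts = line.split()
        if not parts:
            continue
        program, rc = parts[0], int(parts[-1])
        if program in program_levels:
            level, exit_status = program_levels[program], rc
    return level, exit_status

# Hypothetical program names for a calibration sequence
levels = {"drs4_pedestal": 1, "charge_calibration": 0}
print(check_history_level(
    ["drs4_pedestal ... 0", "charge_calibration ... 1"], levels))
# (0, 1) under these assumptions: last step reached level 0 but failed
```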
- osa.job.data_sequence_job_template(sequence)#
Build the data sequence job file containing the instructions to be submitted to the job scheduler.
- Parameters:
- sequence: sequence object
- Returns:
- job_template: string
- osa.job.filter_jobs(job_info: DataFrame, sequence_list: Iterable)#
Filter the job info list to get the values of the jobs in the current queue.
- osa.job.get_sacct_output(sacct_output: StringIO) → DataFrame#
Fetch the information of the jobs in the queue from the SLURM sacct output and store it in a pandas DataFrame.
- Returns:
- queue_list: pd.DataFrame
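A hedged sketch of the parsing step, assuming the sacct output is pipe-delimited with a header row (as produced by sacct's `--parsable2` option); it returns a list of dicts here, whereas LSTOSA stores the equivalent table in a pandas DataFrame:

```python
import csv
from io import StringIO

def parse_sacct_output(sacct_output: StringIO):
    """Sketch: parse pipe-delimited sacct output into a list of dicts,
    one per job, keyed by the header column names."""
    reader = csv.DictReader(sacct_output, delimiter="|")
    return list(reader)

sample = StringIO(
    "JobID|State|Elapsed\n"
    "12345|COMPLETED|00:10:32\n"
    "12346|FAILED|00:00:12\n"
)
rows = parse_sacct_output(sample)
print(rows[0]["State"])  # COMPLETED
```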
- osa.job.get_squeue_output(squeue_output: StringIO) → DataFrame#
Obtain the current job information from squeue output and return a pandas dataframe.
- osa.job.historylevel(history_file: Path, data_type: str)#
Return the level from which the analysis should begin and the return code of the last executable, given a history file.
- Parameters:
- history_file: pathlib.Path
- data_type: str
Type of the sequence, either ‘DATA’ or ‘PEDCALIB’
- Returns:
- level: int
- exit_status: int
Notes
- Workflow for PEDCALIB sequences:
Creation of DRS4 pedestal file, level 2->1
Creation of charge calibration file, level 1->0
Sequence completed when reaching level 0
- Workflow for DATA sequences:
R0->DL1, level 4->3
DL1->DL1AB, level 3->2
DATACHECK, level 2->1
DL1->DL2, level 1->0
Sequence completed when reaching level 0
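The level bookkeeping described in the notes above can be sketched as a countdown: start at the top level for the sequence type and decrement once per successfully recorded step, stopping at the first failure. The line format (program name first, exit status last) is an assumption:

```python
def history_level(history_lines, data_type):
    """Sketch: DATA sequences count down from level 4, PEDCALIB
    sequences from level 2; level 0 means the sequence is complete."""
    start = {"DATA": 4, "PEDCALIB": 2}[data_type]
    level, exit_status = start, 0
    for line in history_lines:
        parts = line.split()
        if not parts:
            continue
        exit_status = int(parts[-1])
        if exit_status != 0:
            break  # resume the analysis at the failing step
        level -= 1
    return level, exit_status

print(history_level(["r0_to_dl1 0", "dl1ab 0", "datacheck 1"], "DATA"))
# (2, 1): resume at the DATACHECK step under these assumptions
```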
- osa.job.job_header_template(sequence)#
Return a string with the job header template, including the SBATCH environment variables, for the sequencerXX.py script.
- Parameters:
- sequence: sequence object
- Returns:
- header: str
String with job header template
- osa.job.plot_job_statistics(sacct_output: DataFrame, directory: Path)#
Plot statistics of the jobs from the sacct output: the elapsed time, the memory used, and the number of jobs completed, failed, running, and queued.
- Parameters:
- sacct_output: pd.DataFrame
- directory: Path
Directory to save the plot.
- osa.job.prepare_jobs(sequence_list)#
Prepare job file template for each sequence.
- osa.job.run_sacct() → StringIO#
Run sacct to obtain the job information.
- osa.job.run_squeue() → StringIO#
Run squeue command to get the status of the jobs.
- osa.job.save_job_information()#
Write job information from sacct (elapsed time, memory used, number of completed, failed and running jobs in the queue) to a file.
- osa.job.scheduler_env_variables(sequence, scheduler='slurm')#
Return the environment variables for the scheduler.
- osa.job.sequence_filenames(sequence)#
Build names of the script, veto and history files.
- osa.job.set_cache_dirs()#
Export cache directories for the jobs provided they are defined in the config file.
- Returns:
- content: string
String with the command to export the cache directories
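A sketch of how such export lines could be built from a config file, using the standard-library configparser; the section name, option names, and paths below are hypothetical, not the actual LSTOSA config layout:

```python
from configparser import ConfigParser

def cache_dir_exports(config: ConfigParser) -> str:
    """Sketch: emit one 'export VAR=path' line per cache directory,
    only for entries actually defined in the config."""
    lines = []
    if config.has_section("CACHE"):
        for var, path in config.items("CACHE"):
            # configparser lowercases option names; restore shell style
            lines.append(f"export {var.upper()}={path}")
    return "\n".join(lines)

cfg = ConfigParser()
cfg.read_string("[CACHE]\nctapipe_cache = /fefs/cache\n")  # hypothetical
print(cache_dir_exports(cfg))  # export CTAPIPE_CACHE=/fefs/cache
```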
- osa.job.set_queue_values(sacct_info: DataFrame, squeue_info: DataFrame, sequence_list: Iterable) → None#
Extract the job info from the sacct output and feed it into the table of sequences.
- Parameters:
- sacct_info: pd.DataFrame
- squeue_info: pd.DataFrame
- sequence_list: list[Sequence object]
- osa.job.submit_jobs(sequence_list, batch_command='sbatch')#
Submit the jobs to the cluster.
- Parameters:
- sequence_list: list
List of sequences to submit.
- batch_command: str
The batch command to submit the job (Default: sbatch)
- Returns:
- job_list: list
List of submitted job IDs.
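A minimal sketch of one submission, assuming each sequence provides a job script path and that the batch command replies like sbatch does ("Submitted batch job <id>"); calling a real scheduler, so `submit_job` itself only works on a SLURM cluster:

```python
import subprocess

def parse_job_id(sbatch_stdout: str) -> str:
    """sbatch normally replies 'Submitted batch job 123456';
    the job ID is the last whitespace-separated token."""
    return sbatch_stdout.strip().split()[-1]

def submit_job(script_path, batch_command="sbatch") -> str:
    """Sketch: submit one job script and return its job ID."""
    result = subprocess.run(
        [batch_command, str(script_path)],
        capture_output=True, text=True, check=True,
    )
    return parse_job_id(result.stdout)

print(parse_job_id("Submitted batch job 123456"))  # 123456
```

Collecting the returned IDs over the whole sequence list would then produce the `job_list` described above.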