Source code for desispec.workflow.queue

"""
desispec.workflow.queue
=======================

"""
import os
import re
import numpy as np
from astropy.table import Table, vstack
import subprocess

from desispec.workflow.proctable import get_err_qid, get_default_qid
from desiutil.log import get_logger
import time, datetime

## Module-level cache of Slurm job states. update_queue_state_cache() stores
## entries keyed by the integer Slurm job id (QID) with the job state as the
## value, and clear_queue_state_cache() empties it.
## NOTE: a `global` statement at module scope has no effect in Python; the
## functions below declare the name global themselves.
global _cached_slurm_states
_cached_slurm_states = dict()

def get_resubmission_states(no_resub_failed=False):
    """
    Lists the job states for which a job should be resubmitted in the hope
    that it succeeds on the next attempt.

    The list mixes pipeline-internal states with Slurm states. For reference,
    the values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently
        running jobs in queue unless times are explicitly given

        BF   BOOT_FAIL      Job terminated due to launch failure
        CA   CANCELLED      Job was explicitly cancelled by the user or system
                            administrator. The job may or may not have been
                            initiated.
        CD   COMPLETED      Job has terminated all processes on all nodes with
                            an exit code of zero.
        DL   DEADLINE       Job terminated on deadline.
        F    FAILED         Job terminated with non-zero exit code or other
                            failure condition.
        NF   NODE_FAIL      Job terminated due to failure of one or more
                            allocated nodes.
        OOM  OUT_OF_MEMORY  Job experienced out of memory error.
        PD   PENDING        Job is awaiting resource allocation.
        PR   PREEMPTED      Job terminated due to preemption.
        R    RUNNING        Job currently has an allocation.
        RQ   REQUEUED       Job was requeued.
        RS   RESIZING       Job is about to change size.
        RV   REVOKED        Sibling was removed from cluster due to other
                            cluster starting the job.
        S    SUSPENDED      Job has an allocation, but execution has been
                            suspended and CPUs have been released for other
                            jobs.
        TO   TIMEOUT        Job terminated upon reaching its time limit.

    Args:
        no_resub_failed: bool. Set to True if you do NOT want jobs with Slurm
            status 'FAILED' to be included in the returned list. Default is
            False, i.e. 'FAILED' is included.

    Returns:
        list. A new list of strings naming the job states that should be
        resubmitted.
    """
    ## 'UNSUBMITTED' is the default pipeline state for things not yet submitted.
    ## 'DEP_NOT_SUBD' is set when resubmission can't proceed because a
    ## dependency has failed.
    always_resubmitted = [
        'UNSUBMITTED', 'DEP_NOT_SUBD', 'MAX_RESUB',
        'BOOT_FAIL', 'DEADLINE', 'NODE_FAIL', 'OUT_OF_MEMORY',
        'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT', 'CANCELLED',
    ]
    if no_resub_failed:
        return always_resubmitted
    ## 'FAILED' always goes last so the ordering is independent of the flag
    return always_resubmitted + ['FAILED']
def get_termination_states():
    """
    Lists the Slurm job states that are treated as final, i.e. whose need for
    resubmission is not in question.

    For reference, the values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently
        running jobs in queue unless times are explicitly given

        BF   BOOT_FAIL      Job terminated due to launch failure
        CA   CANCELLED      Job was explicitly cancelled by the user or system
                            administrator. The job may or may not have been
                            initiated.
        CD   COMPLETED      Job has terminated all processes on all nodes with
                            an exit code of zero.
        DL   DEADLINE       Job terminated on deadline.
        F    FAILED         Job terminated with non-zero exit code or other
                            failure condition.
        NF   NODE_FAIL      Job terminated due to failure of one or more
                            allocated nodes.
        OOM  OUT_OF_MEMORY  Job experienced out of memory error.
        PD   PENDING        Job is awaiting resource allocation.
        PR   PREEMPTED      Job terminated due to preemption.
        R    RUNNING        Job currently has an allocation.
        RQ   REQUEUED       Job was requeued.
        RS   RESIZING       Job is about to change size.
        RV   REVOKED        Sibling was removed from cluster due to other
                            cluster starting the job.
        S    SUSPENDED      Job has an allocation, but execution has been
                            suspended and CPUs have been released for other
                            jobs.
        TO   TIMEOUT        Job terminated upon reaching its time limit.

    Returns:
        list. A new list of strings naming the job states that are considered
        final (without human investigation/intervention).
    """
    final_states = ['COMPLETED', 'CANCELLED', 'FAILED']
    return final_states
def get_failed_states():
    """
    Lists the Slurm job states that should be considered failed or problematic.

    All possible values that Slurm returns are::

        BF   BOOT_FAIL      Job terminated due to launch failure, typically
                            due to a hardware failure (e.g. unable to boot the
                            node or block and the job can not be requeued).
        CA   CANCELLED      Job was explicitly cancelled by the user or system
                            administrator. The job may or may not have been
                            initiated.
        CD   COMPLETED      Job has terminated all processes on all nodes with
                            an exit code of zero.
        CF   CONFIGURING    Job has been allocated resources, but are waiting
                            for them to become ready for use (e.g. booting).
        CG   COMPLETING     Job is in the process of completing. Some
                            processes on some nodes may still be active.
        DL   DEADLINE       Job terminated on deadline.
        F    FAILED         Job terminated with non-zero exit code or other
                            failure condition.
        NF   NODE_FAIL      Job terminated due to failure of one or more
                            allocated nodes.
        OOM  OUT_OF_MEMORY  Job experienced out of memory error.
        PD   PENDING        Job is awaiting resource allocation.
        PR   PREEMPTED      Job terminated due to preemption.
        R    RUNNING        Job currently has an allocation.
        RD   RESV_DEL_HOLD  Job is being held after requested reservation was
                            deleted.
        RF   REQUEUE_FED    Job is being requeued by a federation.
        RH   REQUEUE_HOLD   Held job is being requeued.
        RQ   REQUEUED       Completing job is being requeued.
        RS   RESIZING       Job is about to change size.
        RV   REVOKED        Sibling was removed from cluster due to other
                            cluster starting the job.
        SI   SIGNALING      Job is being signaled.
        SE   SPECIAL_EXIT   The job was requeued in a special state. This
                            state can be set by users, typically in
                            EpilogSlurmctld, if the job has terminated with a
                            particular exit value.
        SO   STAGE_OUT      Job is staging out files.
        ST   STOPPED        Job has an allocation, but execution has been
                            stopped with SIGSTOP signal. CPUS have been
                            retained by this job.
        S    SUSPENDED      Job has an allocation, but execution has been
                            suspended and CPUs have been released for other
                            jobs.
        TO   TIMEOUT        Job terminated upon reaching its time limit.

    Returns:
        list. A new list of strings naming the job states that are considered
        to be failed or problematic.
    """
    problem_states = [
        'BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED', 'NODE_FAIL',
        'OUT_OF_MEMORY', 'PREEMPTED', 'REVOKED', 'SUSPENDED', 'TIMEOUT',
    ]
    return problem_states
def get_non_final_states():
    """
    Lists the job states that are not final and therefore indicate that the
    job hasn't finished running.

    For reference, the values that Slurm returns are::

        CA or ca or CANCELLED for cancelled jobs will only show currently
        running jobs in queue unless times are explicitly given

        BF   BOOT_FAIL      Job terminated due to launch failure
        CA   CANCELLED      Job was explicitly cancelled by the user or system
                            administrator. The job may or may not have been
                            initiated.
        CD   COMPLETED      Job has terminated all processes on all nodes with
                            an exit code of zero.
        DL   DEADLINE       Job terminated on deadline.
        F    FAILED         Job terminated with non-zero exit code or other
                            failure condition.
        NF   NODE_FAIL      Job terminated due to failure of one or more
                            allocated nodes.
        OOM  OUT_OF_MEMORY  Job experienced out of memory error.
        PD   PENDING        Job is awaiting resource allocation.
        PR   PREEMPTED      Job terminated due to preemption.
        R    RUNNING        Job currently has an allocation.
        RQ   REQUEUED       Job was requeued.
        RS   RESIZING       Job is about to change size.
        RV   REVOKED        Sibling was removed from cluster due to other
                            cluster starting the job.
        S    SUSPENDED      Job has an allocation, but execution has been
                            suspended and CPUs have been released for other
                            jobs.
        TO   TIMEOUT        Job terminated upon reaching its time limit.

    Returns:
        list. A new list of strings naming the job states that are considered
        non-final.
    """
    ## 'SUBMITTED' is not in the Slurm table above
    ## NOTE(review): presumably a pipeline-internal state -- confirm
    in_progress_states = ['SUBMITTED', 'PENDING', 'RUNNING', 'REQUEUED', 'RESIZING']
    return in_progress_states
def get_mock_slurm_data():
    """
    Returns a string of output that mimics what Slurm would return from
    ``sacct -X --parsable2 --delimiter=, --format=JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode -j <qid_str>``

    Returns
    -------
    str
        Mock Slurm data in csv format: one header line followed by five
        COMPLETED jobs, with no trailing newline.
    """
    header = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
    date = '2021-11-02'
    ## Every mock job became eligible at the same moment
    eligible = f'{date}T18:36:33'
    ## (jobid, exposure id, submit time, start time, end time)
    mock_jobs = (
        ('49482394', '00107062', '18:31:14', '18:36:33', '18:48:32'),
        ('49482395', '00107063', '18:31:16', '18:48:34', '18:57:02'),
        ('49482397', '00107064', '18:31:19', '18:57:05', '19:06:17'),
        ('49482398', '00107065', '18:31:24', '19:06:18', '19:13:59'),
        ('49482399', '00107066', '18:31:27', '19:14:00', '19:24:49'),
    )
    lines = [header]
    for jobid, expid, submit, start, end in mock_jobs:
        lines.append(f'{jobid},arc-20211102-{expid}-a0123456789,realtime,'
                     f'{date}T{submit},{eligible},{date}T{start},'
                     f'{date}T{end},00:11:59,COMPLETED,0:0')
    return '\n'.join(lines)
def queue_info_from_time_window(start_time=None, end_time=None, user=None,
                                columns='jobid,jobname,partition,submit,eligible,'
                                        + 'start,end,elapsed,state,exitcode',
                                dry_run_level=0):
    """
    Queries the NERSC Slurm database using sacct to get information on all of
    a user's jobs within a specified time window.

    Parameters
    ----------
    start_time : str
        String of the form YYYY-mm-ddTHH:MM:SS giving the earliest time you
        want queue information about. Defaults to '2020-04-26T00:00'.
    end_time : str
        String of the form YYYY-mm-ddTHH:MM:SS giving the latest time you
        want queue information about. Defaults to '2020-05-01T00:00'.
    user : str
        The username at NERSC that you want job information about. The
        default is the value of the USER environment variable if it exists,
        otherwise 'desi'.
    columns : str
        Comma separated string of valid sacct column names, in lower case.
        To be useful for the workflow, it MUST have columns "JOBID" and
        "STATE". The default list is
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
        Only used when Slurm is actually queried (dry_run_level <= 3); the
        simulated outputs always use the default column set.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    astropy.table.Table
        Table with upper-cased column names and one row per job returned by
        the query (or by the simulated output).
    """
    default_header = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
    if dry_run_level > 4:
        ## Level 5+: echo made-up job data so the parsing below is still exercised
        cmd_as_list = ['echo', get_mock_slurm_data()]
    elif dry_run_level > 3:
        ## Level 4: echo only a header, which yields an empty table
        cmd_as_list = ['echo', default_header]
    else:
        if user is None:
            user = os.environ.get('USER', 'desi')
        if start_time is None:
            start_time = '2020-04-26T00:00'
        if end_time is None:
            end_time = '2020-05-01T00:00'
        cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                       '-S', start_time,
                       '-E', end_time,
                       '-u', user,
                       f'--format={columns}']

    raw_output = subprocess.check_output(cmd_as_list, text=True,
                                         stderr=subprocess.STDOUT)
    queue_info_table = Table.read(raw_output, format='ascii.csv')

    ## The rest of the workflow expects upper case column names
    for name in queue_info_table.colnames:
        queue_info_table.rename_column(name, name.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table
def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+
                         'eligible,start,end,elapsed,state,exitcode',
                         dry_run_level=0, loglevel=None):
    """
    Queries the NERSC Slurm database using sacct to get information about
    specific jobs based on their job ids.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
        The placeholder values returned by get_err_qid() and
        get_default_qid() are dropped before querying.
    columns : str
        Comma separated string of valid sacct column names, in lower case.
        To be useful for the workflow, it MUST have columns "JOBID" and
        "STATE". The default list is
        jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode.
        Other options include: suspended,derivedexitcode,reason,priority.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
    loglevel : str, optional
        loglevel to use (DEBUG, INFO, WARNING, ERROR, CRITICAL)

    Returns
    -------
    astropy.table.Table
        Table with upper-cased column names and one row per job returned by
        the query (or by the simulated output). If no real QIDs remain, an
        empty table with the requested columns is returned.

    Raises
    ------
    RuntimeError
        If the command fails on every one of the three attempts.
    """
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger(level=loglevel)

    ## avoid default QID values
    placeholder_qids = [get_err_qid(), get_default_qid()]
    qids = qids[np.isin(qids, placeholder_qids, invert=True)]
    nqids = len(qids)

    if nqids == 0:
        return Table(names=columns.upper().split(','))

    ## If qids is too long, recursively call self on chunks and stack the
    ## tables; otherwise sacct hangs
    nmax = 100
    if nqids > nmax:
        chunk_tables = [
            queue_info_from_qids(qids[first:first+nmax], columns=columns,
                                 dry_run_level=dry_run_level, loglevel=loglevel)
            for first in range(0, nqids, nmax)
        ]
        return vstack(chunk_tables)

    ## Turn the queue id's into a comma separated string
    ## this should work with str or int type also, though not officially supported
    qid_str = ','.join(qids.astype(str)).replace(' ', '')

    cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,',
                   f'--format={columns}', '-j', qid_str]
    if dry_run_level > 4:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        ## All simulated jobs are reported as COMPLETED
        states = np.array(['COMPLETED'] * nqids)
        ## Try two different column configurations, otherwise give up trying
        ## to simulate and echo only the default header
        string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'
        requested = columns.lower()
        if requested == string.lower():
            mock_lines = [string]
            expids = 100000 + np.arange(nqids)
            for jobid, expid, state in zip(qids, expids, states):
                mock_lines.append(
                    f'{jobid},arc-20211102-{expid:08d}-a0123456789,realtime,'
                    f'2021-11-02T18:31:14,2021-11-02T18:36:33,'
                    f'2021-11-02T18:36:33,2021-11-02T18:48:32,'
                    f'00:11:59,{state},0:0')
            string = '\n'.join(mock_lines)
        elif requested == 'jobid,state':
            mock_lines = ['JobID,State']
            mock_lines.extend(f'{jobid},{state}'
                              for jobid, state in zip(qids, states))
            string = '\n'.join(mock_lines)
        # create command to run to exercise subprocess -> stdout parsing
        cmd_as_list = ['echo', string]
    elif dry_run_level > 3:
        ## header only, which yields an empty table
        cmd_as_list = ['echo', columns.lower()]
    else:
        log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}")

    #- sacct sometimes fails; try several times before giving up
    max_attempts = 3
    for _ in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(
                cmd_as_list, text=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            #- the name `err` is part of the logged text via {err.output=}
            log.error(f'{qid_str} job query via sacct failure at {datetime.datetime.now()}')
            log.error(f'{qid_str} {cmd_as_list}')
            log.error(f'{qid_str} {err.output=}')
        else:
            break
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{qid_str} job query via sacct failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    queue_info_table = Table.read(table_as_string, format='ascii.csv')
    ## The rest of the workflow expects upper case column names
    for name in queue_info_table.colnames:
        queue_info_table.rename_column(name, name.upper())

    ## Update the cached states of these jobids if we have that info to update
    update_queue_state_cache_from_table(queue_info_table)

    return queue_info_table
def get_queue_states_from_qids(qids, dry_run_level=0, use_cache=False, loglevel=None):
    """
    Returns the Slurm STATE of each requested job, either from the module's
    state cache or by querying the NERSC Slurm database with sacct.

    If use_cache is set and all qids have cached values from a previous
    query, those cached states are returned instead of querying Slurm.

    Parameters
    ----------
    qids : list or array of ints
        Slurm QID's at NERSC that you want to return information about.
        The placeholder values returned by get_err_qid() and
        get_default_qid() are ignored and never appear in the output.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
    use_cache : bool
        If True the code first looks for a cached status for the qids.
        If any is unavailable, then it queries Slurm for all of them.
        Default is False.
    loglevel : str, optional
        loglevel to use (DEBUG, INFO, WARNING, ERROR, CRITICAL)

    Returns
    -------
    dict
        Dictionary with the job ids (Python ints) as keys and the Slurm state
        of each job as values. Empty if no real QIDs were given.
    """
    global _cached_slurm_states
    err_qid, def_qid = get_err_qid(), get_default_qid()
    qids = np.atleast_1d(qids).astype(int)
    log = get_logger(level=loglevel)

    # Exclude placeholder QIDs from cache checks and Slurm queries.
    # These placeholders are never cached and are not included in the output.
    real_qids = qids[(qids != err_qid) & (qids != def_qid)]

    outdict = dict()
    if real_qids.size == 0:
        return outdict

    ## Only use cached values if all are cached, since the time is dominated
    ## by the call itself rather than the number of jobids, so we may as well
    ## get updated information from all of them if we're submitting a query anyway
    all_cached = use_cache and np.all(
        np.isin(real_qids, list(_cached_slurm_states.keys())))
    if all_cached:
        log.info(f"All Slurm real_qids={real_qids} are cached. Using cached values.")
        for qid in real_qids:
            ## Convert numpy integers to Python ints so that the keys have
            ## the same type as those produced by the query branch below
            key = int(qid)
            outdict[key] = _cached_slurm_states[key]
    else:
        outtable = queue_info_from_qids(real_qids, columns='jobid,state',
                                        dry_run_level=dry_run_level,
                                        loglevel=loglevel)
        for row in outtable:
            key = int(row['JOBID'])
            if key not in (err_qid, def_qid):
                outdict[key] = row['STATE']

    return outdict
def update_queue_state_cache_from_table(queue_info_table):
    """
    Updates the queue state cache with the state of every job listed in a
    table of Slurm job information.

    Parameters
    ----------
    queue_info_table : astropy.table.Table
        Table returned by an sacct query. Should contain at least JOBID and
        STATE columns; if either is missing, nothing is updated.

    Returns
    -------
    Nothing
    """
    ## Without both columns there is no (jobid, state) information to cache
    if 'JOBID' not in queue_info_table.colnames:
        return
    if 'STATE' not in queue_info_table.colnames:
        return

    for job in queue_info_table:
        update_queue_state_cache(qid=job['JOBID'], state=job['STATE'])
def update_queue_state_cache(qid, state):
    """
    Stores the supplied state for a single Slurm job id in the queue state
    cache, replacing any state cached earlier for that job id.

    Parameters
    ----------
    qid : int
        Slurm QID at NERSC. Converted with int() before being used as the
        cache key. The placeholder values returned by get_err_qid() and
        get_default_qid() are never cached.
    state : str
        The current job status of the Slurm jobid

    Returns
    -------
    Nothing
    """
    global _cached_slurm_states
    cache_key = int(qid)
    if cache_key in (get_err_qid(), get_default_qid()):
        ## placeholder QIDs don't correspond to real Slurm jobs
        return
    _cached_slurm_states[cache_key] = state
def clear_queue_state_cache():
    """
    Empties the queue state cache in place, removing every cached job state.

    Returns
    -------
    Nothing
    """
    ## The dict is mutated rather than rebound, so no `global` declaration is
    ## needed for this to act on the module-level cache
    _cached_slurm_states.clear()
def update_from_queue(ptable, qtable=None, dry_run_level=0, ignore_scriptnames=False,
                      check_complete_jobs=False):
    """
    Given an input processing table (ptable) and query table from the Slurm
    queue (qtable), cross matches the Slurm job ID's and updates the 'STATUS'
    in a copy of the processing table using the current state reported by
    the Slurm scheduler.

    Parameters
    ----------
    ptable : astropy.table.Table
        Processing table that contains the jobs you want updated with the
        most recent queue table. Must have at least columns 'LATEST_QID'
        and 'STATUS'.
    qtable : astropy.table.Table
        Table of Slurm job information with at least columns 'JOBID' and
        'STATE'. If None, Slurm is queried for the positive LATEST_QID
        values in ptable.
    dry_run_level : int
        Only used when qtable is None, in which case it is passed on to
        queue_info_from_qids. If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
    ignore_scriptnames : bool
        Default is False. Set to true if you do not want to check whether
        the scriptname matches the jobname returned by the Slurm scheduler.
        A mismatch only produces a warning; the status is updated regardless.
    check_complete_jobs : bool
        Default is False. Set to true if you want to also query QID's that
        currently have a STATUS "COMPLETED" in the ptable. Only used when
        qtable is None.

    Returns
    -------
    ptab : astropy.table.Table
        A copy of the input processing table in which the "STATUS" of each
        job found in the qtable is replaced by the first word of its 'STATE'
        (as matched by "LATEST_QID" in the ptable and "JOBID" in the qtable).
        If several rows share a LATEST_QID only the first one is updated.
    """
    log = get_logger()
    ptab = ptable.copy()

    if qtable is None:
        log.info("qtable not provided, querying Slurm using ptab's LATEST_QID set")
        ## Start from the rows with a positive (i.e. real) qid so that any
        ## non-positive placeholders are excluded from our query to Slurm
        sel = ptab['LATEST_QID'] > 0
        if np.any(~sel):
            log.warning(f"Some rows in the ptab have non-positive LATEST_QID values, which is unexpected but not an issue here since we'll exclude them. Number of such rows: {np.sum(~sel)}.")
        ## Only query incomplete jobs unless explicitly told to check them;
        ## completed jobs shouldn't change status
        if not check_complete_jobs:
            sel &= (ptab['STATUS'] != 'COMPLETED')
        log.info(f"Querying Slurm for {np.sum(sel)} QIDs from table of length {len(ptab)}.")
        qids = np.array(ptab['LATEST_QID'][sel])
        ## If you provide empty jobids Slurm gives you the three most recent
        ## jobs, which we don't want here
        if len(qids) == 0:
            log.info(f"No QIDs left to query. Returning the original table.")
            return ptab
        qtable = queue_info_from_qids(qids, dry_run_level=dry_run_level)

    log.info(f"Slurm returned information on {len(qtable)} jobs out of "
             +f"{len(ptab)} jobs in the ptab. Updating those now.")

    check_scriptname = (not ignore_scriptnames
                        and 'JOBNAME' in qtable.colnames
                        and 'SCRIPTNAME' in ptab.colnames)
    if check_scriptname:
        log.info("Will be verifying that the file names are consistent")

    placeholder_qids = (get_err_qid(), get_default_qid())
    for row in qtable:
        jobid = int(row['JOBID'])
        if jobid in placeholder_qids:
            continue
        match = (jobid == ptab['LATEST_QID'])
        if not np.any(match):
            continue
        ## Only the first matching row of the processing table is updated
        ind = np.where(match)[0][0]
        if check_scriptname and ptab['SCRIPTNAME'][ind] not in row['JOBNAME']:
            log.warning(f"For job with expids:{ptab['EXPID'][ind]}"
                        + f" the scriptname is {ptab['SCRIPTNAME'][ind]}"
                        + f" but the jobname in the queue was "
                        + f"{row['JOBNAME']}.")
        ## Keep only the first word of the state
        ## NOTE(review): presumably to reduce e.g. "CANCELLED by <uid>" to "CANCELLED" -- confirm
        state = str(row['STATE']).split(' ')[0]
        ## NOTE(review): the comment previously here said that states
        ## shouldn't be altered for dry run levels 1 and 2, but the
        ## assignment is unconditional -- confirm the intent
        ptab['STATUS'][ind] = state

    return ptab
def any_jobs_not_complete(statuses, termination_states=None):
    """
    Returns True if any of the given job statuses is not one of the
    termination states, i.e. at least one job is not in a state viewed as
    final (as opposed to job states that require resubmission).

    Parameters
    ----------
    statuses : Table.Column or list or np.array
        The statuses in the processing table "STATUS". Each element should
        be a string.
    termination_states : list or np.array
        Each element should be a string signifying a state that is returned
        by the Slurm scheduler that should be deemed a terminal state.
        Defaults to the list returned by get_termination_states().

    Returns
    -------
    bool
        True if any of the statuses of the jobs given in statuses are NOT a
        member of the termination states. Otherwise returns False, including
        when statuses is empty.
    """
    final_states = (get_termination_states() if termination_states is None
                    else termination_states)
    ## Explicit membership tests work with strings or bytes, unlike np.isin
    is_unfinished = [status not in final_states for status in statuses]
    return np.any(is_unfinished)
def any_jobs_need_resubmission(statuses, resub_states=None):
    """
    Returns True if any of the given job statuses is one of the
    resubmission states, i.e. at least one job should be resubmitted.

    Parameters
    ----------
    statuses : Table.Column or list or np.array
        The statuses in the processing table "STATUS". Each element should
        be a string.
    resub_states : list or np.array
        Each element should be a string signifying a state that should be
        considered failing or problematic and therefore resubmitted.
        Defaults to get_resubmission_states(no_resub_failed=False).

    Returns
    -------
    bool
        True if any of the statuses of the jobs given in statuses are a
        member of resub_states. Otherwise returns False, including when
        statuses is empty.
    """
    states_to_resubmit = (get_resubmission_states(no_resub_failed=False)
                          if resub_states is None else resub_states)
    ## Explicit membership tests work with strings or bytes, unlike np.isin
    needs_resubmission = [status in states_to_resubmit for status in statuses]
    return np.any(needs_resubmission)
def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0):
    """
    Queries Slurm using squeue to get information about the jobs that a user
    currently has in the queue.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for. The default is the value of the
        USER environment variable if it exists, otherwise 'desi'.
    include_scron : bool
        True if you want to include scron entries (rows whose PARTITION is
        'cron') in the returned table. Default is False.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    astropy.table.Table
        Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER,
        ST, TIME, NODES, NODELISTREASON for the specified user. Double
        quotes and parentheses are stripped from the squeue output before
        it is parsed, which is why the last column is not named
        NODELIST(REASON) and why e.g. "(Priority)" appears as "Priority".

    Raises
    ------
    RuntimeError
        If the command fails on every one of the three attempts.
    ValueError
        If any of the returned jobs belongs to a different user.
    """
    log = get_logger()
    if user is None:
        user = os.environ.get('USER', 'desi')

    cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"'
    ## NOTE: str.split() keeps the literal double quotes as part of the
    ## format argument; double quotes are stripped from the output below
    cmd_as_list = cmd.split()

    header = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)'
    if dry_run_level > 4:
        log.info("Dry run, would have otherwise queried Slurm with the"
                 +f" following: {' '.join(cmd_as_list)}")
        mock_rows = [
            f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)",
            f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)",
            f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)",
            f"29078887,gpu_ss11,(null),tilenight-20230413-24315,{user},PD,0:00,1,(Priority)",
            f"29078892,gpu_ss11,(null),tilenight-20230413-21158,{user},PD,0:00,1,(Priority)",
            f"29079325,gpu_ss11,(null),tilenight-20240309-24526,{user},PD,0:00,1,(Dependency)",
            f"29079322,gpu_ss11,(null),ztile-22959-thru20240309,{user},PD,0:00,1,(Dependency)",
            f"29078883,gpu_ss11,(null),tilenight-20230413-21187,{user},R,10:18,1,nid003960",
            f"29079242,regular_milan_ss11,(null),arc-20240309-00229483-a0123456789,{user},PD,0:00,3,(Priority)",
            f"29079246,regular_milan_ss11,(null),arc-20240309-00229484-a0123456789,{user},PD,0:00,3,(Priority)",
        ]
        ## One job per line; without the newlines the header and all of the
        ## rows would form a single malformed csv line with no data rows
        string = '\n'.join([header] + mock_rows)
        # create command to run to exercise subprocess -> stdout parsing
        cmd = 'echo ' + string
        cmd_as_list = ['echo', string]
    elif dry_run_level > 3:
        ## header only, which yields an empty table
        cmd = 'echo ' + header
        cmd_as_list = ['echo', header]
    else:
        log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}")

    #- the query sometimes fails; try several times before giving up
    max_attempts = 3
    for _ in range(max_attempts):
        try:
            table_as_string = subprocess.check_output(
                cmd_as_list, text=True, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            #- the name `err` is part of the logged text via {err.output=}
            log.error(f'{cmd} job query failure at {datetime.datetime.now()}')
            log.error(f'{cmd_as_list}')
            log.error(f'{err.output=}')
        else:
            break
    else:  #- for/else happens if loop doesn't succeed
        msg = f'{cmd} query failed {max_attempts} times; exiting'
        log.critical(msg)
        raise RuntimeError(msg)

    ## remove extra quotes that astropy table doesn't like
    table_as_string = table_as_string.replace('"', '')

    ## parentheses are also not very desirable
    table_as_string = table_as_string.replace('(', '').replace(')', '')

    ## remove node list with hyphen or comma otherwise it will break table reader
    table_as_string = re.sub(r"nid\[[0-9,-]*\]", "multiple nodes", table_as_string)

    try:
        queue_info_table = Table.read(table_as_string, format='ascii.csv')
    except Exception:
        ## Record what couldn't be parsed, then let the error propagate
        log.error("Table retured by squeue couldn't be parsed. The string was:")
        log.error(table_as_string)
        raise

    ## The rest of the workflow expects upper case column names
    for name in queue_info_table.colnames:
        queue_info_table.rename_column(name, name.upper())

    ## If the table is empty, return it immediately, otherwise perform
    ## sanity check and cuts
    if len(queue_info_table) == 0:
        return queue_info_table

    wrong_user = (queue_info_table['USER'] != user)
    if np.any(wrong_user):
        msg = f"Warning {np.sum(wrong_user)} " \
              + f"jobs returned were not {user=}\n" \
              + f"{queue_info_table['USER'][wrong_user]}"
        log.critical(msg)
        raise ValueError(msg)

    if not include_scron:
        queue_info_table = queue_info_table[queue_info_table['PARTITION'] != 'cron']

    return queue_info_table
def check_queue_count(user=None, include_scron=False, dry_run_level=0):
    """
    Counts the jobs that a user currently has in the Slurm queue, as
    reported by get_jobs_in_queue.

    Parameters
    ----------
    user : str
        NERSC user to query the jobs for
    include_scron : bool
        True if you want to include scron entries in the count. Default is
        False.
    dry_run_level : int
        If nonzero, this is a simulated run. Default is 0.
        0 which runs the code normally.
        1 writes all files but doesn't submit any jobs to Slurm.
        2 writes tables but doesn't write scripts or submit anything.
        3 Doesn't write or submit anything but queries Slurm normally for job status.
        4 Doesn't write, submit jobs, or query Slurm.
        5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.

    Returns
    -------
    int
        The number of jobs for that user in the queue (including or
        excluding scron entries depending on include_scron).
    """
    jobs_in_queue = get_jobs_in_queue(user=user, include_scron=include_scron,
                                      dry_run_level=dry_run_level)
    return len(jobs_in_queue)
def submit_script(scriptpath, sbatch_opts=None):
    """
    Submit script to queue and return the qid.

    Args:
        scriptpath (str): Path to the script to submit.

    Options:
        sbatch_opts (list, optional): List (or other sequence) of additional
            options to pass to sbatch.

    Returns:
        int: The qid of the submitted job.

    Raises:
        subprocess.CalledProcessError: if sbatch exits with a non-zero
            status. The command and its output are logged first.
        ValueError: if the output of sbatch can't be interpreted as a job id.

    This function automatically adds the sbatch --parsable flag to interpret
    the qid. The `sbatch_opts` argument is for any additional options like
    `--reservation ...`
    """
    extra_opts = [] if sbatch_opts is None else list(sbatch_opts)
    cmd = ['sbatch', '--parsable'] + extra_opts + [scriptpath]
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError as err:
        log = get_logger()
        log.error(f"Error submitting script {scriptpath} with command {' '.join(map(str, cmd))}")
        log.error(f"Error message: {err.output}")
        ## bare raise keeps the original traceback
        raise

    ## `sbatch --parsable` prints the job id, followed by ";<cluster name>"
    ## if a cluster name is present; only the job id is wanted here
    qid_text = output.strip().split(';', 1)[0]
    return int(qid_text)