# BSD 2-Clause License## Copyright (c) 2021-2024, Hewlett Packard Enterprise# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions are met:## 1. Redistributions of source code must retain the above copyright notice, this# list of conditions and the following disclaimer.## 2. Redistributions in binary form must reproduce the above copyright notice,# this list of conditions and the following disclaimer in the documentation# and/or other materials provided with the distribution.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.from__future__importannotationsimportdatetimeimportosimporttypingastfrom..errorimportSSUnsupportedErrorfrom..logimportget_loggerfrom.baseimportBatchSettings,RunSettingslogger=get_logger(__name__)
[docs]classSrunSettings(RunSettings):def__init__(self,exe:str,exe_args:t.Optional[t.Union[str,t.List[str]]]=None,run_args:t.Optional[t.Dict[str,t.Union[int,str,float,None]]]=None,env_vars:t.Optional[t.Dict[str,t.Optional[str]]]=None,alloc:t.Optional[str]=None,**kwargs:t.Any,)->None:"""Initialize run parameters for a slurm job with ``srun`` ``SrunSettings`` should only be used on Slurm based systems. If an allocation is specified, the instance receiving these run parameters will launch on that allocation. :param exe: executable to run :param exe_args: executable arguments :param run_args: srun arguments without dashes :param env_vars: environment variables for job :param alloc: allocation ID if running on existing alloc """super().__init__(exe,exe_args,run_command="srun",run_args=run_args,env_vars=env_vars,**kwargs,)self.alloc=allocself.mpmd:t.List[RunSettings]=[]reserved_run_args={"chdir","D"}
[docs]defset_nodes(self,nodes:int)->None:"""Set the number of nodes Effectively this is setting: ``srun --nodes <num_nodes>`` :param nodes: number of nodes to run with """self.run_args["nodes"]=int(nodes)
[docs]defmake_mpmd(self,settings:RunSettings)->None:"""Make a mpmd workload by combining two ``srun`` commands This connects the two settings to be executed with a single Model instance :param settings: SrunSettings instance """ifself.colocated_db_settings:raiseSSUnsupportedError("Colocated models cannot be run as a mpmd workload")ifself.container:raiseSSUnsupportedError("Containerized MPMD workloads are not yet supported.")ifos.getenv("SLURM_HET_SIZE")isnotNone:raiseValueError("Slurm does not support MPMD workloads in heterogeneous jobs.")self.mpmd.append(settings)
[docs]defset_hostlist(self,host_list:t.Union[str,t.List[str]])->None:"""Specify the hostlist for this job This sets ``--nodelist`` :param host_list: hosts to launch on :raises TypeError: if not str or list of str """ifisinstance(host_list,str):host_list=[host_list.strip()]ifnotisinstance(host_list,list):raiseTypeError("host_list argument must be a list of strings")ifnotall(isinstance(host,str)forhostinhost_list):raiseTypeError("host_list argument must be list of strings")self.run_args["nodelist"]=",".join(host_list)
[docs]defset_hostlist_from_file(self,file_path:str)->None:"""Use the contents of a file to set the node list This sets ``--nodefile`` :param file_path: Path to the hostlist file """self.run_args["nodefile"]=file_path
[docs]defset_excluded_hosts(self,host_list:t.Union[str,t.List[str]])->None:"""Specify a list of hosts to exclude for launching this job :param host_list: hosts to exclude :raises TypeError: """ifisinstance(host_list,str):host_list=[host_list.strip()]ifnotisinstance(host_list,list):raiseTypeError("host_list argument must be a list of strings")ifnotall(isinstance(host,str)forhostinhost_list):raiseTypeError("host_list argument must be list of strings")self.run_args["exclude"]=",".join(host_list)
[docs]defset_cpus_per_task(self,cpus_per_task:int)->None:"""Set the number of cpus to use per task This sets ``--cpus-per-task`` :param num_cpus: number of cpus to use per task """self.run_args["cpus-per-task"]=int(cpus_per_task)
[docs]defset_tasks(self,tasks:int)->None:"""Set the number of tasks for this job This sets ``--ntasks`` :param tasks: number of tasks """self.run_args["ntasks"]=int(tasks)
[docs]defset_tasks_per_node(self,tasks_per_node:int)->None:"""Set the number of tasks for this job This sets ``--ntasks-per-node`` :param tasks_per_node: number of tasks per node """self.run_args["ntasks-per-node"]=int(tasks_per_node)
[docs]defset_cpu_bindings(self,bindings:t.Union[int,t.List[int]])->None:"""Bind by setting CPU masks on tasks This sets ``--cpu-bind`` using the ``map_cpu:<list>`` option :param bindings: List specifing the cores to which MPI processes are bound """ifisinstance(bindings,int):bindings=[bindings]self.run_args["cpu_bind"]="map_cpu:"+",".join(str(int(num))fornuminbindings)
[docs]defset_memory_per_node(self,memory_per_node:int)->None:"""Specify the real memory required per node This sets ``--mem`` in megabytes :param memory_per_node: Amount of memory per node in megabytes """self.run_args["mem"]=f"{int(memory_per_node)}M"
[docs]defset_verbose_launch(self,verbose:bool)->None:"""Set the job to run in verbose mode This sets ``--verbose`` :param verbose: Whether the job should be run verbosely """ifverbose:self.run_args["verbose"]=Noneelse:self.run_args.pop("verbose",None)
[docs]defset_quiet_launch(self,quiet:bool)->None:"""Set the job to run in quiet mode This sets ``--quiet`` :param quiet: Whether the job should be run quietly """ifquiet:self.run_args["quiet"]=Noneelse:self.run_args.pop("quiet",None)
[docs]defset_broadcast(self,dest_path:t.Optional[str]=None)->None:"""Copy executable file to allocated compute nodes This sets ``--bcast`` :param dest_path: Path to copy an executable file """self.run_args["bcast"]=dest_path
[docs]defset_node_feature(self,feature_list:t.Union[str,t.List[str]])->None:"""Specify the node feature for this job This sets ``-C`` :param feature_list: node feature to launch on :raises TypeError: if not str or list of str """ifisinstance(feature_list,str):feature_list=[feature_list.strip()]elifnotall(isinstance(feature,str)forfeatureinfeature_list):raiseTypeError("node_feature argument must be string or list of strings")self.run_args["C"]=",".join(feature_list)
@staticmethoddef_fmt_walltime(hours:int,minutes:int,seconds:int)->str:"""Convert hours, minutes, and seconds into valid walltime format Converts time to format HH:MM:SS :param hours: number of hours to run job :param minutes: number of minutes to run job :param seconds: number of seconds to run job :returns: Formatted walltime """returnfmt_walltime(hours,minutes,seconds)
[docs]defset_walltime(self,walltime:str)->None:"""Set the walltime of the job format = "HH:MM:SS" :param walltime: wall time """self.run_args["time"]=str(walltime)
[docs]defset_het_group(self,het_group:t.Iterable[int])->None:"""Set the heterogeneous group for this job this sets `--het-group` :param het_group: list of heterogeneous groups """het_size_env=os.getenv("SLURM_HET_SIZE")ifhet_size_envisNone:msg="Requested to set het group, but the allocation is not a het job"raiseValueError(msg)het_size=int(het_size_env)ifself.mpmd:msg="Slurm does not support MPMD workloads in heterogeneous jobs\n"raiseValueError(msg)msg=("Support for heterogeneous groups is an experimental feature, ""please report any unexpected behavior to SmartSim developers ""by opening an issue on https://github.com/CrayLabs/SmartSim/issues")ifany(group>=het_sizeforgroupinhet_group):msg=(f"Het group {max(het_group)} requested, "f"but max het group in allocation is {het_size-1}")raiseValueError(msg)logger.warning(msg)self.run_args["het-group"]=",".join(str(group)forgroupinhet_group)
[docs]defformat_run_args(self)->t.List[str]:"""Return a list of slurm formatted run arguments :return: list of slurm arguments for these settings """# add additional slurm arguments based on key lengthopts=[]foropt,valueinself.run_args.items():short_arg=bool(len(str(opt))==1)prefix="-"ifshort_argelse"--"ifnotvalue:opts+=[prefix+opt]else:ifshort_arg:opts+=[prefix+opt,str(value)]else:opts+=["=".join((prefix+opt,str(value)))]returnopts
[docs]defcheck_env_vars(self)->None:"""Warn a user trying to set a variable which is set in the environment Given Slurm's env var precedence, trying to export a variable which is already present in the environment will not work. """fork,vinself.env_vars.items():if","notinstr(v):# If a variable is defined, it will take precedence over --export# we warn the userpreexisting_var=os.environ.get(k,None)ifpreexisting_varisnotNoneandpreexisting_var!=v:msg=(f"Variable {k} is set to {preexisting_var} in current ""environment. If the job is running in an interactive "f"allocation, the value {v} will not be set. Please ""consider removing the variable from the environment ""and re-running the experiment.")logger.warning(msg)
[docs]defformat_env_vars(self)->t.List[str]:"""Build bash compatible environment variable string for Slurm :returns: the formatted string of environment variables """self.check_env_vars()return[f"{k}={v}"fork,vinself.env_vars.items()if","notinstr(v)]
[docs]defformat_comma_sep_env_vars(self)->t.Tuple[str,t.List[str]]:"""Build environment variable string for Slurm Slurm takes exports in comma separated lists the list starts with all as to not disturb the rest of the environment for more information on this, see the slurm documentation for srun :returns: the formatted string of environment variables """self.check_env_vars()exportable_env,compound_env,key_only=[],[],[]fork,vinself.env_vars.items():kvp=f"{k}={v}"if","instr(v):key_only.append(k)compound_env.append(kvp)else:exportable_env.append(kvp)# Append keys to exportable KVPs, e.g. `--export x1=v1,KO1,KO2`fmt_exported_env=",".join(vforvinexportable_env+key_only)formpmdinself.mpmd:compound_mpmd_env={k:vfork,vinmpmd.env_vars.items()if","instr(v)}compound_mpmd_fmt={f"{k}={v}"fork,vincompound_mpmd_env.items()}compound_env.extend(compound_mpmd_fmt)returnfmt_exported_env,compound_env
deffmt_walltime(hours:int,minutes:int,seconds:int)->str:"""Helper function walltime format conversion Converts time to format HH:MM:SS :param hours: number of hours to run job :param minutes: number of minutes to run job :param seconds: number of seconds to run job :returns: Formatted walltime """delta=datetime.timedelta(hours=hours,minutes=minutes,seconds=seconds)fmt_str=str(delta)ifdelta.seconds//3600<10:fmt_str="0"+fmt_strreturnfmt_str
[docs]classSbatchSettings(BatchSettings):def__init__(self,nodes:t.Optional[int]=None,time:str="",account:t.Optional[str]=None,batch_args:t.Optional[t.Dict[str,t.Optional[str]]]=None,**kwargs:t.Any,)->None:"""Specify run parameters for a Slurm batch job Slurm `sbatch` arguments can be written into ``batch_args`` as a dictionary. e.g. {'ntasks': 1} If the argument doesn't have a parameter, put `None` as the value. e.g. {'exclusive': None} Initialization values provided (nodes, time, account) will overwrite the same arguments in ``batch_args`` if present :param nodes: number of nodes :param time: walltime for job, e.g. "10:00:00" for 10 hours :param account: account for job :param batch_args: extra batch arguments """super().__init__("sbatch",batch_args=batch_args,nodes=nodes,account=account,time=time,**kwargs,)
[docs]defset_walltime(self,walltime:str)->None:"""Set the walltime of the job format = "HH:MM:SS" :param walltime: wall time """# TODO check for formatting hereifwalltime:self.batch_args["time"]=walltime
[docs]defset_nodes(self,num_nodes:int)->None:"""Set the number of nodes for this batch job :param num_nodes: number of nodes """ifnum_nodes:self.batch_args["nodes"]=str(int(num_nodes))
[docs]defset_account(self,account:str)->None:"""Set the account for this batch job :param account: account id """ifaccount:self.batch_args["account"]=account
[docs]defset_partition(self,partition:str)->None:"""Set the partition for the batch job :param partition: partition name """self.batch_args["partition"]=str(partition)
[docs]defset_queue(self,queue:str)->None:"""alias for set_partition Sets the partition for the slurm batch job :param queue: the partition to run the batch job on """ifqueue:self.set_partition(queue)
[docs]defset_cpus_per_task(self,cpus_per_task:int)->None:"""Set the number of cpus to use per task This sets ``--cpus-per-task`` :param num_cpus: number of cpus to use per task """self.batch_args["cpus-per-task"]=str(int(cpus_per_task))
[docs]defset_hostlist(self,host_list:t.Union[str,t.List[str]])->None:"""Specify the hostlist for this job :param host_list: hosts to launch on :raises TypeError: if not str or list of str """ifisinstance(host_list,str):host_list=[host_list.strip()]ifnotisinstance(host_list,list):raiseTypeError("host_list argument must be a list of strings")ifnotall(isinstance(host,str)forhostinhost_list):raiseTypeError("host_list argument must be list of strings")self.batch_args["nodelist"]=",".join(host_list)
[docs]defformat_batch_args(self)->t.List[str]:"""Get the formatted batch arguments for a preview :return: batch arguments for Sbatch """opts=[]# TODO add restricted hereforopt,valueinself.batch_args.items():# attach "-" prefix if argument is 1 character otherwise "--"short_arg=bool(len(str(opt))==1)prefix="-"ifshort_argelse"--"ifnotvalue:opts+=[prefix+opt]else:ifshort_arg:opts+=[prefix+opt,str(value)]else:opts+=["=".join((prefix+opt,str(value)))]returnopts