General Considerations to Job-/Task-Farming
Purpose of the Documentation Page
It is intended to give an educational introduction on the subject – a general idea and an overview. For specific technical questions, please consult the respectively dedicated docu pages.
What do we mean by Job-/Task-Farming? (When to use job farming?)
The common HPC cluster job approach is to place one workflow into one job. Resource managers like Slurm provide already the "job" as the unit for a work item. Users can submit one or more jobs, which are then independently scheduled on the available resources, which are maintained by the resource manager and scheduler (Slurm).
However, on many HPC centers, so also at the LRZ, cluster resources are available as full compute nodes - exclusive to the current user job. As modern compute nodes contain a rather large parallel capacity (many CPUs), and large amounts of memory, some software or application cases may not be capable to fill them reasonably – either because the cases are too small, or the software is not able to scale (i.e. to reasonably exploit more parallel resources efficiently). That represents sort of waste, because it causes idling resources.
In classical HPC, there are two approaches to increase the resource usage efficiency:
- optimization of software (detecting possibilities for parallelism, identification and removal of bottlenecks that prevent parallelism, ...)
- bundle many smaller none- or only weakly-parallelizable tasks – but then MANY of them -- into a job-/task-farming job. (These tasks, we then call sometimes "sub-jobs" to distinguish them from the containing larger job.)
This second approach, we usually refer to as task-/job-farming jobs.
Now, because a containing (Slurm) job allocates some resources (say, a number of compute nodes) for a certain period of time (the Slurm job's wall-clock time), there is a difficulty placed on the user:
- The user is now responsible for the distribution of the single tasks/sub-jobs to the resources available – possibly as efficient as possible.
- The user is responsible to respect some dependencies between these tasks (one task may only run after the successful execution of another one)
- The user must do bookkeeping over the set of tasks/sub-jobs – i.e. keep track of what did and what did not yet run successfully.
The first one can be answered by using dedicated tools (srun within Slurm, Flux-Framework, etc. ... sort of resource manager and scheduler within a Slurm job).
For the second point, there exist also already a list of possible tools (workflow tools - like radical cybertools, nextflow, ... ).
These workflow tools answer in part also point 3. What is meant is that the covering overall job might stop (timeout or premature abort due to some system error) before all tasks could run and finish.
So, a normal HPC user has a single or very few workflows, but with large request for parallel resources (CPUs, memory). A job farmer has many (possibly the same) workflows, on many/large amount of data. And therefore requires large(r) amount of parallel resources.
At times, there are certainly mixtures of both.
Which categories of Job-/Task-Farming exist (at least)?
The word "category" is maybe not fully suitable. What we mean is that depending on the characteristics of the single tasks, one can find simplified solution patterns for realizing/implementing task-farming workflows.
The normal/classical HPC Case
For instance, a single parallel MPI program on 2 compute nodes with say 20 CPUs per core runs for a period of time. One can usually assess from simple test, e.g. haw many time-steps in a CFD simulation are required until convergence is reached. This assessment plus some checkpointing/restart strategy (every now and again, the current simulation status is written to disk, from where one can start another simulation later) plus some safety margin (say 10% of the time assessment) is then used as the Slurm job's wall-clock time specification. The two nodes were chosen because some test before showed that 2 nodes contain enough RAM (memory) to contain our CFD case. Furthermore, we could see that for 40 CPUs we have substantial speedup (acceleration) in comparison to a serial (1 CPU) simulation. On the other hand, using more than 40 CPUs (2 nodes) does not significantly improve speedup over 2 nodes. So using more nodes doesn't help more (parallel efficiency drops below 70%, say ... see below; and Amdahl's Law).
These are the considerations, a normal HPC user with a sufficiently parallel tool/software approaches a computer center cluster. One workflow into one job.
The Pipeline Job-Farm Case (MPMD)
a) No scheduling: Let us assume there are not all too many sub-jobs. But all only moderately parallel (say 4 CPUs maximum), and run rather long – of the order of the containing farming-job (the Slurm job). Here, the challenge is then to run several of them in parallel. Using srun
(Slurm) that could look as follows:
Here, we already assumed that the software in the respective tasks are MPI parallel programs. We call this the "srun & wait
idiom".
If they are "only" thread-parallel programs (by example here with OpenMP), then an example looks as follows:
It can now be anticipated how it would look if the tasks are hybrid (MPI parallel + thread-parallel per MPI rank). If subsequent calls to srun would require more CPUs (resources) than are available in the containing job allocation, srun then only creates a so-called "step", which is scheduled, and waits, until resources are free again (i.e. another former step finishes). But here, we would consider that sort of a mis-configuration of the farming job setup when pipelining is desired (for the more general case, see below).
The wait
at the end prevents that the containing job just finishes, because the ampersand (&) at the and of each srun
call places that call in the background, and returns to bash script execution.
The peculiarities of Slurm are but not topic of this rather generic job-farming documentation page. Please, refer to the SchedMD Slurm documentation!
Instead of different programs (prog1
, prog2
, ... ) it could also be the same program – but with different command-line parameters.
If only one node is involved, one can also omit the srun
, and start the programs directly. We also assume here that the programs somehow output to some unique output file. Since most programs however also print to stdout/stderr, a redirection is recommended for clear separation of task output. (prog1 &> log.1
, for instance)
When the task software is only thread-parallel, also MPI itself can be used – and often much more easier and stably, and less error-prone. Here an example:
Here is less danger to try to acquire more CPUs than announced in the Slurm header. However, output redirection and separation are more difficult. That can be achieved by replacing the direct program call by a thin bash wrapper.
Each of the srun calls represents a slot (each rank in the mpiexec
case, when the tasks are thread-only), if it is sort of static. By that we mean that it runs (at least almost) the full period of the containing job's runtime.
b) Static Scheduling: Let us assume now that we have significantly more tasks than parallel slots/ranks. When we still can assume that the execution of each task requires the same resources, and the tasks run independently of each other, we can use the pipeline setup above, and distribute the single tasks to the slots before the job submission (i.e. not just at job runtime). This could be realized, for instance, by using single bash/python/... scripts which each handle one of the slots, in place of prog1
, prog2
, ... above.
Of course, static scheduling may have a very dramatic disadvantage when one slot/pipeline finishes much earlier than the others. So, it is in the responsibility of the user to well-balance the pipelines. If the task's run-times only slightly differ, statistics may help (as there are then as many shorter and longer tasks hopefully placed in each pipeline such that the mean runtime per task and slot is similar).
c) Dynamic Scheduling: If the assumptions in b) are not met, and the tasks's run-times are very different (or even bi- or more-modular), dynamic (i.e. run-time) scheduling may help. Unfortunately, that would require a communication among the pipelines (workload distribution/balancing or work stealing). Maybe, in such cases the general workflow is simpler ... paired with the use of the right tool. Julia's pmap scheduler is capable of doing simple dynamic scheduling within a setup with fixed pipelines (a single srun
call similar to mpiexec
). But for thread-only tasks only.
So, the pipeline concept assumes some regularity concerning the single tasks resource requirements (runtime, memory consumption, CPUs), which simplifies the workflow setup.
The general Case
In general, the single tasks may have quite different resource requirements. And may furthermore be dependent of each other. In both cases, workflow tools are most probably advisable.
Different individual requirements of tasks concerning run-time, number of CPUs (parallelism; MPI, OpenMP, ...), and amount of memory requires some system that keeps track of these resources, and tries to optimize their usage by some sophisticated scheduling strategy. Slurm is doing that. Flux is supposed to do it, too.
Elaborate task dependencies up to complicated task-dependencies trees (DAG == directed acyclic graph) places even a stronger constraint on the task execution, and can – in non-ideal circumstances – even lead to complete serialization of part of the task tree. For this, definitely only workflow-tools are recommended, because handling that complexity by other means is most probably foredoomed.
However, in order to use these tools efficiently on the LRZ cluster systems, they must be capable of interfacing e.g. Slurm from within a Slurm job (so, using srun
, and not sbatch
), or interface e.g. flux
(not flux mini
), or implement own task distributing strategies (via ssh
or mpiexec
e.g. which then requires that they do resource management and scheduling by themselves).
How to I implement a Job-/Task-Farming workflow?
Preparation
By example, we pick out the more common case (e.g. like for uncertainty quantification, or Monte Carlo simulations) of many independent tasks with individually having very similar resource requirements – almost equal run-times, CPU parallelism (MPI/OpenMP/hybrid), and memory.
Let us assume, the tasks require an approximate runtime T_t. This can be determined by running few tasks (say 10 ... ) beforehand, measure the individual run-times, and form the arithmetic mean. Here, we can also measure the memory consumption of each task. Should it be that the memory consumption already exceed the memory of a node, search of a bigger node. Or, if the task can be parallelized via MPI (distributed memory parallelism), which mitigates the per-node memory requirements, increase the per-task parallelism (more ranks ... ). If none of these two options is viable, you are screwed.
Memory:
- If the memory consumption per task is smaller (and here not just in average!!) than a node's total memory divided by the number of available CPUs on that node times the number of CPUs used by the task, then you can use all CPUs on the node. (For threaded programs as tasks there might be limitations when the total number of CPUs per node is not an integer multiple of the number of CPUs used by the task. With MPI/distributed memory parallelism, somewhat more freedom is given.)
- If the memory consumption per task is larger than a node's total memory divided by the number of available CPUs on that node times the number of CPUs used by the task, then you can stark only as many tasks in parallel on that node as corresponds to the ratio (total amount of node memory available divided by the task's used memory)
- If you are on a par – i.e. some tasks may need more memory than their allowed share – you may still use all CPUs. But at the risk that one task may run into out-of-memory kill if the node runs out of available memory. So, statics may help here as well.
Planning Number Crunching:
Next question is, how many nodes should I use for the later production run? That is usually quite simple. HPC centers impose both run-time, T_L, and maximum node, N_L, limits per (Slurm) job. So, the question is then how fast do you want to finish under these constraints?
Let us assume, you have N tasks. N rather large. Then the serial run-time (i.e. the serial subsequent execution of all tasks) would take the time T=NT_t. Let us assume that is way larger than the imposed (Slurm) job's run-time limit T_L. So, we we could ask: "How many parallel workers (slots)" would I need to be smaller or equal to T_L? Answer: M = NT_t/T_L.
Now, if M \le N_L we are fine. Otherwise, we have to distribute our tasks over several farming jobs.
Is there a lower limit? Yes. So-to-speak. T_t about (actually the maximum task run-time). In that case M = N. And if that is smaller than the node limit per job, that is essentially the pipeline scenario above.
So, how many nodes to chose? Here, you are rather free – as long as you stay within the imposed limits. In many cases, this freedom should be used to maximize the node resources. We mentioned above that the tasks may need a number of CPUs (or memory) which does not a nodes given resources. For MPI-parallel tasks, one could then take as many nodes as necessary in order that M parallel tasks can fill all CPUs – without the necessity to leave some idle.
For thread-only parallel tasks, this is not possible. If there are then k CPUs idle per node, and M nodes used, and the total (Slurm) job run-time about NT_t/M, then we simply waste kMNT_t/M = kNT_t CPU-hours. Inevitably. We could at most use as many threads per task such that an integer number of tasks can run in parallel on a node, and occupies all CPUs. Even if this means that the tasks run with a very bad parallel efficiency.
In the extreme case of 1-CPU-only tasks with each but large memory requirements, which – also in an extreme case – require all node's memory, then all CPUs except for one are idling all of the time. It is clear that this should be avoided if possible!
Tasks's parallelism:
Many users use tools they have not developed themselves. Many may already support some parallelism. So, what can one do for optimization? (If you are a developer, code optimization is mandatory! But even if you are not, the software you use may still be available as source code. And users still can do optimization during compilation – compiler optimization -O3 or even vectorization. But please carefully test that this does not spoil the program's correctness!)
Neglecting the memory intricacy mentioned above, one can simply try a strong scaling test. Be the serial run-time of a task T_1. Let us try to run it with n parallel units (threads/MPI ranks), we measure a run-time of T_n. The speed-up factor is then defined by S = T_1/T_n. And the parallel efficiency, we define as \varepsilon_n = 100\% \cdot (T_1/n) / T_n. That is – in short words – just the comparison of the in fact measured run-time, T_n, with the prognostic one assuming perfect scaling, T_1/n.
As most programs possess some bottlenecks which prevent in part parallelism, this efficiency is in most cases smaller than 100% (cache effects can but matter and change that). So, with increasing n, this serializing part may matter more and more, and decrease the parallel efficiency. It is now a question of a decision about a criterion, which efficiency is still acceptable. We usually impose that less than 70% are too wasteful to use more than so many parallel resources — as a corner number.
But as we mentioned above ... it might be nonetheless helpful – as long as a speed-up is still present – to use more CPUs in a task, when this makes a better CPU exploit of a node.
Production Run
With the preparations above, one can now decide which (Slurm) job parameters to use (wall-clock run-time, number of nodes, number of sub-jobs per node i.e. slots or number of tasks running parallel on a node, ... ), and also decide on the strategy and tool selection to process all tasks.
This should incorporate also considerations about the mentioned bookkeeping, and restart strategies. The former is there to know which progress we do during the farming job. It is also necessary to have some criterion whether a task ran reasonably or not! You cannot just trust that everything will go well!!
The latter (restart strategy) is necessary if you job failed. Of course, you should not (and usually don't want to) repeat all tasks that already ran successfully. So, bookkeeping must be done in fault-tolerant way such that you can restart the the containing (Slurm) job, and it processes only those tasks that are not finished, yet.
What if for some reason only 99% or 98% or so of the tasks run that way successfully? Perfect!! Please, do not assume job farming to be there for perfection!! What you want to achieve is mass production. The remaining rest can be most probably done in a post-processing ... possibly again on your local PC/laptop. The vast HPC resources were needed mostly because of the huge amount of tasks. But having process the vast majority of them already brought you close enough to your goal!
Debugging: What happens if something goes wrong? That means if a high percentage of tasks fails with (or without) some errors.
Well. The best thing you can do is to take such a failed task and scrutinize it separately. If it fails because of an internal error, you probably have to modify/adapt your input parameters for it or so. But if then runs, external issues are very probable. The first thing to check is the memory consumption on a node. As mentioned above, several tasks running in parallel may compete for memory. If that can be excluded ... restart the farming job once more. If the problem persists, ask the Service Desk.
Workflow Tools
There are many workflow tools out there, and we won't go much into detail, or even list them. But what we can place here is a short list of requirements such tools should fulfill.
Such tools should work within a Slurm-Job with the following characteristics:
- task distribution via srun (or flux, mpiexec, ...), when multi-node; otherwise local
- number of srun-step generation limited (to avoid overload of Slurm managers); better use alternatives like flux-framework, or e.g. GNU parallel to limit srun
- checkpointing-mechanism must be implemented; i.e. at job abort/timeout the same job can be restarted, and it continues with the unprocessed tasks (aka bookkeeping; but here must be fail-safe)
Explanations:
On our clusters, users get compute nodes mostly exclusively. That is much too coarse as granularity for many task-farming workflows with essentially only little parallelizing tasks (if not completely serial). Our Slurm configuration at least does not provide a shared node configuration on the vast amount of cluster resources. In order to efficiently exploit the broad parallel resources of a compute node, one needs to fill it with many tasks. This is it, what also accelerates these task-farming production runs.
For multi-node jobs, srun (or similar) must be used to distribute the tasks on the different nodes in the Slurm job allocation. srun and flux have the advantage of doing that in a rather automatic way, leaving the user mostly with focusing on the task workflows themselves. mpiexec also allows for such a distribution. But in order to place the tasks to the correct nodes and CPUs, both hostlists and CPU masks must be used. Doing such things explicitly can be very tedious.
Many workflow tools unfortunately work only on the level of sbatch
(when they support Slurm). Compute nodes are but no submit hosts on our clusters. This simply doesn't work!
The restriction of srun-steps, and especially not bluntly creating srun steps in the background (srun & wait
idiom) in a for loop, is important in order not to overwhelm the Slurm managers. That's because Slurm will handle each srun call as sort of a job submission as well, and tries to schedule these tasks as if they were normal Slurm jobs. That puts extra load on the Slurm-scheduler. Using flux-framework, you can completely circumvent that issue. If you prefer srun, you can use GNU parallel to limit the job steps aktive at a time.
By "checkpointing/restarting" we mean sort of a "statefulness" of the task-farming's progress in order to avoid repetition of tasks and work progression from an intermediate state after a job stopped working, and was restarted. Jobs can run into wallclock timeout, or may plain suffer from a node failure. "checkpointing/restarting" is a method of increasing efficiency resilience of jobs. This can be complemented by fail-safeness considerations, for which task-farming jobs might be very suitable. That is, a multi-node farming job can continue usually even if one node failed ... then just with less resources. The failed and unscheduled/unprocesses tasks on that node can be redistributed to the remaining nodes.