Spark resource management
Spark jobs run on a shared processing cluster where YARN manages resources such as CPU and memory. YARN allocates these resources among all running jobs based on the resource parameters each job requests (such as executor memory and cores). The resources are organized into queues, each with a minimum guaranteed share of the cluster’s total CPU and memory. By default, jobs are placed in the ‘default’ queue, though other queues are also available.
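A specific queue can be selected at submission time with the --queue option of spark-submit, for example (the queue name and script name below are placeholders):
spark-submit --master yarn --queue my_team_queue my_job.py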
Memory allocation
Users can specify the memory allocated to each executor with the --executor-memory option, e.g.
--executor-memory 1G
During PySpark job execution, most of the memory is consumed off-heap, since the Python worker processes run outside the JVM heap. It is therefore advisable to specify the memory overhead explicitly, e.g.
--conf spark.executor.memoryOverhead=4000
(the value is interpreted as MiB unless a unit suffix is given).
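Both settings can be combined in a single submission, for example (the script name is a placeholder):
spark-submit \
  --executor-memory 1G \
  --conf spark.executor.memoryOverhead=4000 \
  my_job.py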
It is advisable to limit an application’s memory requirements so that jobs are easier to schedule. This can be achieved by partitioning the input data into smaller blocks.
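As a minimal PySpark sketch (the file paths and partition count are placeholders), the input can be split into more, and therefore smaller, partitions before processing:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-sketch").getOrCreate()

# Read the input and split it into more, smaller partitions so each task
# holds less data in memory at a time.
df = spark.read.parquet("/path/to/input")   # placeholder path
df = df.repartition(200)                    # placeholder partition count
df.write.parquet("/path/to/output")         # placeholder path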
Number of parallel jobs
Spark can dynamically scale the number of executors, and therefore the number of tasks processed in parallel, based on the workload. This dynamic allocation can be enabled with the following configurations:
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true
Optionally, upper and lower bounds can be set:
--conf spark.dynamicAllocation.maxExecutors=30
--conf spark.dynamicAllocation.minExecutors=10
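Putting these options together, a submission with dynamic allocation enabled could look as follows (the bounds and script name are illustrative):
spark-submit \
  --master yarn \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  my_job.py
Note that the external shuffle service must be running on the cluster’s worker nodes for spark.shuffle.service.enabled=true to take effect.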
If a fixed number of executors is preferred, use:
--num-executors 10
However, this approach is not recommended as it limits the cluster resource manager’s ability to optimize resource allocation. Executors allocated in this manner remain reserved even when tasks are not actively processed.
Using multiple cores per task
By default, each Spark executor utilizes one CPU core. To accommodate tasks requiring multiple cores, increase the --executor-cores parameter and set the spark.task.cpus option accordingly. For tasks needing two executor cores:
spark-submit ... --executor-cores 2 --conf spark.task.cpus=2 ...
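As a minimal sketch of a task that actually uses two cores (the data and per-partition function below are illustrative), a per-partition function can run an internal pool of two worker threads, which is what spark.task.cpus=2 reserves the cores for:
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("multicore-task-sketch").getOrCreate()

def process_partition(rows):
    # Illustrative per-partition function that runs two worker threads;
    # spark.task.cpus=2 reserves two cores for each such task.
    with ThreadPoolExecutor(max_workers=2) as pool:
        yield from pool.map(lambda x: x * 2, rows)

rdd = spark.sparkContext.parallelize(range(100), numSlices=4)
print(rdd.mapPartitions(process_partition).collect())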
Spark blacklisting
If tasks consistently fail on the same node(s), enabling Spark blacklisting can be beneficial. This feature temporarily excludes a worker node (default duration is one hour). To activate blacklisting, use --conf spark.blacklist.enabled=true. Further blacklisting options are detailed in the Spark documentation.
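For example, blacklisting can be enabled together with a shorter exclusion period (the timeout value and script name are illustrative; spark.blacklist.timeout controls how long a node stays excluded):
spark-submit \
  --conf spark.blacklist.enabled=true \
  --conf spark.blacklist.timeout=30min \
  my_job.py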
It’s important to note that Spark blacklisting does not address failures where the YARN container containing the Spark task cannot start. In such cases, Spark cannot detect the failure and, therefore, cannot blacklist the node. This issue is acknowledged as a known problem in the current Spark version (2.3.2).