Spark best practices
When working with Spark on the Hadoop cluster, there are a few things you should be aware of. Below you can find some best practices that will help you integrate your code properly.
SparkContext initialization
In your Spark driver, it is best practice to initialize the SparkContext first, before doing other logic such as collecting the inputs that need to be processed. Otherwise you can run into a timeout because creating the SparkContext takes too long.
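As an illustration, a minimal sketch of the recommended ordering is shown below; collect_inputs() is a hypothetical placeholder for whatever logic gathers your work items:

from pyspark import SparkContext

def collect_inputs():
    # hypothetical placeholder for the (potentially slow) logic
    # that gathers the inputs to be processed
    return list(range(50))

# initialize the SparkContext first, before any other driver logic
sc = SparkContext()

# only afterwards collect the inputs and build the RDD
inputs = collect_inputs()
rdd = sc.parallelize(inputs, len(inputs))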
Sample exception in case SparkContext creation times out:
20/10/21 11:01:49 ERROR ApplicationMaster: Uncaught exception:
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:498)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:815)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
If the suggested approach doesn’t help, you can also increase spark.yarn.am.waitTime (default is 100 seconds), e.g.: --conf spark.yarn.am.waitTime=500s
Partitioning
Under the hood, a Spark RDD (Resilient Distributed Dataset) is split into multiple partitions. These partitions will be distributed to the executors. If you don’t specify a number of partitions when creating an RDD, Spark will decide how partitioning will be done. This might not be ideal for your case.
For example, if you have a job processing 50 inputs, it may be logical to split the RDD into 50 partitions, so that each partition (and thus each input) can be processed by a different executor:
from pyspark import SparkContext

sc = SparkContext()
lst = list(range(0, 50))
# make sure each input ends up in a different partition
rdd = sc.parallelize(lst, len(lst))
rdd.foreach(lambda l: your_function(l))
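If you want to double-check how the data was split, the RDD can tell you; a minimal sketch reusing the rdd from the snippet above:

# number of partitions the RDD was split into
print(rdd.getNumPartitions())
# glom() groups the elements per partition, so you can inspect the layout
print(rdd.glom().collect())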
Temporary files
While processing, it can happen that you need to store temporary data. These temporary files can be stored in the working directory of the YARN container. In a PySpark executor, the working directory can be accessed by referencing the current working directory as shown in the code snippet below:
import os

tmp_file = os.path.join('.', 'tmp.tiff')
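If you prefer not to manage the file name yourself, Python's tempfile module can be pointed at the same location; a minimal sketch (the .tiff suffix is just an example):

import tempfile

# dir='.' keeps the temporary file inside the YARN container's working directory
with tempfile.NamedTemporaryFile(dir='.', suffix='.tiff') as tmp:
    tmp.write(b'intermediate data')  # placeholder content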
An advantage of using the YARN working directory is that you don’t need to worry about cleaning up the temporary data: this is done automatically when the container terminates, both on success and on failure.
The YARN working directories are created on local data disks attached to the compute nodes. Disk space on these disks is limited, so make sure not to store more than 10GB of temporary data per task. If multiple tasks store too much data, the worker node will be flagged as lost and the tasks running on it will fail.
Home directory
Some Python programs write temporary files to the user’s home folder by default. Since users have no home folder on the Hadoop cluster, this defaults to /home/, which isn’t writable. To fix this, there are a couple of options.
If the code is executed in the executors, you can override the HOME environment variable by adding --conf spark.executorEnv.HOME="." to your spark-submit command, which makes HOME point to the Spark working directory.
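As a quick sanity check, you can have an executor report what HOME resolves to; a minimal sketch, assuming the job was submitted with the flag above:

import os
from pyspark import SparkContext

sc = SparkContext()
# runs on the executors; with spark.executorEnv.HOME="." set,
# HOME should resolve to the container's working directory
print(sc.parallelize([0]).map(lambda _: os.environ.get('HOME')).collect())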
However, for the driver the HOME environment variable can’t be overwritten. So if you need to change HOME for code executed in the driver, you’ll need to set it in your code. For Python, this can be done using:
import os

os.environ['HOME'] = '.'
Handling preemption
As explained in Spark Resource Management, each queue gets a guaranteed minimum share of all cluster resources. If additional resources are available because some queues don’t use their guaranteed minimum capacity, a queue can exceed its own guaranteed minimum, which ensures that the cluster resources are used optimally. However, when a new job is submitted in one of those underutilized queues, the borrowed resources can be reclaimed, and tasks of jobs running in queues that exceed their share can be killed. In YARN, this concept is called preemption. The killed tasks will be rescheduled automatically, but your job needs to be able to handle this correctly, especially if you, for example, keep track of the status of the tasks yourself. In Python this can be done using a signal handler:
import signal

def handle_signal(signum, frame):
    # here goes your code
    pass

# catch signals to update state in case the job is killed via YARN
# trapping SIGKILL and SIGSTOP is not possible
signal.signal(signal.SIGHUP, handle_signal)