Spark best practices

When working with Spark on the Hadoop cluster, there are some things you should be aware of. Below, you can find some best practices that will help you integrate your code properly.

SparkContext initialization

In your Spark driver, it is best practice to initialize the SparkContext first, before doing other work such as collecting the inputs that need to be processed. Otherwise, you can run into a timeout because the SparkContext was not created in time.
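The ordering described above can be sketched as follows. This is a minimal illustration, not the cluster's reference implementation: `collect_inputs`, `process` and `run_job` are hypothetical names, and the context factory is passed in as an argument only so that the ordering is explicit.

```python
def collect_inputs():
    # Hypothetical placeholder for slow preparation work,
    # e.g. listing files or querying a catalogue.
    return ["product_%02d" % i for i in range(5)]


def process(item):
    # Hypothetical per-input processing function.
    return item.upper()


def run_job(create_context):
    # 1. Create the SparkContext before anything else, so that it
    #    exists well within the spark.yarn.am.waitTime window.
    sc = create_context()
    try:
        # 2. Only now do the potentially slow input collection.
        inputs = collect_inputs()
        # 3. Distribute the inputs and process them.
        return sc.parallelize(inputs, len(inputs)).map(process).collect()
    finally:
        sc.stop()
```

In a real driver you would call it as `from pyspark import SparkContext` followed by `run_job(SparkContext)`.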

Sample exception in case SparkContext creation times out:

20/10/21 11:01:49 ERROR ApplicationMaster: Uncaught exception:
    java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:815)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

If the suggested approach doesn’t help, you can also increase spark.yarn.am.waitTime (the default is 100 seconds), e.g. --conf spark.yarn.am.waitTime=500s
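As a sketch of where this setting goes, the helper below builds a spark-submit command line with a longer wait time. The function name `build_submit_command` and the script name `my_job.py` are hypothetical, and the use of YARN cluster deploy mode is an assumption based on the stack trace above.

```python
def build_submit_command(script, am_wait_time="500s"):
    """Return a spark-submit argument list with a longer ApplicationMaster wait time."""
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",  # assumption: the job runs in cluster mode
        "--conf", "spark.yarn.am.waitTime=%s" % am_wait_time,
        script,
    ]


# my_job.py is a hypothetical script name
command = build_submit_command("my_job.py")
print(" ".join(command))
```

The resulting list can be passed to `subprocess.run(command, check=True)` or copied into a shell script.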

Partitioning

Under the hood, a Spark RDD (Resilient Distributed Dataset) is split into multiple partitions. These partitions will be distributed to the executors. If you don’t specify a number of partitions when creating an RDD, Spark will decide how partitioning will be done. This might not be ideal for your case.

For example, if you have a job processing 50 inputs, it may be logical to split the RDD into 50 partitions, so that each partition (and thus each input) is processed as a separate task that can run on a different executor:

from pyspark import SparkContext

sc = SparkContext()
lst = list(range(50))  # 50 inputs: 0..49
# make sure each input ends up in a different partition
rdd = sc.parallelize(lst, len(lst))
# your_function is your own processing function, called once per input
rdd.foreach(your_function)

Temporary files

While processing, you may need to store temporary data. These temporary files can be stored in the working directory of the YARN container. In a PySpark executor, this is simply the current working directory, as shown in the code snippet below:

import os
tmp_file = os.path.join('.', 'tmp.tiff')

The advantage of using the YARN working directory is that you don’t need to worry about cleaning up the temporary data, because this is done automatically when the container terminates, both on success and on failure.

The YARN working directories are created on local data disks attached to the computing nodes. Space on these disks is limited, so make sure not to store more than 10 GB of temporary data per task. If multiple tasks store too much data, the worker node will be flagged as lost and the tasks running on that worker node will fail.
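Because one executor can run several tasks in the same container, a fixed file name such as tmp.tiff may collide between tasks. A minimal sketch of one way to avoid this is to let the standard library pick a unique name inside the working directory; `write_scratch_file` is a hypothetical helper, not part of any Terrascope API.

```python
import os
import tempfile


def write_scratch_file(data, suffix=".tiff"):
    """Write `data` (bytes) to a uniquely named file in the current working directory."""
    # dir="." places the file in the current working directory, which in an
    # executor is the YARN container's working directory.
    fd, path = tempfile.mkstemp(prefix="tmp_", suffix=suffix, dir=".")
    with os.fdopen(fd, "wb") as scratch:
        scratch.write(data)
    return path
```

Calling `write_scratch_file(b"...")` inside your task function returns the path of the new file. The file is removed together with the rest of the working directory when the container terminates.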

Home directory

Some Python programs write temporary files to the user’s home folder by default. Since users have no home folder on the Hadoop cluster, this defaults to /home/, which isn’t writable. There are a couple of options to fix this.

If the code is executed in the executors, you can override the HOME environment variable using --conf spark.executorEnv.HOME="." to make HOME point to the Spark working directory.

However, for the driver the HOME environment variable can’t be overwritten. So if you need to change the HOME environment variable for code executed in the driver, you’ll need to set the HOME environment variable in your code. For Python, this can be done using:

import os
os.environ['HOME'] = '.'
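A slightly more defensive variant, shown here only as a sketch, stores the absolute path of the working directory, so that HOME keeps pointing to the same place if the code later changes directory. The helper name `use_working_dir_as_home` is hypothetical.

```python
import os


def use_working_dir_as_home():
    """Point HOME at the current working directory and return the path used."""
    # Use an absolute path so that a later os.chdir() does not change
    # what HOME refers to.
    home = os.path.abspath(".")
    os.environ["HOME"] = home
    return home


home = use_working_dir_as_home()
```

On POSIX systems, libraries that resolve the home folder through `os.path.expanduser("~")` will then end up in the working directory.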

Handling preemption

As explained in Spark Resource Management, each queue gets a guaranteed minimum share of all cluster resources. If additional resources are available because some queues don’t use their guaranteed minimum capacity, a queue can exceed its own guaranteed minimum. This ensures that the cluster resources are used optimally. However, when a new job is submitted to a queue that is running below its guaranteed capacity, these additional resources can be reclaimed, and tasks of a job running in a queue that consumes more than its share can be killed. In YARN, this concept is called preemption. The killed tasks are rescheduled automatically, but your job needs to handle this correctly, especially if you keep track of the status of the tasks yourself. In Python, you can react to the kill with a signal handler:

import signal


def handle_signal(signum, frame):
    # here goes your code
    pass


# catch signals to update state when job is killed
# trapping SIGKILL and SIGSTOP is not possible
signal.signal(signal.SIGHUP, handle_signal)  # in case job is killed via YARN
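As an illustration of what such a handler might do, the sketch below writes a small status record when the signal arrives. The `state` dictionary, the helper names and the storage location are all hypothetical. Registering the same handler for SIGTERM is an assumption not taken from the text above. Since the container's working directory is cleaned up on termination, the state file should live on storage that outlives the container.

```python
import json
import signal

# Hypothetical bookkeeping of the job's progress
state = {"status": "running", "completed": []}


def save_state(path):
    """Write the current state to `path` (assumed to be on persistent storage)."""
    with open(path, "w") as state_file:
        json.dump(state, state_file)


def make_handler(path):
    def handle_signal(signum, frame):
        # Record why the job stopped, then persist the state
        state["status"] = "interrupted by signal %d" % signum
        save_state(path)
    return handle_signal


def install_handlers(path):
    handler = make_handler(path)
    signal.signal(signal.SIGHUP, handler)   # as in the example above
    signal.signal(signal.SIGTERM, handler)  # assumption: also catch SIGTERM
```

Call `install_handlers(...)` once, early in the main thread of the process, with a path of your choice.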

Copyright 2018 - 2024 VITO NV All Rights reserved

 