SlurmSpark

SlurmSpark is an RSS-developed solution for standing up Spark clusters within a Slurm job allocation.

Note: Please read and understand the document Basic Usage of SlurmSpark before continuing.

Examples adapted from the Apache Spark documentation.


SlurmSpark Shell

In this example, we will estimate Pi using a Monte Carlo method: random points are sampled in the unit square, and the fraction that lands inside the unit circle approximates Pi/4.

First, start the SlurmSpark shell using ./slurmspark-shell.sh.

Command:

./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh

Output:

[quinnm@node SlurmSpark]$ ./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh
Submitted batch job 4635082
Waiting for SlurmSpark cluster...
Found the SlurmSpark master at spark://10.46.70.222:7077
Submitting "--name SlurmSparkShellExample" via spark-shell
srun: job 4635085 queued and waiting for resources
srun: job 4635085 has been allocated resources
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Once you have a scala> prompt, enter the following:

Command:

val count = sc.parallelize(1 to 1000).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / 1000}")

Output:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val count = sc.parallelize(1 to 1000).filter { _ =>
     |   val x = math.random
     |   val y = math.random
     |   x*x + y*y < 1
     | }.count()
count: Long = 777

scala> println(s"Pi is roughly ${4.0 * count / 1000}")
Pi is roughly 3.108

scala>
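
The estimate above uses only 1,000 samples, so the result is fairly coarse. The same calculation can be run with a larger sample count for a tighter estimate; the sketch below introduces a NUM_SAMPLES value that is not part of the original example, so adjust it to suit your allocation.

// Same Monte Carlo estimate with an arbitrary, larger sample count.
val NUM_SAMPLES = 10000000
val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
  val x = math.random
  val y = math.random
  x*x + y*y < 1
}.count()
println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")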

SlurmSpark Submit

In this example, we will again estimate Pi, but this time with a non-interactive job submitted to the Spark cluster via spark-submit.

Command:

./slurmspark-submit.sh -j job_files/sbatch-srun-spark.sh -S "--class org.apache.spark.examples.SparkPi --executor-memory 1G --total-executor-cores 1 ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.2.0.jar 1000"

Output:

[quinnm@node SlurmSpark]$ ./slurmspark-submit.sh -j job_files/sbatch-srun-spark.sh -S "--class org.apache.spark.examples.SparkPi --executor-memory 1G --total-executor-cores 1 ${SPARK_HOME}/examples/jars/spark-examples_2.11-2.2.0.jar 1000"
Submitted batch job 4635087
Waiting for SlurmSpark cluster...
Found the SlurmSpark master at spark://10.46.70.222:7077
Submitting "--class org.apache.spark.examples.SparkPi --executor-memory 1G --total-executor-cores 1 /cluster/software/spark/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar 1000" via spark-submit
srun: job 4635088 queued and waiting for resources
srun: job 4635088 has been allocated resources
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/02/20 09:14:28 INFO SparkContext: Running Spark version 2.2.0
...
18/02/20 09:14:56 INFO TaskSetManager: Finished task 998.0 in stage 0.0 (TID 998) in 10 ms on 10.46.70.197 (executor 0) (999/1000)
18/02/20 09:14:56 INFO TaskSetManager: Finished task 999.0 in stage 0.0 (TID 999) in 13 ms on 10.46.70.197 (executor 0) (1000/1000)
18/02/20 09:14:56 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/02/20 09:14:56 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 22.752 s
18/02/20 09:14:56 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 23.261414 s
Pi is roughly 3.141337711413377
18/02/20 09:14:56 INFO SparkUI: Stopped Spark web UI at http://10.46.70.196:4040
18/02/20 09:14:56 INFO StandaloneSchedulerBackend: Shutting down all executors
18/02/20 09:14:56 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
18/02/20 09:14:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/02/20 09:14:56 INFO MemoryStore: MemoryStore cleared
18/02/20 09:14:56 INFO BlockManager: BlockManager stopped
18/02/20 09:14:56 INFO BlockManagerMaster: BlockManagerMaster stopped
18/02/20 09:14:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/02/20 09:14:56 INFO SparkContext: Successfully stopped SparkContext
18/02/20 09:14:56 INFO ShutdownHookManager: Shutdown hook called
18/02/20 09:14:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-d0ece1ad-251d-437c-aa4c-cfb963f3f122
Destroying SlurmSpark cluster...
[quinnm@node SlurmSpark]$
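
The string passed with -S is handed to spark-submit, as the output above shows, so the same pattern works for your own applications. A hypothetical invocation is sketched below; com.example.MyApp and /path/to/MyApp.jar are placeholders for your own main class and jar.

./slurmspark-submit.sh -j job_files/sbatch-srun-spark.sh -S "--class com.example.MyApp --executor-memory 4G --total-executor-cores 8 /path/to/MyApp.jar arg1 arg2"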

SlurmSpark Shell and SparkSQL

In this example, we will read a JSON file into Spark, then use SparkSQL to query it with SQL syntax.

First, start the SlurmSpark shell.

./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh

Output:

[quinnm@node SlurmSpark]$ ./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh
Submitted batch job 4635082
Waiting for SlurmSpark cluster...
Found the SlurmSpark master at spark://10.46.70.222:7077
Submitting "--name SlurmSparkShellExample" via spark-shell
srun: job 4635085 queued and waiting for resources
srun: job 4635085 has been allocated resources
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Once you have a scala> prompt, create a DataFrame based on the contents of the example JSON file examples/src/main/resources/people.json.

Command:

val df = spark.read.json("/cluster/software/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
df.show()

Output:

scala> val df = spark.read.json("/cluster/software/spark/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala>
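
Before querying, you can also check the schema Spark inferred from the JSON file (the output above shows age as bigint and name as string):

// Print the schema that Spark inferred while reading the JSON file.
df.printSchema()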

Now register the DataFrame as a temporary view so that you can run SQL queries against it programmatically.

Command:

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

Output:

scala> df.createOrReplaceTempView("people")
df.createOrReplaceTempView("people")

scala> val sqlDF = spark.sql("SELECT * FROM people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala>
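
The temporary view accepts any SQL that Spark supports, not just SELECT *. For example, a filtered query against the same people view:

// Return only rows whose age column is non-null and greater than 20.
spark.sql("SELECT name, age FROM people WHERE age > 20").show()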

SlurmSpark Shell and SparkSQL with HDFS

In this example, we will read a JSON file located on HDFS into Spark, then use SparkSQL to query it with SQL syntax. You will need to load an HDFS module before interacting with HDFS.
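
This example assumes the file /testing/people.json already exists in HDFS. If you need to stage it yourself, a minimal sketch from the login shell is shown below; the module name is a placeholder for whatever HDFS module your site provides, and the source file is the copy shipped with Spark.

module load hdfs
hdfs dfs -mkdir -p /testing
hdfs dfs -put ${SPARK_HOME}/examples/src/main/resources/people.json /testing/people.json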

First, start the SlurmSpark shell.

./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh

Output:

[quinnm@node SlurmSpark]$ ./slurmspark-shell.sh -j job_files/sbatch-srun-spark.sh
Submitted batch job 4635082
Waiting for SlurmSpark cluster...
Found the SlurmSpark master at spark://10.46.70.222:7077
Submitting "--name SlurmSparkShellExample" via spark-shell
srun: job 4635085 queued and waiting for resources
srun: job 4635085 has been allocated resources
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_151)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Once you have a scala> prompt, create a DataFrame based on the contents of the example JSON file /testing/people.json stored in HDFS.

Command:

val df = spark.read.json("/testing/people.json")
df.show()

Output:

scala> val df = spark.read.json("/testing/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala>
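
The bare path above works because the shell's default filesystem resolves to HDFS. If that is not the case on your system, a fully qualified URI should also work; the NameNode host and port below are placeholders.

// Read the same file through an explicit hdfs:// URI (host and port are placeholders).
val df = spark.read.json("hdfs://namenode.example.com:8020/testing/people.json")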

Now register the DataFrame as a temporary view so that you can run SQL queries against it programmatically.

Command:

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

Output:

scala> df.createOrReplaceTempView("people")
df.createOrReplaceTempView("people")

scala> val sqlDF = spark.sql("SELECT * FROM people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala>
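
Query results can also be written back to HDFS with the DataFrame writer. A minimal sketch; the output path /testing/people_out is a placeholder and must not already exist.

// Write the query result back to HDFS as JSON (placeholder output path).
sqlDF.write.json("/testing/people_out")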