Quickstart¶
intake-spark provides quick and easy access to data via Apache Spark.
Installation¶
To use this plugin for Intake, install with the following command:
conda install -c conda-forge intake-spark
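The package should also be installable with pip (an assumption; conda-forge is the documented channel, and pip will not provide Java or Spark for you):

pip install intake-spark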
Usage¶
Establishing a Context¶
Operations on a Spark cluster are achieved via a “context”, which is a Python-side client to the remote system. All use of this package requires a valid context in order to work. There are two ways to establish the context:
- Use the function intake_spark.base.SparkHolder.set_class_session(), passing for context= and session= the respective existing objects that have been created previously using conventional means. In this case, Spark connection-specific parameters are not encoded in the catalog. Since intake-spark uses getOrCreate(), the existence of the objects should be enough for them to be picked up at data access time, but we still recommend calling the set function explicitly. Note that if only using RDDs and no SQL functionality, the session need not be created or provided. A minimal sketch of this approach follows this list.
- The same function can also take a set of parameters, in which case intake-spark will attempt to create the context and session for you, passing the parameters (master, app_name, executor_env, spark_home and config parameters) on to Spark. In this case, the parameters can be stored in a catalog (the context_kwargs parameter, a dictionary). If providing an empty set of parameters, Spark will create the default local context, which is useful only for testing.
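A minimal sketch of the first approach, registering Spark objects that were created by conventional means (the local master URL and app name here are illustrative):

from pyspark.sql import SparkSession
from intake_spark.base import SparkHolder

# create the context and session by conventional means
spark = SparkSession.builder.master('local[*]').appName('demo').getOrCreate()

# register the existing objects with intake-spark; session= may be omitted
# if only RDDs and no SQL functionality are needed
SparkHolder.set_class_session(context=spark.sparkContext, session=spark)

The second approach instead hands the connection parameters to intake-spark, which will create the context and session for you: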
# in-code example: intake-spark creates the context and session from these parameters
from intake_spark.base import SparkHolder
import intake
SparkHolder.set_class_session(master='spark://myhost:7077', app_name='intake', hive=True)
Encoding Spark calls¶
Spark calls are often expressed as a chain of attribute look-ups, some of which are called as methods with arguments.
To encode these for Intake, we make each stage of the chain an element in a list, starting from the Context or
Session for the RDD and DataFrame versions of the driver, respectively. For example, the following encodes
sc.textFile('s3://bucket/files*.txt').map(len)
, with one list element per stage of the chain:
source = intake.open_spark_rdd([
['textFile', ['s3://bucket/files*.txt',]],
['map', [len, ]]
])
rdd = source.to_spark()
Here rdd will be a PySpark RDD instance.
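The handle can then be used with the normal PySpark RDD API; for example (a usage sketch, assuming the S3 location above actually holds readable text files):

# each element is a line length, produced by the encoded map(len) stage
total_characters = rdd.sum()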
A more complicated example encodes spark.read.option("mergeSchema", "true").parquet("data/test_table")
for use with Intake:
source = intake.open_spark_dataframe([
['read', ],
['option', ["mergeSchema", "true"]],
['parquet', ["data/test_table",]]
])
df = source.to_spark()
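Likewise, df is a live PySpark DataFrame, so the usual methods are available (a brief usage sketch):

df.printSchema()                 # inspect the merged schema
sample = df.limit(5).toPandas()  # pull a small sample back to the client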
Note that you can pass functions using this formalism, but only Python built-ins can be encoded into a YAML file,
e.g., len => !!python/name:builtins.len ''
.
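Concretely, the map(len) stage from the RDD example above would be written in a catalog with that YAML tag, along the following lines:

- - map
  - - !!python/name:builtins.len ''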
Using in a catalog¶
The above example could be expressed in YAML syntax as follows:
sources:
  spark_dataframe:
    args:
      args:
      - - read
      - - option
        - - mergeSchema
          - 'true'
      - - parquet
        - - data/test_table
      context_kwargs:
        master: "spark://myhost:7077"
        app_name: intake
    description: ''
    driver: spark_dataframe
    metadata: {}
Note the complex nesting pattern, and that in this case we are including arguments for creating an appropriate SparkContext and Session on the fly, if they have not already been made.
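Once saved to a file, the catalog is used like any other Intake catalog (a sketch; catalog.yaml is a placeholder filename):

import intake

cat = intake.open_catalog('catalog.yaml')
df = cat.spark_dataframe.to_spark()  # builds the context/session from context_kwargs if needed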