Source code for intake_spark.spark_sources

from intake.source.base import DataSource, Schema
from .base import SparkHolder
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions


class SparkRDD(DataSource):
    """A reference to an RDD definition in Spark

    RDDs are list-of-things objects, evaluated lazily in Spark.

    Examples
    --------
    >>> args = [('textFile', ('text.*.files', )),
    ...         ('map', (len,))]
    >>> context = {'master': 'spark://master.node:7077'}
    >>> source = SparkRDD(args, context)

    The output of `source.to_spark()` is an RDD object holding the lengths
    of the lines of the input files.
    """
    container = 'python'
    version = __version__
    name = 'spark_rdd'
    partition_access = True

    def __init__(self, args, context_kwargs=None, metadata=None):
        """
        Parameters
        ----------
        args, context_kwargs:
            Passed on to SparkHolder, see its docstrings and the examples.
        metadata: dict
            Arbitrary data to associate with this source.
        """
        super(SparkRDD, self).__init__(metadata)
        self.holder = SparkHolder(False, args, context_kwargs)
        self.ref = None

    def _get_schema(self):
        if self.ref is None:
            self.ref = self.holder.setup()
            self.npartitions = self.ref.getNumPartitions()
        return Schema(npartitions=self.npartitions,
                      extra_metadata=self.metadata)

    def read_partition(self, i):
        """Returns one of the partitions of the RDD as a list of objects"""
        self._get_schema()
        sc = self.holder.sc[0]
        return sc.runJob(self.ref, lambda x: x, partitions=[i])

    def to_spark(self):
        """Return the spark object for this data, an RDD"""
        self._get_schema()
        return self.ref

    def read(self):
        """Materialise the whole RDD into a list of objects"""
        self._get_schema()
        return self.ref.collect()

    def _close(self):
        self.ref = None
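
# A minimal usage sketch (illustrative only, not part of the intake-spark
# module): it mirrors the docstring example above, but against a local Spark
# master. The file pattern, the master URL and the helper name are
# placeholder assumptions.
def _example_spark_rdd_usage():  # pragma: no cover - illustrative sketch only
    args = [('textFile', ('text.*.files', )),
            ('map', (len,))]
    source = SparkRDD(args, {'master': 'local[*]'})
    rdd = source.to_spark()           # lazy pyspark RDD of line lengths
    part = source.read_partition(0)   # one partition as a list of objects
    return source.read()              # whole RDD materialised via collect()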

class SparkDataFrame(DataSource):
    """A reference to a DataFrame definition in Spark

    DataFrames are tabular spark objects containing a heterogeneous set
    of columns and potentially a large number of rows. They are similar
    in concept to Pandas or Dask data-frames. The Spark variety produced
    by this driver will be a handle to a lazy object, where computation
    will be managed by Spark.

    Examples
    --------
    >>> args = [
    ...     ['read', ],
    ...     ['format', ['csv', ]],
    ...     ['option', ['header', 'true']],
    ...     ['load', ['data.*.csv', ]]
    ... ]
    >>> context = {'master': 'spark://master.node:7077'}
    >>> source = SparkDataFrame(args, context)

    The output of `source.to_spark()` contains a spark object pointing to
    the parsed contents of the indicated CSV files.
    """
    container = 'dataframe'
    version = __version__
    name = 'spark_dataframe'
    partition_access = True

    def __init__(self, args, context_kwargs=None, metadata=None):
        """
        Parameters
        ----------
        args, context_kwargs:
            Passed on to SparkHolder, see its docstrings and the examples.
        metadata: dict
            Arbitrary data to associate with this source.
        """
        super(SparkDataFrame, self).__init__(metadata)
        self.holder = SparkHolder(True, args, context_kwargs)
        self.ref = None

    def _get_schema(self):
        if self.ref is None:
            self.ref = self.holder.setup()
            self.npartitions = self.ref.rdd.getNumPartitions()
            rows = self.ref.take(10)
            self.dtype = pandas_dtypes(self.ref.schema, rows)
            self.shape = (None, len(self.dtype))
        return Schema(npartitions=self.npartitions,
                      extra_metadata=self.metadata,
                      dtype=self.dtype,
                      shape=self.shape)

    def read_partition(self, i):
        """Returns one partition of the data as a pandas data-frame"""
        import pandas as pd
        self._get_schema()
        sc = self.holder.sc[0]
        out = sc.runJob(self.ref.rdd, lambda x: x, partitions=[i])
        df = pd.DataFrame.from_records(out)
        df.columns = list(self.dtype)
        return df

    def to_spark(self):
        """Return the Spark object for this data, a DataFrame"""
        self._get_schema()
        return self.ref

    def read(self):
        """Read all of the data into an in-memory Pandas data-frame"""
        self._get_schema()
        return self.ref.toPandas()

    def _close(self):
        self.ref = None
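
# A minimal usage sketch (illustrative only, not part of the intake-spark
# module): it mirrors the CSV example in the docstring above against a local
# Spark master. The glob 'data.*.csv', the master URL and the helper name are
# placeholder assumptions.
def _example_spark_dataframe_usage():  # pragma: no cover - illustrative only
    args = [
        ['read', ],
        ['format', ['csv', ]],
        ['option', ['header', 'true']],
        ['load', ['data.*.csv', ]],
    ]
    source = SparkDataFrame(args, {'master': 'local[*]'})
    sdf = source.to_spark()            # lazy pyspark DataFrame
    part = source.read_partition(0)    # one partition as a pandas DataFrame
    return source.read()               # whole table via DataFrame.toPandas()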

def _to_corrected_pandas_type(dt):
    # Copied from pyspark 2.3
    """
    When converting Spark SQL records to Pandas DataFrame, the inferred
    data type may be wrong. This method gets the corrected data type for
    Pandas if that type may be inferred incorrectly.
    """
    import numpy as np
    from pyspark.sql.types import ByteType, ShortType, IntegerType, FloatType
    if type(dt) == ByteType:
        return np.int8
    elif type(dt) == ShortType:
        return np.int16
    elif type(dt) == IntegerType:
        return np.int32
    elif type(dt) == FloatType:
        return np.float32
    else:
        return None


def pandas_dtypes(schema, rows):
    """Rough dtype for the given pyspark schema"""
    import pandas as pd
    from pyspark.sql.types import IntegralType
    # copied from toPandas() method
    df = pd.DataFrame.from_records(rows)
    df.columns = [s.name for s in schema]
    for field in schema:
        pandas_type = _to_corrected_pandas_type(field.dataType)
        if pandas_type is not None and not (
                isinstance(field.dataType, IntegralType) and field.nullable):
            df[field.name] = df[field.name].astype(pandas_type)
    return {k: str(v) for k, v in df.dtypes.to_dict().items()}
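
# A minimal sketch (illustrative only, not part of the intake-spark module) of
# what pandas_dtypes reports for a hand-built schema: non-nullable IntegerType
# columns are narrowed to int32, while nullable integral and string columns
# keep whatever dtype pandas infers from the sample rows. The column names,
# sample rows and helper name are made up for the example.
def _example_pandas_dtypes():  # pragma: no cover - illustrative only
    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   StringType)
    schema = StructType([
        StructField('id', IntegerType(), nullable=False),
        StructField('name', StringType(), nullable=True),
    ])
    rows = [(1, 'a'), (2, 'b')]
    return pandas_dtypes(schema, rows)  # -> {'id': 'int32', 'name': 'object'}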