Skip to content
Snippets Groups Projects
user avatar
Josh Rosen authored
This patch significantly refactors CatalystTypeConverters to both clean up the code and enable these conversions to work with future Project Tungsten features.

At a high level, I've reorganized the code so that all functions dealing with the same type are grouped together into type-specific subclasses of `CatalystTypeConveter`.  In addition, I've added new methods that allow the Catalyst Row -> Scala Row conversions to access the Catalyst row's fields through type-specific `getTYPE()` methods rather than the generic `get()` / `Row.apply` methods.  This refactoring is a blocker to being able to unit test new operators that I'm developing as part of Project Tungsten, since those operators may output `UnsafeRow` instances which don't support the generic `get()`.

The stricter type usage of types here has uncovered some bugs in other parts of Spark SQL:

- #6217: DescribeCommand is assigned wrong output attributes in SparkStrategies
- #6218: DataFrame.describe() should cast all aggregates to String
- #6400: Use output schema, not relation schema, for data source input conversion

Spark SQL current has undefined behavior for what happens when you try to create a DataFrame from user-specified rows whose values don't match the declared schema.  According to the `createDataFrame()` Scaladoc:

>  It is important to make sure that the structure of every [[Row]] of the provided RDD matches the provided schema. Otherwise, there will be runtime exception.

Given this, it sounds like it's technically not a break of our API contract to fail-fast when the data types don't match. However, there appear to be many cases where we don't fail even though the types don't match. For example, `JavaHashingTFSuite.hasingTF` passes a column of integers values for a "label" column which is supposed to contain floats.  This column isn't actually read or modified as part of query processing, so its actual concrete type doesn't seem to matter. In other cases, there could be situations where we have generic numeric aggregates that tolerate being called with different numeric types than the schema specified, but this can be okay due to numeric conversions.

In the long run, we will probably want to come up with precise semantics for implicit type conversions / widening when converting Java / Scala rows to Catalyst rows.  Until then, though, I think that failing fast with a ClassCastException is a reasonable behavior; this is the approach taken in this patch.  Note that certain optimizations in the inbound conversion functions for primitive types mean that we'll probably preserve the old undefined behavior in a majority of cases.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6222 from JoshRosen/catalyst-converters-refactoring and squashes the following commits:

740341b [Josh Rosen] Optimize method dispatch for primitive type conversions
befc613 [Josh Rosen] Add tests to document Option-handling behavior.
5989593 [Josh Rosen] Use new SparkFunSuite base in CatalystTypeConvertersSuite
6edf7f8 [Josh Rosen] Re-add convertToScala(), since a Hive test still needs it
3f7b2d8 [Josh Rosen] Initialize converters lazily so that the attributes are resolved first
6ad0ebb [Josh Rosen] Fix JavaHashingTFSuite ClassCastException
677ff27 [Josh Rosen] Fix null handling bug; add tests.
8033d4c [Josh Rosen] Fix serialization error in UserDefinedGenerator.
85bba9d [Josh Rosen] Fix wrong input data in InMemoryColumnarQuerySuite
9c0e4e1 [Josh Rosen] Remove last use of convertToScala().
ae3278d [Josh Rosen] Throw ClassCastException errors during inbound conversions.
7ca7fcb [Josh Rosen] Comments and cleanup
1e87a45 [Josh Rosen] WIP refactoring of CatalystTypeConverters
cafd5056
History

Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing.

http://spark.apache.org/

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page and project wiki. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

build/mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.) More detailed documentation is available from the project site, at "Building Spark".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:

scala> sc.parallelize(1 to 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:

>>> sc.parallelize(range(1000)).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn-cluster" or "yarn-client" to run on YARN, and "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run tests for a module, or individual tests.

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions. See also "Third Party Hadoop Distributions" for guidance on building a Spark application that works with a particular distribution.

Configuration

Please refer to the Configuration guide in the online documentation for an overview on how to configure Spark.