Commit d2f4f30b authored by Yin Huai, committed by Reynold Xin

[SPARK-2060][SQL] Querying JSON Datasets with SQL and DSL in Spark SQL

JIRA: https://issues.apache.org/jira/browse/SPARK-2060

Programming guide: http://yhuai.github.io/site/sql-programming-guide.html

Scala doc of SQLContext: http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext
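
For quick reference, a minimal Scala sketch of the user-facing API added by this patch (method names are taken from the diff below; an existing SparkContext `sc` and SQLContext `sqlContext` are assumed, and the input path reuses the bundled people.json example):

// Load a newline-delimited JSON file and infer its schema.
val people = sqlContext.jsonFile("examples/src/main/resources/people.json")
people.printSchema()                 // prints the inferred schema as a tree
people.registerAsTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// A JSON dataset can also come from an RDD[String] holding one JSON object per string.
val anotherPeople = sqlContext.jsonRDD(sc.parallelize("""{"name":"Yin"}""" :: Nil))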

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #999 from yhuai/newJson and squashes the following commits:

227e89e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ce8eedd [Yin Huai] rxin's comments.
bc9ac51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
94ffdaa [Yin Huai] Remove "get" from method names.
ce31c81 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
e2773a6 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
79ea9ba [Yin Huai] Fix typos.
5428451 [Yin Huai] Newline
1f908ce [Yin Huai] Remove extra line.
d7a005c [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
7ea750e [Yin Huai] marmbrus's comments.
6a5f5ef [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
83013fb [Yin Huai] Update Java Example.
e7a6c19 [Yin Huai] SchemaRDD.javaToPython should convert a field with the StructType to a Map.
6d20b85 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4fbddf0 [Yin Huai] Programming guide.
9df8c5a [Yin Huai] Python API.
7027634 [Yin Huai] Java API.
cff84cc [Yin Huai] Use a SchemaRDD for a JSON dataset.
d0bd412 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
ab810b0 [Yin Huai] Make JsonRDD private.
6df0891 [Yin Huai] Apache header.
8347f2e [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
66f9e76 [Yin Huai] Update docs and use the entire dataset to infer the schema.
8ffed79 [Yin Huai] Update the example.
a5a4b52 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
4325475 [Yin Huai] If a sampled dataset is used for schema inferring, update the schema of the JsonTable after first execution.
65b87f0 [Yin Huai] Fix sampling...
8846af5 [Yin Huai] API doc.
52a2275 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
0387523 [Yin Huai] Address PR comments.
666b957 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
a2313a6 [Yin Huai] Address PR comments.
f3ce176 [Yin Huai] After type conflict resolution, if a NullType is found, StringType is used.
0576406 [Yin Huai] Add Apache license header.
af91b23 [Yin Huai] Merge remote-tracking branch 'upstream/master' into newJson
f45583b [Yin Huai] Infer the schema of a JSON dataset (a text file with one JSON object per line or a RDD[String] with one JSON object per string) and returns a SchemaRDD.
f31065f [Yin Huai] A query plan or a SchemaRDD can print out its schema.
parent b2ebf429
Showing 995 additions and 108 deletions
......@@ -22,6 +22,7 @@ spark-env.sh.template
log4j-defaults.properties
sorttable.js
.*txt
.*json
.*data
.*log
cloudpickle.py
......
This diff is collapsed.
......@@ -18,6 +18,7 @@
package org.apache.spark.examples.sql;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
......@@ -56,6 +57,7 @@ public class JavaSparkSQL {
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
......@@ -84,16 +86,88 @@ public class JavaSparkSQL {
return "Name: " + row.getString(0);
}
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}
System.out.println("=== Data source: Parquet File ===");
// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}
System.out.println("=== Data source: JSON Dataset ===");
// A JSON dataset is pointed to by a path.
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed to by path
JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
// Because the schema of a JSON dataset is automatically inferred, it is best to
// take a look at the inferred schema before writing queries.
peopleFromJsonFile.printSchema();
// The schema of people is ...
// root
// |-- age: IntegerType
// |-- name: StringType
// Register this JavaSchemaRDD as a table.
peopleFromJsonFile.registerAsTable("people");
// SQL statements can be run by using the sql methods provided by sqlCtx.
JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.map(new Function<Row, String>() {
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
for (String name: teenagerNames) {
System.out.println(name);
}
// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
// Take a look at the schema of this new JavaSchemaRDD.
peopleFromJsonRDD.printSchema();
// The schema of anotherPeople is ...
// root
// |-- address: StructType
// | |-- city: StringType
// | |-- state: StringType
// |-- name: StringType
peopleFromJsonRDD.registerAsTable("people2");
JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
}
}).collect();
for (String name: nameAndCity) {
System.out.println(name);
}
}
}
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
......@@ -76,7 +76,7 @@ object SparkBuild extends Build {
lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)
lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core, catalyst)
lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test")
lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql)
......@@ -501,9 +501,23 @@ object SparkBuild extends Build {
def sqlCoreSettings = sharedSettings ++ Seq(
name := "spark-sql",
libraryDependencies ++= Seq(
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion
)
"com.twitter" % "parquet-column" % parquetVersion,
"com.twitter" % "parquet-hadoop" % parquetVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
),
initialCommands in console :=
"""
|import org.apache.spark.sql.catalyst.analysis._
|import org.apache.spark.sql.catalyst.dsl._
|import org.apache.spark.sql.catalyst.errors._
|import org.apache.spark.sql.catalyst.expressions._
|import org.apache.spark.sql.catalyst.plans.logical._
|import org.apache.spark.sql.catalyst.rules._
|import org.apache.spark.sql.catalyst.types._
|import org.apache.spark.sql.catalyst.util._
|import org.apache.spark.sql.execution
|import org.apache.spark.sql.test.TestSQLContext._
|import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)
// Since we don't include hive in the main assembly this project also acts as an alternative
......
......@@ -15,7 +15,7 @@
# limitations under the License.
#
from pyspark.rdd import RDD
from pyspark.rdd import RDD, PipelinedRDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
from py4j.protocol import Py4JError
......@@ -137,6 +137,53 @@ class SQLContext:
jschema_rdd = self._ssql_ctx.parquetFile(path)
return SchemaRDD(jschema_rdd, self)
def jsonFile(self, path):
"""Loads a text file storing one JSON object per line,
returning the result as a L{SchemaRDD}.
It goes through the entire dataset once to determine the schema.
>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> ofn = open(jsonFile, 'w')
>>> for json in jsonStrings:
... print>>ofn, json
>>> ofn.close()
>>> srdd = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
True
"""
jschema_rdd = self._ssql_ctx.jsonFile(path)
return SchemaRDD(jschema_rdd, self)
def jsonRDD(self, rdd):
"""Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
It goes through the entire dataset once to determine the schema.
>>> srdd = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
>>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
True
"""
def func(split, iterator):
for x in iterator:
if not isinstance(x, basestring):
x = unicode(x)
yield x.encode("utf-8")
keyed = PipelinedRDD(rdd, func)
keyed._bypass_serializer = True
jrdd = keyed._jrdd.map(self._jvm.BytesToString())
jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
return SchemaRDD(jschema_rdd, self)
def sql(self, sqlQuery):
"""Return a L{SchemaRDD} representing the result of the given query.
......@@ -265,7 +312,7 @@ class SchemaRDD(RDD):
For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the
L{SchemaRDD} is not operated on directly, as its underlying
implementation is a RDD composed of Java objects. Instead it is
implementation is an RDD composed of Java objects. Instead it is
converted to a PythonRDD in the JVM, on which Python operations can
be done.
"""
......@@ -337,6 +384,14 @@ class SchemaRDD(RDD):
"""Creates a new table with the contents of this SchemaRDD."""
self._jschema_rdd.saveAsTable(tableName)
def schemaString(self):
"""Returns the output schema in the tree format."""
return self._jschema_rdd.schemaString()
def printSchema(self):
"""Prints out the schema in the tree format."""
print self.schemaString()
def count(self):
"""Return the number of elements in this RDD.
......@@ -436,6 +491,11 @@ def _test():
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
'{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
'{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
globs['jsonStrings'] = jsonStrings
globs['json'] = sc.parallelize(jsonStrings)
globs['nestedRdd1'] = sc.parallelize([
{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
{"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
......
......@@ -66,6 +66,34 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<!--
This plugin forces the generation of a jar containing catalyst test classes,
so that the test classes of external modules can use them. The two execution profiles
are necessary - the first for 'mvn package', the second for 'mvn compile'. Ideally,
'mvn compile' should not compile test classes and therefore should not need this.
However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559)
causes the compilation to fail if catalyst test-jar is not generated. Hence, the
second execution profile for 'mvn compile'.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
<execution>
<id>test-jar-on-compile</id>
<phase>compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -22,6 +22,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._
object HiveTypeCoercion {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
// The conversions for integral and floating point types have a linear widening hierarchy:
val numericPrecedence =
Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
// Boolean is only wider than Void
val booleanPrecedence = Seq(NullType, BooleanType)
val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
}
/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
* participate in operations into compatible ones. Most of these rules are based on Hive semantics,
......@@ -116,19 +126,18 @@ trait HiveTypeCoercion {
*
* Additionally, all types when UNION-ed with strings will be promoted to strings.
* Other string conversions are handled by PromoteStrings.
*
* Widening types might result in loss of precision in the following cases:
* - IntegerType to FloatType
* - LongType to FloatType
* - LongType to DoubleType
*/
object WidenTypes extends Rule[LogicalPlan] {
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
// The conversion for integral and floating point types have a linear widening hierarchy:
val numericPrecedence =
Seq(NullType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DecimalType)
// Boolean is only wider than Void
val booleanPrecedence = Seq(NullType, BooleanType)
val allPromotions: Seq[Seq[DataType]] = numericPrecedence :: booleanPrecedence :: Nil
def findTightestCommonType(t1: DataType, t2: DataType): Option[DataType] = {
// Try and find a promotion rule that contains both types in question.
val applicableConversion = allPromotions.find(p => p.contains(t1) && p.contains(t2))
val applicableConversion =
HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p.contains(t2))
// If found return the widest common type, otherwise None
applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
......
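
As an aside, a hedged sketch of how the relocated precedence lists resolve a common type; it mirrors the findTightestCommonType logic above and is not part of the patch:

import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.types._

// Pick the widest of two types that share a promotion list, as findTightestCommonType does.
def widest(t1: DataType, t2: DataType): Option[DataType] =
  HiveTypeCoercion.allPromotions
    .find(p => p.contains(t1) && p.contains(t2))
    .map(_.filter(t => t == t1 || t == t2).last)

// widest(IntegerType, LongType)    == Some(LongType)     (numeric widening)
// widest(NullType, BooleanType)    == Some(BooleanType)  (boolean precedence)
// widest(IntegerType, BooleanType) == None               (no shared promotion list)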
......@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.{ArrayType, DataType, StructField, StructType}
abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {
self: PlanType with Product =>
......@@ -123,4 +125,53 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
case other => Nil
}.toSeq
}
protected def generateSchemaString(schema: Seq[Attribute]): String = {
val builder = new StringBuilder
builder.append("root\n")
val prefix = " |"
schema.foreach { attribute =>
val name = attribute.name
val dataType = attribute.dataType
dataType match {
case fields: StructType =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(fields: StructType) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(elementType: DataType) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case _ => builder.append(s"$prefix-- $name: $dataType\n")
}
}
builder.toString()
}
protected def generateSchemaString(
schema: StructType,
prefix: String,
builder: StringBuilder): StringBuilder = {
schema.fields.foreach {
case StructField(name, fields: StructType, _) =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(fields: StructType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(elementType: DataType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case StructField(name, fieldType: DataType, _) =>
builder.append(s"$prefix-- $name: $fieldType\n")
}
builder
}
/** Returns the output schema in the tree format. */
def schemaString: String = generateSchemaString(output)
/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
}
......@@ -18,11 +18,12 @@
package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
class CombiningLimitsSuite extends OptimizerTest {
class CombiningLimitsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
......
......@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
......@@ -27,7 +28,7 @@ import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
class ConstantFoldingSuite extends OptimizerTest {
class ConstantFoldingSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
......
......@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.RightOuter
import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.dsl.expressions._
class FilterPushdownSuite extends OptimizerTest {
class FilterPushdownSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
......
......@@ -19,13 +19,14 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.rules._
/* Implicit conversions */
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
class SimplifyCaseConversionExpressionsSuite extends OptimizerTest {
class SimplifyCaseConversionExpressionsSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
......
......@@ -15,19 +15,18 @@
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.optimizer
package org.apache.spark.sql.catalyst.plans
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
/**
* Provides helper methods for comparing plans produced by optimization rules with the expected
* result
* Provides helper methods for comparing plans.
*/
class OptimizerTest extends FunSuite {
class PlanTest extends FunSuite {
/**
* Since attribute references are given globally unique ids during analysis,
......
......@@ -43,6 +43,13 @@
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
......@@ -53,6 +60,11 @@
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
......
......@@ -22,24 +22,22 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.SparkContext
/**
* :: AlphaComponent ::
......@@ -53,7 +51,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
class SQLContext(@transient val sparkContext: SparkContext)
extends Logging
with SQLConf
with dsl.ExpressionConversions
with ExpressionConversions
with Serializable {
self =>
......@@ -98,6 +96,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))
/**
* Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
*/
def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
/**
* :: Experimental ::
*/
@Experimental
def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
val json = sparkContext.textFile(path)
jsonRDD(json, samplingRatio)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[SchemaRDD]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
*/
def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
/**
* :: Experimental ::
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
......
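
A brief hedged sketch of the sampling variants defined above (sqlContext and jsonStringRDD are assumed to exist; the 0.3 ratio is purely illustrative, and the default of 1.0 scans the whole dataset):

// Experimental: infer the schema from roughly a 30% sample of the file.
val sampled = sqlContext.jsonFile("examples/src/main/resources/people.json", 0.3)

// The RDD[String] variant accepts the same optional ratio.
val fromRdd = sqlContext.jsonRDD(jsonStringRDD, 0.3)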
......@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
import java.util.{Map => JMap}
......@@ -41,8 +41,10 @@ import java.util.{Map => JMap}
* whose elements are scala case classes into a SchemaRDD. This conversion can also be done
* explicitly using the `createSchemaRDD` function on a [[SQLContext]].
*
* A `SchemaRDD` can also be created by loading data in from external sources, for example,
* by using the `parquetFile` method on [[SQLContext]].
* A `SchemaRDD` can also be created by loading data in from external sources.
* Examples include loading data from Parquet files by using the
* `parquetFile` method on [[SQLContext]], and loading JSON datasets
* by using the `jsonFile` and `jsonRDD` methods on [[SQLContext]].
*
* == SQL Queries ==
* A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it. Once
......@@ -341,14 +343,38 @@ class SchemaRDD(
*/
def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
/**
* Converts a JavaRDD to a PythonRDD. It is used by pyspark.
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
def rowToMap(row: Row, structType: StructType): JMap[String, Any] = {
val fields = structType.fields.map(field => (field.name, field.dataType))
val map: JMap[String, Any] = new java.util.HashMap
row.zip(fields).foreach {
case (obj, (name, dataType)) =>
dataType match {
case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
case other => map.put(name, obj)
}
}
map
}
// TODO: Actually, the schema of a row should be represented by a StructType instead of
// a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
// construct the Map for python.
val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
field => (field.name, field.dataType))
this.mapPartitions { iter =>
val pickle = new Pickler
iter.map { row =>
val map: JMap[String, Any] = new java.util.HashMap
row.zip(fieldNames).foreach { case (obj, name) =>
map.put(name, obj)
row.zip(fields).foreach { case (obj, (name, dataType)) =>
dataType match {
case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
case other => map.put(name, obj)
}
}
map
}.grouped(10).map(batched => pickle.dumps(batched.toArray))
......
......@@ -122,4 +122,10 @@ private[sql] trait SchemaRDDLike {
@Experimental
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
/** Returns the output schema in the tree format. */
def schemaString: String = queryExecution.analyzed.schemaString
/** Prints out the schema in the tree format. */
def printSchema(): Unit = println(schemaString)
}
......@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.catalyst.types._
......@@ -100,6 +101,25 @@ class JavaSQLContext(val sqlContext: SQLContext) {
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
/**
* Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
*/
def jsonFile(path: String): JavaSchemaRDD =
jsonRDD(sqlContext.sparkContext.textFile(path))
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[JavaSchemaRDD]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.json
import scala.collection.JavaConversions._
import scala.math.BigDecimal
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.sql.Logging
private[sql] object JsonRDD extends Logging {
private[sql] def inferSchema(
json: RDD[String],
samplingRatio: Double = 1.0): LogicalPlan = {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
val baseSchema = createSchema(allKeys)
createLogicalPlan(json, baseSchema)
}
private def createLogicalPlan(
json: RDD[String],
baseSchema: StructType): LogicalPlan = {
val schema = nullTypeToStringType(baseSchema)
SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
}
private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
// Resolve type conflicts
val resolved = allKeys.groupBy {
case (key, dataType) => key
}.map {
// Now, keys and types are organized in the format of
// key -> Set(type1, type2, ...).
case (key, typeSet) => {
val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
val dataType = typeSet.map {
case (_, dataType) => dataType
}.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
(fieldName, dataType)
}
}
def makeStruct(values: Seq[Seq[String]], prefix: Seq[String]): StructType = {
val (topLevel, structLike) = values.partition(_.size == 1)
val topLevelFields = topLevel.filter {
name => resolved.get(prefix ++ name).get match {
case ArrayType(StructType(Nil)) => false
case ArrayType(_) => true
case struct: StructType => false
case _ => true
}
}.map {
a => StructField(a.head, resolved.get(prefix ++ a).get, nullable = true)
}
val structFields: Seq[StructField] = structLike.groupBy(_(0)).map {
case (name, fields) => {
val nestedFields = fields.map(_.tail)
val structType = makeStruct(nestedFields, prefix :+ name)
val dataType = resolved.get(prefix :+ name).get
dataType match {
case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
case struct: StructType => Some(StructField(name, structType, nullable = true))
// A dataType of StringType means that we have resolved type conflicts involving
// primitive types and complex types. So, the type of name has been relaxed to
// StringType. Also, this field should have already been put in topLevelFields.
case StringType => None
}
}
}.flatMap(field => field).toSeq
StructType(
(topLevelFields ++ structFields).sortBy {
case StructField(name, _, _) => name
})
}
makeStruct(resolved.keySet.toSeq, Nil)
}
/**
* Returns the most general data type for two given data types.
*/
private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
// Try and find a promotion rule that contains both types in question.
val applicableConversion = HiveTypeCoercion.allPromotions.find(p => p.contains(t1) && p
.contains(t2))
// If found return the widest common type, otherwise None
val returnType = applicableConversion.map(_.filter(t => t == t1 || t == t2).last)
if (returnType.isDefined) {
returnType.get
} else {
// t1 or t2 is a StructType, ArrayType, or an unexpected type.
(t1, t2) match {
case (other: DataType, NullType) => other
case (NullType, other: DataType) => other
case (StructType(fields1), StructType(fields2)) => {
val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
case (name, fieldTypes) => {
val dataType = fieldTypes.map(field => field.dataType).reduce(
(type1: DataType, type2: DataType) => compatibleType(type1, type2))
StructField(name, dataType, true)
}
}
StructType(newFields.toSeq.sortBy {
case StructField(name, _, _) => name
})
}
case (ArrayType(elementType1), ArrayType(elementType2)) =>
ArrayType(compatibleType(elementType1, elementType2))
// TODO: We should use JsonObjectStringType to mark that values of this field will be
// strings and every string is a JSON object.
case (_, _) => StringType
}
}
}
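
For illustration, a hedged sketch of the conflict resolution above; the expected results are inferred from the code, and the snippet would only compile inside the org.apache.spark.sql.json package (for example, in a test) because compatibleType is private[json]:

import org.apache.spark.sql.catalyst.types._

// Numeric types widen along numericPrecedence.
assert(JsonRDD.compatibleType(IntegerType, DoubleType) == DoubleType)
// Otherwise-incompatible pairs (including primitive vs. complex) relax to StringType.
assert(JsonRDD.compatibleType(IntegerType, StringType) == StringType)
assert(JsonRDD.compatibleType(ArrayType(IntegerType), IntegerType) == StringType)
// Two StructTypes are merged field by field, recursively resolving each field's type.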
private def typeOfPrimitiveValue(value: Any): DataType = {
value match {
case value: java.lang.String => StringType
case value: java.lang.Integer => IntegerType
case value: java.lang.Long => LongType
// Since we do not have a data type backed by BigInteger,
// when we see a Java BigInteger, we use DecimalType.
case value: java.math.BigInteger => DecimalType
case value: java.lang.Double => DoubleType
case value: java.math.BigDecimal => DecimalType
case value: java.lang.Boolean => BooleanType
case null => NullType
// Unexpected data type.
case _ => StringType
}
}
/**
* Returns the element type of a JSON array. We go through all elements of this array
* to detect any possible type conflict. We use [[compatibleType]] to resolve
* type conflicts. Right now, when the element of an array is another array, we
* treat the element as String.
*/
private def typeOfArray(l: Seq[Any]): ArrayType = {
val elements = l.flatMap(v => Option(v))
if (elements.isEmpty) {
// If this JSON array is empty, we use NullType as a placeholder.
// If this array is not empty in other JSON objects, we can resolve
// the type after we have passed through all JSON objects.
ArrayType(NullType)
} else {
val elementType = elements.map {
e => e match {
case map: Map[_, _] => StructType(Nil)
// We have an array of arrays. If those element arrays do not have the same
// element types, we will return ArrayType[StringType].
case seq: Seq[_] => typeOfArray(seq)
case value => typeOfPrimitiveValue(value)
}
}.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
ArrayType(elementType)
}
}
/**
* Figures out all key names and data types of values from a parsed JSON object
* (in the format of Map[String, Any]). When the value of a key is a JSON object, we
* only use a placeholder (StructType(Nil)) to mark that it should be a struct
* instead of getting all fields of this struct, because a field that does not appear
* in this JSON object can appear in other JSON objects.
*/
private def allKeysWithValueTypes(m: Map[String, Any]): Set[(String, DataType)] = {
m.map{
// Quote the key with backticks to handle cases where the field name
// contains dots.
case (key, dataType) => (s"`$key`", dataType)
}.flatMap {
case (key: String, struct: Map[String, Any]) => {
// The value associated with the key is a JSON object.
allKeysWithValueTypes(struct).map {
case (k, dataType) => (s"$key.$k", dataType)
} ++ Set((key, StructType(Nil)))
}
case (key: String, array: List[Any]) => {
// The value associated with the key is an array.
typeOfArray(array) match {
case ArrayType(StructType(Nil)) => {
// The elements of this array are structs.
array.asInstanceOf[List[Map[String, Any]]].flatMap {
element => allKeysWithValueTypes(element)
}.map {
case (k, dataType) => (s"$key.$k", dataType)
} :+ (key, ArrayType(StructType(Nil)))
}
case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
}
}
case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
}.toSet
}
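
To make the key/type bookkeeping concrete, a hedged worked example of what allKeysWithValueTypes would produce for one parsed record, derived from the code above (the record reuses the nested JSON from the Java example):

// Input, as the Map[String, Any] produced by parseJson:
//   {"name": "Yin", "address": {"city": "Columbus", "state": "Ohio"}}
// Resulting key/type pairs (keys are backtick-quoted; nested keys are dot-joined):
//   ("`name`", StringType)
//   ("`address`", StructType(Nil))       // placeholder marking a struct
//   ("`address`.`city`", StringType)
//   ("`address`.`state`", StringType)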
/**
* Converts a Java Map/List to a Scala Map/List.
* We do not use Jackson's Scala module here because
* DefaultScalaModule in jackson-module-scala will make
* the parsing very slow.
*/
private def scalafy(obj: Any): Any = obj match {
case map: java.util.Map[String, Object] =>
// .map(identity) is used as a workaround for the non-serializable Map
// generated by .mapValues.
// This issue is documented at https://issues.scala-lang.org/browse/SI-7005
map.toMap.mapValues(scalafy).map(identity)
case list: java.util.List[Object] =>
list.toList.map(scalafy)
case atom => atom
}
private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
// According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
// ObjectMapper will not return BigDecimal when
// "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
// (see NumberDeserializer.deserialize for the logic).
// But, we do not want to enable this feature because it will use BigDecimal
// for every float number, which will be slow.
// So, right now, numbers that would need a BigDecimal end up as Infinity.
// TODO: Support BigDecimal.
json.mapPartitions(iter => {
// When there is a key appearing multiple times (a duplicate key),
// the ObjectMapper will take the last value associated with this duplicate key.
// For example: for {"key": 1, "key":2}, we will get "key"->2.
val mapper = new ObjectMapper()
iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
}).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
}
private def toLong(value: Any): Long = {
value match {
case value: java.lang.Integer => value.asInstanceOf[Int].toLong
case value: java.lang.Long => value.asInstanceOf[Long]
}
}
private def toDouble(value: Any): Double = {
value match {
case value: java.lang.Integer => value.asInstanceOf[Int].toDouble
case value: java.lang.Long => value.asInstanceOf[Long].toDouble
case value: java.lang.Double => value.asInstanceOf[Double]
}
}
private def toDecimal(value: Any): BigDecimal = {
value match {
case value: java.lang.Integer => BigDecimal(value)
case value: java.lang.Long => BigDecimal(value)
case value: java.math.BigInteger => BigDecimal(value)
case value: java.lang.Double => BigDecimal(value)
case value: java.math.BigDecimal => BigDecimal(value)
}
}
private def toJsonArrayString(seq: Seq[Any]): String = {
val builder = new StringBuilder
builder.append("[")
var count = 0
seq.foreach {
element =>
if (count > 0) builder.append(",")
count += 1
builder.append(toString(element))
}
builder.append("]")
builder.toString()
}
private def toJsonObjectString(map: Map[String, Any]): String = {
val builder = new StringBuilder
builder.append("{")
var count = 0
map.foreach {
case (key, value) =>
if (count > 0) builder.append(",")
count += 1
builder.append(s"""\"${key}\":${toString(value)}""")
}
builder.append("}")
builder.toString()
}
private def toString(value: Any): String = {
value match {
case value: Map[String, Any] => toJsonObjectString(value)
case value: Seq[Any] => toJsonArrayString(value)
case value => Option(value).map(_.toString).orNull
}
}
private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any = {
if (value == null) {
null
} else {
desiredType match {
case ArrayType(elementType) =>
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
case StringType => toString(value)
case IntegerType => value.asInstanceOf[IntegerType.JvmType]
case LongType => toLong(value)
case DoubleType => toDouble(value)
case DecimalType => toDecimal(value)
case BooleanType => value.asInstanceOf[BooleanType.JvmType]
case NullType => null
}
}
}
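
A few hedged examples of the coercions performed above, with expected results inferred from the code (the literal values are illustrative):

// enforceCorrectType(1, LongType)                      // 1L, via toLong
// enforceCorrectType(Seq(1, 2), ArrayType(DoubleType)) // Seq(1.0, 2.0), element-wise
// enforceCorrectType(Map("a" -> 1), StringType)        // "{\"a\":1}", via the toString helpers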
private def asRow(json: Map[String,Any], schema: StructType): Row = {
val row = new GenericMutableRow(schema.fields.length)
schema.fields.zipWithIndex.foreach {
// StructType
case (StructField(name, fields: StructType, _), i) =>
row.update(i, json.get(name).flatMap(v => Option(v)).map(
v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
// ArrayType(StructType)
case (StructField(name, ArrayType(structType: StructType), _), i) =>
row.update(i,
json.get(name).flatMap(v => Option(v)).map(
v => v.asInstanceOf[Seq[Any]].map(
e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
// Other cases
case (StructField(name, dataType, _), i) =>
row.update(i, json.get(name).flatMap(v => Option(v)).map(
enforceCorrectType(_, dataType)).getOrElse(null))
}
row
}
private def nullTypeToStringType(struct: StructType): StructType = {
val fields = struct.fields.map {
case StructField(fieldName, dataType, nullable) => {
val newType = dataType match {
case NullType => StringType
case ArrayType(NullType) => ArrayType(StringType)
case struct: StructType => nullTypeToStringType(struct)
case other: DataType => other
}
StructField(fieldName, newType, nullable)
}
}
StructType(fields)
}
private def asAttributes(struct: StructType): Seq[AttributeReference] = {
struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
}
private def asStruct(attributes: Seq[AttributeReference]): StructType = {
val fields = attributes.map {
case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
}
StructType(fields)
}
}