Skip to content
Snippets Groups Projects
Commit 424cb699 authored by kul's avatar kul Committed by Michael Armbrust
Browse files

[SPARK-5426][SQL] Add SparkSQL Java API helper methods.

Right now this PR adds only a few helper methods to the Java API. The issue itself, however, was opened mainly to eliminate the need for conversions such as `.rdd` and `.toJavaRDD` in Java code that works with `SQLContext` or `HiveContext`.

Author: kul <kuldeep.bora@gmail.com>

Closes #4243 from kul/master and squashes the following commits:

2390fba [kul] [SPARK-5426][SQL] Add SparkSQL Java API helper methods.
parent b90dd397
No related branches found
No related tags found
No related merge requests found
......@@ -221,6 +221,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
DataFrame(this, logicalPlan)
}
/**
 * :: DeveloperApi ::
 * Java-friendly overload of `applySchema`: unwraps the given `JavaRDD[Row]` to its underlying
 * Scala RDD (`rowRDD.rdd`) and delegates to the RDD-based overload with the same schema.
 *
 * @param rowRDD the rows to which the schema is applied
 * @param schema the schema, passed through unchanged to the RDD-based overload
 * @return the DataFrame produced by the RDD-based `applySchema`
 */
@DeveloperApi
def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame =
  applySchema(rowRDD.rdd, schema)
/**
* Applies a schema to an RDD of Java Beans.
*
......@@ -305,6 +310,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
/**
 * Java-friendly overload of `jsonRDD`: unwraps the `JavaRDD[String]` to its underlying Scala
 * RDD and delegates to the two-argument RDD-based overload, passing 1.0 as the second argument.
 *
 * @param json the JSON strings to load
 * @return the DataFrame produced by the RDD-based `jsonRDD`
 */
def jsonRDD(json: JavaRDD[String]): DataFrame = {
  val underlying = json.rdd
  jsonRDD(underlying, 1.0)
}
/**
* :: Experimental ::
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
......@@ -323,6 +330,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}
/**
 * :: Experimental ::
 * Java-friendly overload of `jsonRDD` taking an explicit schema: unwraps the `JavaRDD[String]`
 * to its underlying Scala RDD and delegates to the RDD-based overload with the same schema.
 *
 * @param json   the JSON strings to load
 * @param schema the schema, passed through unchanged to the RDD-based overload
 * @return the DataFrame produced by the RDD-based `jsonRDD`
 */
@Experimental
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame =
  jsonRDD(json.rdd, schema)
/**
* :: Experimental ::
*/
......@@ -336,6 +348,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}
/**
 * :: Experimental ::
 * Java-friendly overload of `jsonRDD` taking a sampling ratio: unwraps the `JavaRDD[String]`
 * to its underlying Scala RDD and delegates to the RDD-based overload.
 *
 * @param json          the JSON strings to load
 * @param samplingRatio passed through unchanged to the RDD-based overload
 * @return the DataFrame produced by the RDD-based `jsonRDD`
 */
@Experimental
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame =
  jsonRDD(json.rdd, samplingRatio)
@Experimental
def load(path: String): DataFrame = {
val dataSourceName = conf.defaultDataSourceName
......
......@@ -98,7 +98,7 @@ public class JavaApplySchemaSuite implements Serializable {
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = javaSqlCtx.applySchema(rowRDD.rdd(), schema);
DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
df.registerTempTable("people");
Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect();
......@@ -109,6 +109,48 @@ public class JavaApplySchemaSuite implements Serializable {
Assert.assertEquals(expected, Arrays.asList(actual));
}
/**
 * Builds a DataFrame from a JavaRDD of rows plus an explicit schema, registers it as the
 * temp table "people", then checks that the result of a SQL query over it can be converted
 * with toJavaRDD() and mapped with an ordinary RDD function.
 */
@Test
public void dataFrameRDDOperations() {
  // Fixture: two beans that are turned into rows below.
  Person michael = new Person();
  michael.setName("Michael");
  michael.setAge(29);
  Person yin = new Person();
  yin.setName("Yin");
  yin.setAge(28);
  List<Person> people = new ArrayList<Person>(2);
  people.add(michael);
  people.add(yin);

  Function<Person, Row> toRow = new Function<Person, Row>() {
    public Row call(Person p) throws Exception {
      return RowFactory.create(p.getName(), p.getAge());
    }
  };
  JavaRDD<Row> rowRDD = javaCtx.parallelize(people).map(toRow);

  List<StructField> columns = Arrays.asList(
    DataTypes.createStructField("name", DataTypes.StringType, false),
    DataTypes.createStructField("age", DataTypes.IntegerType, false));
  StructType schema = DataTypes.createStructType(columns);

  // The JavaRDD is passed directly; no explicit .rdd() conversion is needed.
  DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
  df.registerTempTable("people");

  // Renders each row as "<name>_<age>".
  Function<Row, String> describe = new Function<Row, String>() {
    public String call(Row row) {
      return row.getString(0) + "_" + row.get(1).toString();
    }
  };
  JavaRDD<Row> queried = javaSqlCtx.sql("SELECT * FROM people").toJavaRDD();
  List<String> actual = queried.map(describe).collect();

  List<String> expected = Arrays.asList("Michael_29", "Yin_28");
  Assert.assertEquals(expected, actual);
}
@Test
public void applySchemaToJSON() {
JavaRDD<String> jsonRDD = javaCtx.parallelize(Arrays.asList(
......@@ -147,14 +189,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD.rdd());
DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.registerTempTable("jsonTable1");
List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema);
DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.registerTempTable("jsonTable2");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment