Skip to content
Snippets Groups Projects
Commit 9696580c authored by Jacek Laskowski's avatar Jacek Laskowski Committed by gatorsmile
Browse files

[SPARK-21886][SQL] Use SparkSession.internalCreateDataFrame to create…

… Dataset with LogicalRDD logical operator

## What changes were proposed in this pull request?

Reusing `SparkSession.internalCreateDataFrame` wherever possible (to cut code duplication)

## How was this patch tested?

Local build and waiting for Jenkins

Author: Jacek Laskowski <jacek@japila.pl>

Closes #19095 from jaceklaskowski/SPARK-21886-internalCreateDataFrame.
parent 19b0240d
No related branches found
No related tags found
No related merge requests found
......@@ -410,10 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
Dataset.ofRows(
sparkSession,
LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession))
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
}
/**
......@@ -473,10 +470,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
Dataset.ofRows(
sparkSession,
LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession))
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)
}
/**
......
......@@ -559,8 +559,7 @@ class SparkSession private(
}
/**
* Creates a `DataFrame` from an RDD[Row].
* User can specify whether the input rows should be converted to Catalyst rows.
* Creates a `DataFrame` from an `RDD[InternalRow]`.
*/
private[sql] def internalCreateDataFrame(
catalystRows: RDD[InternalRow],
......@@ -576,7 +575,7 @@ class SparkSession private(
}
/**
* Creates a `DataFrame` from an RDD[Row].
* Creates a `DataFrame` from an `RDD[Row]`.
* User can specify whether the input rows should be converted to Catalyst rows.
*/
private[sql] def createDataFrame(
......@@ -589,10 +588,9 @@ class SparkSession private(
val encoder = RowEncoder(schema)
rowRDD.map(encoder.toRow)
} else {
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
Dataset.ofRows(self, logicalPlan)
internalCreateDataFrame(catalystRows, schema)
}
......@@ -737,13 +735,15 @@ class SparkSession private(
}
/**
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
* Apply `schema` to an RDD.
*
* @note Used by PySpark only
*/
private[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): DataFrame = {
val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
Dataset.ofRows(self, LogicalRDD(schema.toAttributes, rowRdd)(self))
internalCreateDataFrame(rowRdd, schema)
}
/**
......
0% — Loading, or an error occurred while loading the diff.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment