Commit dd4f088c authored by Dilip Biswal, committed by Wenchen Fan


[SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand

## What changes were proposed in this pull request?
A short code snippet that calls toLocalIterator() on a DataFrame produced by a RunnableCommand
reproduces the problem. toLocalIterator() is used by the Thrift server when
`spark.sql.thriftServer.incrementalCollect` is set, to handle queries that produce large result
sets.
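
For context, here is a minimal sketch of exercising the same toLocalIterator() path in a standalone program rather than the shell. It assumes a Spark 2.x build on the classpath; the object name and the `local[1]` master are purely illustrative, and the config flag is only read by the Thrift server, so it is shown here just to tie the code to the scenario above.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object IncrementalCollectRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-18009 repro")
      // The Thrift server consults this flag to decide whether to stream
      // results via toLocalIterator(); here we call toLocalIterator() directly
      // to hit the same code path.
      .config("spark.sql.thriftServer.incrementalCollect", "true")
      .getOrCreate()

    // "show databases" is executed as a RunnableCommand, the case that used
    // to throw ClassCastException before this fix.
    val df = spark.sql("show databases")

    // toLocalIterator() returns a java.util.Iterator[Row]; asScala wraps it.
    df.toLocalIterator().asScala.foreach(println)

    spark.stop()
  }
}
```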

**Before**
```scala
scala> spark.sql("show databases")
res0: org.apache.spark.sql.DataFrame = [databaseName: string]

scala> res0.toLocalIterator()
16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
```

**After**
```scala
scala> spark.sql("drop database databases")
res30: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases")
res31: org.apache.spark.sql.DataFrame = [databaseName: string]

scala> res31.toLocalIterator().asScala foreach println
[default]
[parquet]
```
## How was this patch tested?
Added a test in DDLSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #15642 from dilipbiswal/SPARK-18009.
parent f1aeed8b
@@ -66,6 +66,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
+  override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
   override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
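
Why this helps, in miniature: ExecutedCommandExec already materializes the command's result on the driver as `sideEffectResult`, so the iterator can be served from that buffer instead of going through the generic execution path, which (per the error above) expects UnsafeRow and throws the ClassCastException on the GenericInternalRows a command produces. Below is a simplified, hypothetical sketch of that pattern in plain Scala; none of these classes are the real Spark ones, all names are made up.

```scala
object ToLocalIteratorSketch {
  sealed trait Row
  final case class GenericRow(values: Seq[Any]) extends Row
  final case class BinaryRow(bytes: Array[Byte]) extends Row

  abstract class Plan {
    // Rows as the operator produces them.
    protected def produce(): Iterator[Row]

    // Default iterator path: assumes every row is already a BinaryRow
    // (the analogue of the execution path expecting UnsafeRow); a GenericRow
    // triggers a ClassCastException when the iterator is consumed.
    def rowIterator(): Iterator[Row] = produce().map(_.asInstanceOf[BinaryRow])
  }

  // Command-style plan: the result is computed eagerly and kept on the driver.
  final class CommandPlan(result: Seq[GenericRow]) extends Plan {
    protected def produce(): Iterator[Row] = result.iterator

    // The fix, in miniature: hand out the buffered rows as-is, mirroring
    // `executeToIterator = sideEffectResult.toIterator` in the diff above.
    override def rowIterator(): Iterator[Row] = result.iterator
  }

  def main(args: Array[String]): Unit = {
    val plan = new CommandPlan(Seq(GenericRow(Seq("default")), GenericRow(Seq("parquet"))))
    plan.rowIterator().foreach(println) // no downcast, no exception
  }
}
```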
@@ -1805,4 +1805,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("SPARK-18009 calling toLocalIterator on commands") {
+    import scala.collection.JavaConverters._
+    val df = sql("show databases")
+    val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
+    assert(rows.length > 0)
+  }
 }