Commit 21c7539a authored by Tejas Patil, committed by Herman van Hovell

[SPARK-18038][SQL] Move output partitioning definition from UnaryNodeExec to its children

## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-18038

This was a suggestion by rxin in one of the dev list discussions: http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html

His words:

>> It would be better (safer) to move the output partitioning definition into each of the operator and remove it from UnaryExecNode.

With this PR, the following are the output partitioning and ordering for all implementations of `UnaryExecNode` (a simplified sketch of the new pattern follows the table):

UnaryExecNode's impl | outputPartitioning | outputOrdering | comment
------------ | ------------- | ------------ | ------------
AppendColumnsExec | child's | Nil | child's ordering can be used
AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used
BroadcastExchangeExec | BroadcastPartitioning | Nil | -
CoalesceExec | UnknownPartitioning | Nil | -
CollectLimitExec | SinglePartition | Nil | -
DebugExec | child's | Nil | child's ordering can be used
DeserializeToObjectExec | child's | Nil | child's ordering can be used
ExpandExec | UnknownPartitioning | Nil | -
FilterExec | child's | child's | -
FlatMapGroupsInRExec | child's | Nil | child's ordering can be used
GenerateExec | child's | Nil | need to dig more
GlobalLimitExec | child's | child's | -
HashAggregateExec | child's | Nil | -
InputAdapter | child's | child's | -
InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning
LocalLimitExec | child's | child's | -
MapElementsExec | child's | child's | -
MapGroupsExec | child's | Nil | child's ordering can be used
MapPartitionsExec | child's | Nil | child's ordering can be used
ProjectExec | child's | child's | -
SampleExec | child's | Nil | child's ordering can be used
ScriptTransformation | child's | Nil | child's ordering can be used
SerializeFromObjectExec | child's | Nil | child's ordering can be used
ShuffleExchange | custom | Nil | -
SortAggregateExec | child's | sort over grouped exprs | -
SortExec | child's | custom | -
StateStoreRestoreExec  | child's | Nil | child's ordering can be used
StateStoreSaveExec | child's | Nil | child's ordering can be used
SubqueryExec | child's | child's | -
TakeOrderedAndProjectExec | SinglePartition | custom | -
WholeStageCodegenExec | child's | child's | -
WindowExec | child's | child's | -
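
To make the shape of the change concrete, here is a minimal, self-contained sketch of the pattern. The names below are simplified stand-ins for illustration, not the real Spark operators: the old blanket default on `UnaryExecNode` silently claimed that every unary operator preserves its child's partitioning, while after this PR each operator states its own guarantee.

```scala
// Simplified stand-ins for illustration only; the real SparkPlan / Partitioning
// hierarchy is much richer than this sketch.
sealed trait Partitioning
case object UnknownPartitioning extends Partitioning
case object SinglePartition extends Partitioning

trait SparkPlanLike {
  def outputPartitioning: Partitioning = UnknownPartitioning
}

trait UnaryExecNodeLike extends SparkPlanLike {
  def child: SparkPlanLike
  // Before this PR a blanket default lived here:
  //   override def outputPartitioning: Partitioning = child.outputPartitioning
  // After this PR there is no default: each operator answers for itself.
}

// An operator that genuinely preserves partitioning opts in explicitly ...
case class FilterLike(child: SparkPlanLike) extends UnaryExecNodeLike {
  override def outputPartitioning: Partitioning = child.outputPartitioning
}

// ... while an operator that changes it can no longer inherit a wrong answer.
case class CollectLimitLike(child: SparkPlanLike) extends UnaryExecNodeLike {
  override def outputPartitioning: Partitioning = SinglePartition
}
```

Removing the inherited default forces every new operator to make an explicit, reviewable decision about its partitioning, which is exactly the "safer" property asked for above.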

## How was this patch tested?

This does NOT change any existing functionality, so it relies on the existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning.
parent eff4aed1
Showing changed files with 76 additions and 11 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.metric.SQLMetrics
 /**
@@ -60,6 +61,8 @@ case class GenerateExec(
   override def producedAttributes: AttributeSet = AttributeSet(output)
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   val boundGenerator = BindReferences.bindReference(generator, child.output)
   protected override def doExecute(): RDD[InternalRow] = {
...
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 /**
@@ -45,6 +45,10 @@ case class SortExec(
   override def outputOrdering: Seq[SortOrder] = sortOrder
+  // sort performed is local within a given partition so will retain
+  // child operator's partitioning
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
...
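
The new comment in `SortExec` above captures the key intuition: a partition-local sort reorders rows inside each partition but never moves rows across partitions, so the child's partitioning is retained while the ordering changes. An illustrative way to see this through the public DataFrame API (assuming a running `SparkSession` named `spark`; this snippet is not part of the patch):

```scala
import org.apache.spark.sql.functions.col

val df     = spark.range(0, 1000).withColumn("key", col("id") % 8)
val byKey  = df.repartition(col("key"))        // hash partitioned by `key`
val sorted = byKey.sortWithinPartitions("id")  // local sort, no extra shuffle

// The partition-local sort leaves the partition layout untouched.
assert(byKey.rdd.getNumPartitions == sorted.rdd.getNumPartitions)
```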
@@ -395,8 +395,6 @@ trait UnaryExecNode extends SparkPlan {
   def child: SparkPlan
   override final def children: Seq[SparkPlan] = child :: Nil
-  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 trait BinaryExecNode extends SparkPlan {
...
@@ -218,7 +218,9 @@ trait CodegenSupport extends SparkPlan {
 case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
   override def doExecute(): RDD[InternalRow] = {
@@ -292,7 +294,9 @@ object WholeStageCodegenExec {
 case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
   override lazy val metrics = Map(
...
@@ -63,6 +63,8 @@ case class HashAggregateExec(
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def producedAttributes: AttributeSet =
     AttributeSet(aggregateAttributes) ++
     AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
...
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.Utils
@@ -66,6 +66,8 @@ case class SortAggregateExec(
     groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
   }
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = {
     groupingExpressions.map(SortOrder(_, Ascending))
   }
...
@@ -78,6 +78,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   }
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
@@ -214,6 +216,8 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   }
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 /**
@@ -234,6 +238,8 @@ case class SampleExec(
     child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -517,7 +523,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
   override def sameResult(o: SparkPlan): Boolean = o match {
...
@@ -27,8 +27,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 /**
@@ -162,6 +162,8 @@ package object debug {
       }
     }
+    override def outputPartitioning: Partitioning = child.outputPartitioning
     override def inputRDDs(): Seq[RDD[InternalRow]] = {
       child.asInstanceOf[CodegenSupport].inputRDDs()
     }
...
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.util.Utils
 /**
  * Take the first `limit` elements and collect them to a single partition.
  *
@@ -54,8 +53,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
 trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputPartitioning: Partitioning = child.outputPartitioning
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
@@ -95,14 +93,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
  * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
  */
 case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 /**
  * Take the first `limit` elements of the child's single output partition.
  */
 case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
   override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 /**
@@ -122,8 +128,6 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }
-  override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
@@ -160,6 +164,8 @@ case class TakeOrderedAndProjectExec(
   override def outputOrdering: Seq[SortOrder] = sortOrder
+  override def outputPartitioning: Partitioning = SinglePartition
   override def simpleString: String = {
     val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
     val outputString = Utils.truncatedString(output, "[", ",", "]")
...
@@ -68,6 +68,8 @@ case class DeserializeToObjectExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport {
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -102,6 +104,8 @@ case class SerializeFromObjectExec(
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -171,6 +175,8 @@ case class MapPartitionsExec(
     child: SparkPlan)
   extends ObjectConsumerExec with ObjectProducerExec {
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
@@ -231,6 +237,8 @@ case class MapElementsExec(
   }
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 /**
@@ -244,6 +252,8 @@ case class AppendColumnsExec(
   override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   private def newColumnSchema = serializer.map(_.toAttribute).toStructType
   override protected def doExecute(): RDD[InternalRow] = {
@@ -272,6 +282,8 @@ case class AppendColumnsWithObjectExec(
   override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute)
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   private def inputSchema = inputSerializer.map(_.toAttribute).toStructType
   private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType
@@ -304,6 +316,8 @@ case class MapGroupsExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
@@ -347,6 +361,9 @@ case class FlatMapGroupsInRExec(
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
   override def output: Seq[Attribute] = outputObjAttr :: Nil
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
   override def requiredChildDistribution: Seq[Distribution] =
...
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
@@ -80,7 +81,10 @@ case class StateStoreRestoreExec(
       }
     }
   }
   override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 /**
@@ -116,6 +120,8 @@ case class StateStoreSaveExec(
   override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   /**
    * Save all the rows to the state store, and return all the rows in the state store.
    * Note that this returns an iterator that pipelines the saving to store with downstream
...
@@ -103,6 +103,8 @@ case class WindowExec(
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.
...
@@ -57,4 +57,6 @@ case class ReferenceSort(
   override def output: Seq[Attribute] = child.output
   override def outputOrdering: Seq[SortOrder] = sortOrder
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
 import java.io.IOException
 import java.net.URI
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 import scala.collection.JavaConverters._
@@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -291,6 +291,8 @@ case class InsertIntoHiveTable(
     Seq.empty[InternalRow]
   }
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
   protected override def doExecute(): RDD[InternalRow] = {
...
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveInspectors
 import org.apache.spark.sql.hive.HiveShim._
@@ -61,6 +62,8 @@ case class ScriptTransformation(
   override def producedAttributes: AttributeSet = outputSet -- inputSet
+  override def outputPartitioning: Partitioning = child.outputPartitioning
   protected override def doExecute(): RDD[InternalRow] = {
     def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
       : Iterator[InternalRow] = {
...
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types.StringType
@@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExe
       throw new IllegalArgumentException("intentional exception")
     }
   }
   override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }