Skip to content
Snippets Groups Projects
Commit 23448a9e authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-8931] [SQL] Fallback to interpreted evaluation if failed to compile in codegen

Exception will not be catched during tests.

cc marmbrus rxin

Author: Davies Liu <davies@databricks.com>

Closes #7309 from davies/fallback and squashes the following commits:

969a612 [Davies Liu] throw exception during tests
f844f77 [Davies Liu] fallback
a3091bc [Davies Liu] Merge branch 'master' of github.com:apache/spark into fallback
364a0d6 [Davies Liu] fallback to interpret mode if failed to compile
parent f88b1253
No related branches found
No related tags found
No related merge requests found
...@@ -153,12 +153,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ ...@@ -153,12 +153,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
buf.toArray.map(converter(_).asInstanceOf[Row]) buf.toArray.map(converter(_).asInstanceOf[Row])
} }
private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
protected def newProjection( protected def newProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = { expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug( log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) { if (codegenEnabled) {
GenerateProjection.generate(expressions, inputSchema) try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate projection, fallback to interpret", e)
new InterpretedProjection(expressions, inputSchema)
}
}
} else { } else {
new InterpretedProjection(expressions, inputSchema) new InterpretedProjection(expressions, inputSchema)
} }
...@@ -170,17 +182,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ ...@@ -170,17 +182,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
log.debug( log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if(codegenEnabled) { if(codegenEnabled) {
GenerateMutableProjection.generate(expressions, inputSchema) try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
} else { } else {
() => new InterpretedMutableProjection(expressions, inputSchema) () => new InterpretedMutableProjection(expressions, inputSchema)
} }
} }
protected def newPredicate( protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = { expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
if (codegenEnabled) { if (codegenEnabled) {
GeneratePredicate.generate(expression, inputSchema) try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
} else { } else {
InterpretedPredicate.create(expression, inputSchema) InterpretedPredicate.create(expression, inputSchema)
} }
...@@ -190,7 +221,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ ...@@ -190,7 +221,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
order: Seq[SortOrder], order: Seq[SortOrder],
inputSchema: Seq[Attribute]): Ordering[InternalRow] = { inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
if (codegenEnabled) { if (codegenEnabled) {
GenerateOrdering.generate(order, inputSchema) try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new RowOrdering(order, inputSchema)
}
}
} else { } else {
new RowOrdering(order, inputSchema) new RowOrdering(order, inputSchema)
} }
......
...@@ -276,7 +276,18 @@ private[sql] case class InsertIntoHadoopFsRelation( ...@@ -276,7 +276,18 @@ private[sql] case class InsertIntoHadoopFsRelation(
log.debug( log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) { if (codegenEnabled) {
GenerateProjection.generate(expressions, inputSchema)
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (sys.props.contains("spark.testing")) {
throw e
} else {
log.error("failed to generate projection, fallback to interpreted", e)
new InterpretedProjection(expressions, inputSchema)
}
}
} else { } else {
new InterpretedProjection(expressions, inputSchema) new InterpretedProjection(expressions, inputSchema)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment