From b7f54119f86f916481aeccc67f07e77dc2a924c7 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Tue, 28 Jul 2015 17:03:59 -0700
Subject: [PATCH] [SPARK-9420][SQL] Move expressions in sql/core package to catalyst.

Since the catalyst package already depends on Spark core, we can move those
expressions into catalyst and simplify the function registry.

This is a followup of #7478.

Author: Reynold Xin <rxin@databricks.com>

Closes #7735 from rxin/SPARK-8003 and squashes the following commits:

2ffbdc3 [Reynold Xin] [SPARK-8003][SQL] Move expressions in sql/core package to catalyst.
---
 .../sql/catalyst/analysis/Analyzer.scala      |  3 ++-
 .../catalyst/analysis/FunctionRegistry.scala  | 17 +++++++-------
 .../MonotonicallyIncreasingID.scala           |  3 +--
 .../expressions/SparkPartitionID.scala        |  3 +--
 .../expressions}/NondeterministicSuite.scala  |  4 +---
 .../org/apache/spark/sql/SQLContext.scala     | 11 +--------
 .../sql/execution/expressions/package.scala   | 23 -------------------
 .../org/apache/spark/sql/functions.scala      |  4 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala |  4 ++--
 .../apache/spark/sql/hive/HiveContext.scala   | 13 ++---------
 .../org/apache/spark/sql/hive/UDFSuite.scala  |  4 ++--
 11 files changed, 23 insertions(+), 66 deletions(-)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/expressions/MonotonicallyIncreasingID.scala (95%)
 rename sql/{core/src/main/scala/org/apache/spark/sql/execution => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/expressions/SparkPartitionID.scala (93%)
 rename sql/{core/src/test/scala/org/apache/spark/sql/execution/expression => catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions}/NondeterministicSuite.scala (83%)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a723e92114..a309ee35ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.analysis

+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2}
 import org.apache.spark.sql.catalyst.expressions._
@@ -25,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
 import org.apache.spark.sql.types._
-import scala.collection.mutable.ArrayBuffer

 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 9b60943a1e..372f80d4a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -161,13 +161,6 @@ object FunctionRegistry {
     expression[ToDegrees]("degrees"),
     expression[ToRadians]("radians"),

-    // misc functions
-    expression[Md5]("md5"),
-    expression[Sha2]("sha2"),
-    expression[Sha1]("sha1"),
-    expression[Sha1]("sha"),
-    expression[Crc32]("crc32"),
-
     // aggregate functions
     expression[Average]("avg"),
     expression[Count]("count"),
@@ -229,7 +222,15 @@ object FunctionRegistry {
     expression[Year]("year"),

     // collection functions
-    expression[Size]("size")
+    expression[Size]("size"),
+
+    // misc functions
+    expression[Crc32]("crc32"),
+    expression[Md5]("md5"),
+    expression[Sha1]("sha"),
+    expression[Sha1]("sha1"),
+    expression[Sha2]("sha2"),
+    expression[SparkPartitionID]("spark_partition_id")
   )

   val builtin: FunctionRegistry = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
similarity index 95%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index eca36b3274..291b7a5bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -15,11 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.execution.expressions
+package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{LongType, DataType}

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
similarity index 93%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
index 98c8eab837..3f6480bbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
@@ -15,11 +15,10 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.execution.expressions
+package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{IntegerType, DataType}

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
similarity index 83%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index b6e79ff9cc..82894822ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -15,11 +15,9 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.execution.expression
+package org.apache.spark.sql.catalyst.expressions

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions. ExpressionEvalHelper
-import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID}

 class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("MonotonicallyIncreasingID") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 56cd8f22e7..dbb2a09846 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,8 +31,6 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -142,14 +140,7 @@ class SQLContext(@transient val sparkContext: SparkContext)

   // TODO how to handle the temp function per user session?
   @transient
-  protected[sql] lazy val functionRegistry: FunctionRegistry = {
-    val reg = FunctionRegistry.builtin
-    val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
-      FunctionExpression[SparkPartitionID]("spark__partition__id")
-    )
-    extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
-    reg
-  }
+  protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin

   @transient
   protected[sql] lazy val analyzer: Analyzer =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
deleted file mode 100644
index 568b7ac2c5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-/**
- * Package containing expressions that are specific to Spark runtime.
- */
-package object expressions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0148991512..4261a5e7cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -634,7 +634,7 @@ object functions {
    * @group normal_funcs
    * @since 1.4.0
    */
-  def monotonicallyIncreasingId(): Column = execution.expressions.MonotonicallyIncreasingID()
+  def monotonicallyIncreasingId(): Column = MonotonicallyIncreasingID()

   /**
    * Return an alternative value `r` if `l` is NaN.
@@ -741,7 +741,7 @@ object functions {
    * @group normal_funcs
    * @since 1.4.0
    */
-  def sparkPartitionId(): Column = execution.expressions.SparkPartitionID()
+  def sparkPartitionId(): Column = SparkPartitionID()

   /**
    * Computes the square root of the specified float value.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9b326c1635..d9c8b380ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -51,10 +51,10 @@ class UDFSuite extends QueryTest {
     df.selectExpr("count(distinct a)")
   }

-  test("SPARK-8003 spark__partition__id") {
+  test("SPARK-8003 spark_partition_id") {
     val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying")
     df.registerTempTable("tmp_table")
-    checkAnswer(ctx.sql("select spark__partition__id() from tmp_table").toDF(), Row(0))
+    checkAnswer(ctx.sql("select spark_partition_id() from tmp_table").toDF(), Row(0))
     ctx.dropTempTable("tmp_table")
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8b35c1275f..110f51a305 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,9 +38,6 @@ import org.apache.spark.Logging
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
 import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect}
@@ -375,14 +372,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {

   // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
-  override protected[sql] lazy val functionRegistry: FunctionRegistry = {
-    val reg = new HiveFunctionRegistry(FunctionRegistry.builtin)
-    val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
-      FunctionExpression[SparkPartitionID]("spark__partition__id")
-    )
-    extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
-    reg
-  }
+  override protected[sql] lazy val functionRegistry: FunctionRegistry =
+    new HiveFunctionRegistry(FunctionRegistry.builtin)

   /* An analyzer that uses the Hive metastore. */
   @transient
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 9cea5d413c..37afc2142a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -35,9 +35,9 @@ class UDFSuite extends QueryTest {
     assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
   }

-  test("SPARK-8003 spark__partition__id") {
+  test("SPARK-8003 spark_partition_id") {
     val df = Seq((1, "Two Fiiiiive")).toDF("id", "saying")
     ctx.registerDataFrameAsTable(df, "test_table")
-    checkAnswer(ctx.sql("select spark__partition__id() from test_table LIMIT 1").toDF(), Row(0))
+    checkAnswer(ctx.sql("select spark_partition_id() from test_table LIMIT 1").toDF(), Row(0))
   }
 }
-- 
GitLab
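
For reference, a minimal sketch (not part of the patch) of the user-visible effect on a Spark 1.5-era build: spark_partition_id is now an ordinary built-in function looked up from FunctionRegistry.builtin (the interim spark__partition__id spelling registered by SQLContext and HiveContext goes away), and the DataFrame helpers functions.sparkPartitionId() and functions.monotonicallyIncreasingId() build the catalyst expressions directly. The master URL, app name, table name, and sample data below are illustrative assumptions, not code from the patch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{monotonicallyIncreasingId, sparkPartitionId}

object SparkPartitionIdExample {
  def main(args: Array[String]): Unit = {
    // Local two-partition context, so spark_partition_id() yields 0 and 1.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("partition-id-example"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")), 2).toDF("id", "word")
    df.registerTempTable("words")

    // SQL side: the built-in is registered under the name "spark_partition_id".
    sqlContext.sql("SELECT id, spark_partition_id() FROM words").show()

    // DataFrame side: these helpers now construct the catalyst expressions
    // SparkPartitionID() and MonotonicallyIncreasingID() directly.
    df.select($"id", sparkPartitionId(), monotonicallyIncreasingId()).show()

    sc.stop()
  }
}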