diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a68ef33d3999c2f94f1de27246eccd3d638ae39e..4fa799ac55bdf57245b8eaf26682925804a9f594 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -576,7 +576,7 @@ class DataFrame(object): >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() - [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)] + [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)] """ assert isinstance(alias, basestring), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 518430f16d712f3fd4052b5e03ce18032e8bdc94..5d1868980163db894db5fe085d00c2905bb68563 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -120,8 +120,8 @@ object SQLConf { "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " + "Note that currently statistics are only supported for Hive Metastore tables where the " + "command<code>ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan</code> has been run.") - .intConf - .createWithDefault(10 * 1024 * 1024) + .longConf + .createWithDefault(10L * 1024 * 1024) val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes") .internal() @@ -599,14 +599,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def subexpressionEliminationEnabled: Boolean = getConf(SUBEXPRESSION_ELIMINATION_ENABLED) - def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD) + def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD) def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN) def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) - def defaultSizeInBytes: Long = - getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue) def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a6b83b3d075045c09591f9554c3b49ffabbe6eed..a5d8cb19eadc99c5dedb741117c710af4b932e4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -438,7 +438,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { spark.cacheManager.clearCache() sql("CACHE TABLE testData") - withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastHashJoinExec]), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..9523f6f9f5bbf20734dd19fe6ad62656bfe3e39c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ + +class StatisticsSuite extends QueryTest with SharedSQLContext { + + test("SPARK-15392: DataFrame created from RDD should not be broadcasted") { + val rdd = sparkContext.range(1, 100).map(i => Row(i, i)) + val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType)) + assert(df.queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + } + +}