Skip to content
Snippets Groups Projects
  1. Mar 21, 2017
    • Patrick Wendell's avatar
      Preparing Spark release v2.1.1-rc1 · 30abb95c
      Patrick Wendell authored
      30abb95c
    • Takeshi Yamamuro's avatar
      [SPARK-19980][SQL][BACKPORT-2.1] Add NULL checks in Bean serializer · a04428fe
      Takeshi Yamamuro authored
      ## What changes were proposed in this pull request?
      A Bean serializer in `ExpressionEncoder`  could change values when Beans having NULL. A concrete example is as follows;
      ```
      scala> :paste
      class Outer extends Serializable {
        private var cls: Inner = _
        def setCls(c: Inner): Unit = cls = c
        def getCls(): Inner = cls
      }
      
      class Inner extends Serializable {
        private var str: String = _
        def setStr(s: String): Unit = str = str
        def getStr(): String = str
      }
      
      scala> Seq("""{"cls":null}""", """{"cls": {"str":null}}""").toDF().write.text("data")
      scala> val encoder = Encoders.bean(classOf[Outer])
      scala> val schema = encoder.schema
      scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder)
      scala> df.show
      +------+
      |   cls|
      +------+
      |[null]|
      |  null|
      +------+
      
      scala> df.map(x => x)(encoder).show()
      +------+
      |   cls|
      +------+
      |[null]|
      |[null]|     // <-- Value changed
      +------+
      ```
      
      This is because the Bean serializer does not have the NULL-check expressions that the serializer of Scala's product types has. Actually, this value change does not happen in Scala's product types;
      
      ```
      scala> :paste
      case class Outer(cls: Inner)
      case class Inner(str: String)
      
      scala> val encoder = Encoders.product[Outer]
      scala> val schema = encoder.schema
      scala> val df = spark.read.schema(schema).json("data").as[Outer](encoder)
      scala> df.show
      +------+
      |   cls|
      +------+
      |[null]|
      |  null|
      +------+
      
      scala> df.map(x => x)(encoder).show()
      +------+
      |   cls|
      +------+
      |[null]|
      |  null|
      +------+
      ```
      
      This pr added the NULL-check expressions in Bean serializer along with the serializer of Scala's product types.
      
      ## How was this patch tested?
      Added tests in `JavaDatasetSuite`.
      
      Author: Takeshi Yamamuro <yamamuro@apache.org>
      
      Closes #17372 from maropu/SPARK-19980-BACKPORT2.1.
      a04428fe
    • Will Manning's avatar
      clarify array_contains function description · 9dfdd2ad
      Will Manning authored
      ## What changes were proposed in this pull request?
      
      The description in the comment for array_contains is vague/incomplete (i.e., doesn't mention that it returns `null` if the array is `null`); this PR fixes that.
      
      ## How was this patch tested?
      
      No testing, since it merely changes a comment.
      
      Please review http://spark.apache.org/contributing.html
      
       before opening a pull request.
      
      Author: Will Manning <lwwmanning@gmail.com>
      
      Closes #17380 from lwwmanning/patch-1.
      
      (cherry picked from commit a04dcde8)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      9dfdd2ad
  2. Mar 20, 2017
    • wangzhenhua's avatar
      [SPARK-19994][SQL] Wrong outputOrdering for right/full outer smj · af8bf218
      wangzhenhua authored
      
      ## What changes were proposed in this pull request?
      
      For right outer join, values of the left key will be filled with nulls if it can't match the value of the right key, so `nullOrdering` of the left key can't be guaranteed. We should output right key order instead of left key order.
      
      For full outer join, neither left key nor right key guarantees `nullOrdering`. We should not output any ordering.
      
      In tests, besides adding three test cases for left/right/full outer sort merge join, this patch also reorganizes code in `PlannerSuite` by putting together tests for `Sort`, and also extracts common logic in Sort tests into a method.
      
      ## How was this patch tested?
      
      Corresponding test cases are added.
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      Author: Zhenhua Wang <wzh_zju@163.com>
      
      Closes #17331 from wzhfy/wrongOrdering.
      
      (cherry picked from commit 965a5abc)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      af8bf218
  3. Mar 17, 2017
    • Jacek Laskowski's avatar
      [SQL][MINOR] Fix scaladoc for UDFRegistration · 780f6060
      Jacek Laskowski authored
      
      ## What changes were proposed in this pull request?
      
      Fix scaladoc for UDFRegistration
      
      ## How was this patch tested?
      
      local build
      
      Author: Jacek Laskowski <jacek@japila.pl>
      
      Closes #17337 from jaceklaskowski/udfregistration-scaladoc.
      
      (cherry picked from commit 6326d406)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      780f6060
    • Liwei Lin's avatar
      [SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files · 710b5554
      Liwei Lin authored
      ## Problem
      
      There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.
      
      ## What changes were proposed in this pull request?
      
      This patch made two major changes:
      1. added a `parseVersion(...)` method, and based on this method, fixed the following places the way they did version checking (no other place needed to do this checking):
      ```
      HDFSMetadataLog
        - CompactibleFileStreamLog  ------------> fixed with this patch
          - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
          - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
        - OffsetSeqLog  ------------------------> fixed with this patch
        - anonymous subclass in KafkaSource  ---> fixed with this patch
      ```
      
      2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
          - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
      
      ## Exception message with this patch
      ```
      java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
      	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
      	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      ```
      
      ## How was this patch tested?
      
      unit tests
      
      Author: Liwei Lin <lwlin7@gmail.com>
      
      Closes #17327 from lw-lin/good-msg-2.1.
      710b5554
  4. Mar 16, 2017
    • Xiao Li's avatar
      [SPARK-19765][SPARK-18549][SPARK-19093][SPARK-19736][BACKPORT-2.1][SQL]... · 4b977ff0
      Xiao Li authored
      [SPARK-19765][SPARK-18549][SPARK-19093][SPARK-19736][BACKPORT-2.1][SQL] Backport Three Cache-related PRs to Spark 2.1
      
      ### What changes were proposed in this pull request?
      
      Backport a few cache related PRs:
      
      ---
      [[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression](https://github.com/apache/spark/pull/16493)
      
      Consider the plans inside subquery expressions while looking up cache manager to make
      use of cached data. Currently CacheManager.useCachedData does not consider the
      subquery expressions in the plan.
      
      ---
      [[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path](https://github.com/apache/spark/pull/17064)
      
      Catalog.refreshByPath can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path.
      
      However, CacheManager.invalidateCachedPath doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.
      
      ---
      [[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table](https://github.com/apache/spark/pull/17097)
      
      When un-cache a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table. The following commands trigger the table uncache: `DropTableCommand`, `TruncateTableCommand`, `AlterTableRenameCommand`, `UncacheTableCommand`, `RefreshTable` and `InsertIntoHiveTable`
      
      This PR also includes some refactors:
      - use java.util.LinkedList to store the cache entries, so that it's safer to remove elements while iterating
      - rename invalidateCache to recacheByPlan, which is more obvious about what it does.
      
      ### How was this patch tested?
      N/A
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17319 from gatorsmile/backport-17097.
      4b977ff0
    • windpiger's avatar
      [SPARK-19329][SQL][BRANCH-2.1] Reading from or writing to a datasource table... · 9d032d02
      windpiger authored
      [SPARK-19329][SQL][BRANCH-2.1] Reading from or writing to a datasource table with a non pre-existing location should succeed
      
      ## What changes were proposed in this pull request?
      
      This is a backport pr of https://github.com/apache/spark/pull/16672 into branch-2.1.
      
      ## How was this patch tested?
      Existing tests.
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17317 from windpiger/backport-insertnotexists.
      9d032d02
  5. Mar 15, 2017
    • Reynold Xin's avatar
      [SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst (branch-2.1) · 80ebca62
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch used type alias to still keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf).
      
      Motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then we have to duplicate config options that impact optimizer/analyzer in sql/catalyst using CatalystConf.
      
      This is a backport into branch-2.1 to minimize merge conflicts.
      
      ## How was this patch tested?
      N/A
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #17301 from rxin/branch-2.1-conf.
      80ebca62
  6. Mar 14, 2017
    • Wenchen Fan's avatar
      [SPARK-19887][SQL] dynamic partition keys can be null or empty string · a0ce845d
      Wenchen Fan authored
      When dynamic partition value is null or empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`, when we read the data back, we should respect this special directory name and treat it as null.
      
      This is the same behavior of impala, see https://issues.apache.org/jira/browse/IMPALA-252
      
      
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17277 from cloud-fan/partition.
      
      (cherry picked from commit dacc382f)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      a0ce845d
    • Herman van Hovell's avatar
      [SPARK-19933][SQL] Do not change output of a subquery · 45457825
      Herman van Hovell authored
      
      ## What changes were proposed in this pull request?
      The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan.
      
      This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained.
      
      ## How was this patch tested?
      Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17278 from hvanhovell/SPARK-19933.
      
      (cherry picked from commit e04c05cf)
      Signed-off-by: default avatarHerman van Hovell <hvanhovell@databricks.com>
      45457825
  7. Mar 12, 2017
  8. Mar 10, 2017
    • Budde's avatar
      [SPARK-19611][SQL] Introduce configurable table schema inference · e481a738
      Budde authored
      Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.
      
      - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
      - Add schemaPreservesCase field to CatalogTable (set to false when schema can't
        successfully be read from Hive table props)
      - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
        false, depending on spark.sql.hive.caseSensitiveInferenceMode
      - Add alterTableSchema() method to the ExternalCatalog interface
      - Add HiveSchemaInferenceSuite tests
      - Refactor and move ParquetFileForamt.meregeMetastoreParquetSchema() as
        HiveMetastoreCatalog.mergeWithMetastoreSchema
      - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite
      
      [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)
      
      The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.
      
      Author: Budde <budde@amazon.com>
      
      Closes #17229 from budde/SPARK-19611-2.1.
      e481a738
    • Wenchen Fan's avatar
      [SPARK-19893][SQL] should not run DataFrame set oprations with map type · 5a2ad431
      Wenchen Fan authored
      
      In spark SQL, map type can't be used in equality test/comparison, and `Intersect`/`Except`/`Distinct` do need equality test for all columns, we should not allow map type in `Intersect`/`Except`/`Distinct`.
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17236 from cloud-fan/map.
      
      (cherry picked from commit fb9beda5)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      5a2ad431
    • Tyson Condie's avatar
      [SPARK-19891][SS] Await Batch Lock notified on stream execution exit · f0d50fd5
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
      We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown.
      
      ## How was this patch tested?
      
      Current tests that throw exceptions at runtime will finish faster as a result of this update.
      
      zsxwing
      
      Please review http://spark.apache.org/contributing.html
      
       before opening a pull request.
      
      Author: Tyson Condie <tcondie@gmail.com>
      
      Closes #17231 from tcondie/kafka-writer.
      
      (cherry picked from commit 501b7111)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      f0d50fd5
  9. Mar 09, 2017
    • uncleGen's avatar
      [SPARK-19861][SS] watermark should not be a negative time. · ffe65b06
      uncleGen authored
      
      ## What changes were proposed in this pull request?
      
      `watermark` should not be negative. This behavior is invalid, check it before real run.
      
      ## How was this patch tested?
      
      add new unit test.
      
      Author: uncleGen <hustyugm@gmail.com>
      Author: dylon <hustyugm@gmail.com>
      
      Closes #17202 from uncleGen/SPARK-19861.
      
      (cherry picked from commit 30b18e69)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      ffe65b06
    • Jason White's avatar
      [SPARK-19561][SQL] add int case handling for TimestampType · 2a76e242
      Jason White authored
      ## What changes were proposed in this pull request?
      
      Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.
      
      These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.
      
      Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.
      
      ## How was this patch tested?
      
      Added a new PySpark-side test that fails without the change.
      
      The contribution is my original work and I license the work to the project under the project’s open source license.
      
      Resubmission of https://github.com/apache/spark/pull/16896
      
      . The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun
      
      cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.
      
      Author: Jason White <jason.white@shopify.com>
      
      Closes #17200 from JasonMWhite/SPARK-19561.
      
      (cherry picked from commit 206030bd)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      2a76e242
    • uncleGen's avatar
      [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one. · 0c140c16
      uncleGen authored
      
      ## What changes were proposed in this pull request?
      
      A follow up to SPARK-19859:
      
      - extract the calculation of `delayMs` and reuse it.
      - update EventTimeWatermarkExec
      - use the correct `delayMs` in EventTimeWatermark
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17221 from uncleGen/SPARK-19859.
      
      (cherry picked from commit eeb1d6db)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      0c140c16
  10. Mar 08, 2017
    • Dilip Biswal's avatar
      [MINOR][SQL] The analyzer rules are fired twice for cases when... · 78cc5721
      Dilip Biswal authored
      [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.
      
      ## What changes were proposed in this pull request?
      In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However we also can throw AnalysisException from a few analyzer rules like ResolveSubquery.
      
      I found that we fire up the analyzer rules twice for the queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix. We don't have to strictly fix it. I just got confused seeing the rule getting fired two times when i was not expecting it.
      
      ## How was this patch tested?
      
      Tested manually.
      
      Author: Dilip Biswal <dbiswal@us.ibm.com>
      
      Closes #17214 from dilipbiswal/analyis_twice.
      
      (cherry picked from commit d809ceed)
      Signed-off-by: default avatarXiao Li <gatorsmile@gmail.com>
      78cc5721
    • Shixiong Zhu's avatar
    • Burak Yavuz's avatar
      [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in... · f6c1ad2e
      Burak Yavuz authored
      [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
      
      ## What changes were proposed in this pull request?
      
      **The Problem**
      There is a file stream source option called maxFileAge which limits how old the files can be, relative the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This values is by default 7 days.
      This causes a problem when both
      latestFirst = true
      maxFilesPerTrigger > total files to be processed.
      Here is what happens in all combinations
      1) latestFirst = false - Since files are processed in order, there wont be any unprocessed file older than the latest processed file. All files will be processed.
      2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch initialize. If maxFilesPerTrigger is not, then all old files get processed in the first batch, and so no file is left behind.
      3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch process the latest X files. That sets the threshold latest file - maxFileAge, so files older than this threshold will never be considered for processing.
      The bug is with case 3.
      
      **The Solution**
      
      Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.
      
      ## How was this patch tested?
      
      Regression test in `FileStreamSourceSuite`
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #17153 from brkyvz/maxFileAge.
      
      (cherry picked from commit a3648b5d)
      Signed-off-by: default avatarBurak Yavuz <brkyvz@gmail.com>
      f6c1ad2e
  11. Mar 07, 2017
  12. Mar 03, 2017
  13. Mar 02, 2017
  14. Mar 01, 2017
    • Stan Zhai's avatar
      [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded... · bbe0d8ca
      Stan Zhai authored
      [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
      
      ## What changes were proposed in this pull request?
      This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`.
      
      For the following query():
      
      ```
      val sqlA =
        """
          |create temporary view ta as
          |select a, 'a' as tag from t1 union all
          |select a, 'b' as tag from t2
        """.stripMargin
      
      val sqlB =
        """
          |create temporary view tb as
          |select a, 'a' as tag from t3 union all
          |select a, 'b' as tag from t4
        """.stripMargin
      
      val sql =
        """
          |select tb.* from ta inner join tb on
          |ta.a = tb.a and
          |ta.tag = tb.tag
        """.stripMargin
      ```
      
      The tag column is an constant alias column, it's folded by `FoldablePropagation` like this:
      
      ```
      TRACE SparkOptimizer:
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
       Project [a#4, tag#14]                              Project [a#4, tag#14]
      !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) && (a = a))
          :- Union                                           :- Union
          :  :- Project [a#0, a AS tag#8]                    :  :- Project [a#0, a AS tag#8]
          :  :  +- LocalRelation [a#0]                       :  :  +- LocalRelation [a#0]
          :  +- Project [a#2, b AS tag#9]                    :  +- Project [a#2, b AS tag#9]
          :     +- LocalRelation [a#2]                       :     +- LocalRelation [a#2]
          +- Union                                           +- Union
             :- Project [a#4, a AS tag#14]                      :- Project [a#4, a AS tag#14]
             :  +- LocalRelation [a#4]                          :  +- LocalRelation [a#4]
             +- Project [a#6, b AS tag#15]                      +- Project [a#6, b AS tag#15]
                +- LocalRelation [a#6]                             +- LocalRelation [a#6]
      ```
      
      Finally the Result of Batch Operator Optimizations is:
      
      ```
      Project [a#4, tag#14]                              Project [a#4, tag#14]
      !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
      !   :- SubqueryAlias ta, `ta`                          :- Union
      !   :  +- Union                                        :  :- LocalRelation [a#0]
      !   :     :- Project [a#0, a AS tag#8]                 :  +- LocalRelation [a#2]
      !   :     :  +- SubqueryAlias t1, `t1`                 +- Union
      !   :     :     +- Project [a#0]                          :- LocalRelation [a#4, tag#14]
      !   :     :        +- SubqueryAlias grouping              +- LocalRelation [a#6, tag#15]
      !   :     :           +- LocalRelation [a#0]
      !   :     +- Project [a#2, b AS tag#9]
      !   :        +- SubqueryAlias t2, `t2`
      !   :           +- Project [a#2]
      !   :              +- SubqueryAlias grouping
      !   :                 +- LocalRelation [a#2]
      !   +- SubqueryAlias tb, `tb`
      !      +- Union
      !         :- Project [a#4, a AS tag#14]
      !         :  +- SubqueryAlias t3, `t3`
      !         :     +- Project [a#4]
      !         :        +- SubqueryAlias grouping
      !         :           +- LocalRelation [a#4]
      !         +- Project [a#6, b AS tag#15]
      !            +- SubqueryAlias t4, `t4`
      !               +- Project [a#6]
      !                  +- SubqueryAlias grouping
      !                     +- LocalRelation [a#6]
      ```
      
      The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong.
      
      After fix:
      
      ```
      === Result of Batch LocalRelation ===
       GlobalLimit 21                                           GlobalLimit 21
       +- LocalLimit 21                                         +- LocalLimit 21
          +- Project [a#4, tag#11]                                 +- Project [a#4, tag#11]
             +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))         +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))
      !         :- SubqueryAlias ta                                      :- Union
      !         :  +- Union                                              :  :- LocalRelation [a#0, tag#8]
      !         :     :- Project [a#0, a AS tag#8]                       :  +- LocalRelation [a#2, tag#9]
      !         :     :  +- SubqueryAlias t1                             +- Union
      !         :     :     +- Project [a#0]                                :- LocalRelation [a#4, tag#11]
      !         :     :        +- SubqueryAlias grouping                    +- LocalRelation [a#6, tag#12]
      !         :     :           +- LocalRelation [a#0]
      !         :     +- Project [a#2, b AS tag#9]
      !         :        +- SubqueryAlias t2
      !         :           +- Project [a#2]
      !         :              +- SubqueryAlias grouping
      !         :                 +- LocalRelation [a#2]
      !         +- SubqueryAlias tb
      !            +- Union
      !               :- Project [a#4, a AS tag#11]
      !               :  +- SubqueryAlias t3
      !               :     +- Project [a#4]
      !               :        +- SubqueryAlias grouping
      !               :           +- LocalRelation [a#4]
      !               +- Project [a#6, b AS tag#12]
      !                  +- SubqueryAlias t4
      !                     +- Project [a#6]
      !                        +- SubqueryAlias grouping
      !                           +- LocalRelation [a#6]
      ```
      
      ## How was this patch tested?
      
      add sql-tests/inputs/inner-join.sql
      All tests passed.
      
      Author: Stan Zhai <zhaishidan@haizhi.com>
      
      Closes #17099 from stanzhai/fix-inner-join.
      
      (cherry picked from commit 5502a9cf)
      Signed-off-by: default avatarXiao Li <gatorsmile@gmail.com>
      bbe0d8ca
    • Jeff Zhang's avatar
      [SPARK-19572][SPARKR] Allow to disable hive in sparkR shell · f719cccd
      Jeff Zhang authored
      
      ## What changes were proposed in this pull request?
      SPARK-15236 do this for scala shell, this ticket is for sparkR shell. This is not only for sparkR itself, but can also benefit downstream project like livy which use shell.R for its interactive session. For now, livy has no control of whether enable hive or not.
      
      ## How was this patch tested?
      
      Tested it manually, run `bin/sparkR --master local --conf spark.sql.catalogImplementation=in-memory` and verify hive is not enabled.
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #16907 from zjffdu/SPARK-19572.
      
      (cherry picked from commit 73158805)
      Signed-off-by: default avatarFelix Cheung <felixcheung@apache.org>
      f719cccd
  15. Feb 28, 2017
    • Roberto Agostino Vitillo's avatar
      [SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS · 947c0cd9
      Roberto Agostino Vitillo authored
      ## What changes were proposed in this pull request?
      
      HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
      
      ) of `rename()`, the behavior of the local filesystem and HDFS varies:
      
      > Destination exists and is a file
      > Renaming a file atop an existing file is specified as failing, raising an exception.
      >    - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
      >    - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.
      
      This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.
      
      ## How was this patch tested?
      
      This patch was tested by running `StateStoreSuite`.
      
      Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>
      
      Closes #17012 from vitillo/fix_rename.
      
      (cherry picked from commit 9734a928)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      947c0cd9
    • windpiger's avatar
      [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate... · 4b4c3bf3
      windpiger authored
      [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache
      
      ## What changes were proposed in this pull request?
      
      If we refresh a InMemoryFileIndex with a FileStatusCache, it will first use the FileStatusCache to re-generate the cachedLeafFiles etc, then call FileStatusCache.invalidateAll.
      
      While the order to do these two actions is wrong, this lead to the refresh action does not take effect.
      
      ```
        override def refresh(): Unit = {
          refresh0()
          fileStatusCache.invalidateAll()
        }
      
        private def refresh0(): Unit = {
          val files = listLeafFiles(rootPaths)
          cachedLeafFiles =
            new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
          cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
          cachedPartitionSpec = null
        }
      ```
      ## How was this patch tested?
      unit test added
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.
      
      (cherry picked from commit a350bc16)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      4b4c3bf3
  16. Feb 26, 2017
    • Eyal Zituny's avatar
      [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle... · 04fbb9e0
      Eyal Zituny authored
      [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
      
      ## What changes were proposed in this pull request?
      
      currently if multiple streaming queries listeners exists, when a QueryTerminatedEvent is triggered, only one of the listeners will be invoked while the rest of the listeners will ignore the event.
      this is caused since the the streaming queries listeners bus holds a set of running queries ids and when a termination event is triggered, after the first listeners is handling the event, the terminated query id is being removed from the set.
      in this PR, the query id will be removed from the set only after all the listeners handles the event
      
      ## How was this patch tested?
      
      a test with multiple listeners has been added to StreamingQueryListenerSuite
      
      Author: Eyal Zituny <eyal.zituny@equalum.io>
      
      Closes #16991 from eyalzit/master.
      
      (cherry picked from commit 9f8e3921)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      04fbb9e0
  17. Feb 24, 2017
    • Takeshi Yamamuro's avatar
      [SPARK-19691][SQL][BRANCH-2.1] Fix ClassCastException when calculating percentile of decimal column · 66a7ca28
      Takeshi Yamamuro authored
      ## What changes were proposed in this pull request?
      This is a backport of the two following commits: https://github.com/apache/spark/commit/93aa4271596a30752dc5234d869c3ae2f6e8e723
      
      This pr fixed a class-cast exception below;
      ```
      scala> spark.range(10).selectExpr("cast (id as decimal) as x").selectExpr("percentile(x, 0.5)").collect()
       java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.lang.Number
      	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:141)
      	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:58)
      	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:514)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
      	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
      	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
      	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
      	at
      ```
      This fix simply converts catalyst values (i.e., `Decimal`) into scala ones by using `CatalystTypeConverters`.
      
      ## How was this patch tested?
      Added a test in `DataFrameSuite`.
      
      Author: Takeshi Yamamuro <yamamuro@apache.org>
      
      Closes #17046 from maropu/SPARK-19691-BACKPORT2.1.
      66a7ca28
  18. Feb 23, 2017
  19. Feb 21, 2017
  20. Feb 17, 2017
    • Davies Liu's avatar
      [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap · 6e3abed8
      Davies Liu authored
      
      ## What changes were proposed in this pull request?
      
      Radix sort require that half of array as free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not have more items than 1/2 of capacity. Turned out this is not true, the current implementation of append() could leave 1 more item than the threshold (1/2 of capacity) in the array, which break the requirement of radix sort (fail the assert in 2.2, or fail to insert into InMemorySorter in 2.1).
      
      This PR fix the off-by-one bug in BytesToBytesMap.
      
      This PR also fix a bug that the array will never grow if it fail to grow once (stay as initial capacity), introduced by #15722 .
      
      ## How was this patch tested?
      
      Added regression test.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #16844 from davies/off_by_one.
      
      (cherry picked from commit 3d0c3af0)
      Signed-off-by: default avatarDavies Liu <davies.liu@gmail.com>
      6e3abed8
  21. Feb 15, 2017
    • Shixiong Zhu's avatar
      [SPARK-19603][SS] Fix StreamingQuery explain command · db7adb61
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      `StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false.
      
      This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan.
      
      Examples of the explain outputs:
      
      - streaming DataFrame.explain()
      ```
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)])
      +- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#513.toString, obj#516: java.lang.String
                                 +- StreamingRelation MemoryStream[value#513], [value#513]
      ```
      
      - StreamingQuery.explain(extended = false)
      ```
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)])
      +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                                 +- LocalTableScan [value#543]
      ```
      
      - StreamingQuery.explain(extended = true)
      ```
      == Parsed Logical Plan ==
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Analyzed Logical Plan ==
      value: string, count(1): bigint
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Optimized Logical Plan ==
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject value#543.toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
      +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
            +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                                 +- LocalTableScan [value#543]
      ```
      
      ## How was this patch tested?
      
      The updated unit test.
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16934 from zsxwing/SPARK-19603.
      
      (cherry picked from commit fc02ef95)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      db7adb61
    • Shixiong Zhu's avatar
      [SPARK-19599][SS] Clean up HDFSMetadataLog · 88c43f4f
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog.
      
      This PR includes the following changes:
      - ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084
      
      ) that prevents us from removing the workaround codes.
      - Remove unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly.
      - Remove catching FileNotFoundException.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16932 from zsxwing/metadata-cleanup.
      
      (cherry picked from commit 21b4ba2d)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      88c43f4f
    • Felix Cheung's avatar
      [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column · 6c353990
      Felix Cheung authored
      
      Add coalesce on DataFrame for down partitioning without shuffle and coalesce on Column
      
      manual, unit tests
      
      Author: Felix Cheung <felixcheung_m@hotmail.com>
      
      Closes #16739 from felixcheung/rcoalesce.
      
      (cherry picked from commit 671bc08e)
      Signed-off-by: default avatarFelix Cheung <felixcheung@apache.org>
      6c353990
  22. Feb 13, 2017
  23. Feb 10, 2017
Loading