  1. Mar 15, 2017
    • jiangxingbo's avatar
      [SPARK-19960][CORE] Move `SparkHadoopWriter` to `internal/io/` · 97cc5e5a
      jiangxingbo authored
      ## What changes were proposed in this pull request?
      
      This PR introduces the following changes:
      1. Move `SparkHadoopWriter` to `core/internal/io/`, so that it's in the same directory as `SparkHadoopMapReduceWriter`;
      2. Move `SparkHadoopWriterUtils` to a separate file.
      
      After this PR is merged, we may consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`, and make the new commit protocol support the old `mapred` package's committer.
      
      ## How was this patch tested?
      
      Tested by existing test cases.
      
      Author: jiangxingbo <jiangxb1987@gmail.com>
      
      Closes #17304 from jiangxb1987/writer.
      97cc5e5a
    • Tejas Patil's avatar
      [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change... · 02c274ea
      Tejas Patil authored
      [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray. Change CartesianProductExec, SortMergeJoin, WindowExec to use it
      
      ## What issue does this PR address?
      
      Jira: https://issues.apache.org/jira/browse/SPARK-13450
      
      In `SortMergeJoinExec`, rows of the right relation having the same value for a join key are buffered in-memory. In case of skew, this causes OOMs (see comments in SPARK-13450 for more details). A heap dump from a failed job confirms this: https://issues.apache.org/jira/secure/attachment/12846382/heap-dump-analysis.png. While it's possible to increase the heap size as a workaround, Spark should be resilient to such issues since skew can happen arbitrarily.
      
      ## Change proposed in this pull request
      
      - Introduces `ExternalAppendOnlyUnsafeRowArray`
        - It holds `UnsafeRow`s in-memory up to a certain threshold.
        - After the threshold is hit, it switches to `UnsafeExternalSorter`, which enables spilling of the rows to disk. It does NOT sort the data.
        - Allows iterating over the array multiple times. However, any alteration to the array (using `add` or `clear`) will invalidate the existing iterator(s); a toy sketch of this pattern follows this list.
      - `WindowExec` was already using `UnsafeExternalSorter` to support spilling. Changed it to use the new array
      - Changed `SortMergeJoinExec` to use the new array implementation
        - NOTE: I have not changed FULL OUTER JOIN to use this new array implementation. Changing that will need more surgery and I would rather put up a separate PR for that once this gets in.
      - Changed `CartesianProductExec` to use the new array implementation
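
      The following is a toy, self-contained sketch of the buffering pattern described above. It is *not* the actual `ExternalAppendOnlyUnsafeRowArray` API; the class and method names are illustrative only. Rows are kept in memory up to a threshold, then spilled to disk, and the buffer can be iterated multiple times.
      
      ```scala
      import java.io.{File, FileWriter, PrintWriter}
      import scala.collection.mutable.ArrayBuffer
      import scala.io.Source
      
      // Illustrative stand-in: buffers strings in memory up to `spillThreshold`, then spills to a temp file.
      class SpillableBuffer(spillThreshold: Int) {
        private val inMemory = new ArrayBuffer[String]
        private var spillFile: Option[File] = None
      
        def add(row: String): Unit = {
          inMemory += row
          if (inMemory.length > spillThreshold) spill()
        }
      
        // Flush the in-memory part to (the end of) the spill file.
        private def spill(): Unit = {
          val f = spillFile.getOrElse { val t = File.createTempFile("spill", ".txt"); spillFile = Some(t); t }
          val out = new PrintWriter(new FileWriter(f, true))
          try inMemory.foreach(out.println) finally out.close()
          inMemory.clear()
        }
      
        // Can be called multiple times; callers must not add()/clear() while an iterator is in use.
        def iterator: Iterator[String] = {
          val spilled = spillFile.map(f => Source.fromFile(f).getLines()).getOrElse(Iterator.empty)
          spilled ++ inMemory.iterator
        }
      
        def clear(): Unit = { inMemory.clear(); spillFile.foreach(_.delete()); spillFile = None }
      }
      ```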
      
      #### Note for reviewers
      
      The diff can be divided into 3 parts. My motive behind having all the changes in a single PR was to demonstrate that the API is sane and supports 2 use cases. If reviewing as 3 separate PRs would help, I am happy to make the split.
      
      ## How was this patch tested?
      
      #### Unit testing
      - Added unit tests for `ExternalAppendOnlyUnsafeRowArray` to validate all its APIs and access patterns
      - Added unit tests for `SortMergeJoinExec`
        - With and without spill, for inner join, left outer join and right outer join, to confirm that the spill threshold config behaves as expected and the output is correct.
        - This PR touches the scanning logic in `SortMergeJoinExec` for _all_ joins (except FULL OUTER JOIN). However, I expect the existing test cases to confirm there is no regression in correctness.
      - Added unit test for `WindowExec` to check behavior of spilling and correctness of results.
      
      #### Stress testing
      - Confirmed that OOM is gone by running against a production job which used to OOM
      - Since I cannot share details about the prod workload externally, I created synthetic data to mimic the issue and ran the query before and after the fix to demonstrate the failure and its resolution.
      
      Generating the synthetic data
      
      ```
      ./bin/spark-shell --driver-memory=6G
      
      import org.apache.spark.sql._
      val hc = SparkSession.builder.master("local").getOrCreate()
      
      hc.sql("DROP TABLE IF EXISTS spark_13450_large_table").collect
      hc.sql("DROP TABLE IF EXISTS spark_13450_one_row_table").collect
      
      val df1 = (0 until 1).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
      df1.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_one_row_table")
      
      val df2 = (0 until 3000000).map(i => ("10", "100", i.toString, (i * 2).toString)).toDF("i", "j", "str1", "str2")
      df2.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(100, "i", "j").sortBy("i", "j").saveAsTable("spark_13450_large_table")
      ```
      
      Ran this against trunk vs. a local build with this PR. The OOM reproduces on trunk; with the fix, the query runs fine.
      
      ```
      ./bin/spark-shell --driver-java-options="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark.driver.heapdump.hprof"
      
      import org.apache.spark.sql._
      val hc = SparkSession.builder.master("local").getOrCreate()
      hc.sql("SET spark.sql.autoBroadcastJoinThreshold=1")
      hc.sql("SET spark.sql.sortMergeJoinExec.buffer.spill.threshold=10000")
      
      hc.sql("DROP TABLE IF EXISTS spark_13450_result").collect
      hc.sql("""
        CREATE TABLE spark_13450_result
        AS
        SELECT
          a.i AS a_i, a.j AS a_j, a.str1 AS a_str1, a.str2 AS a_str2,
          b.i AS b_i, b.j AS b_j, b.str1 AS b_str1, b.str2 AS b_str2
        FROM
          spark_13450_one_row_table a
        JOIN
          spark_13450_large_table b
        ON
          a.i=b.i AND
          a.j=b.j
      """)
      ```
      
      ## Performance comparison
      
      ### Macro-benchmark
      
      I ran an SMB join query over two real-world tables (2 trillion rows (40 TB) and 6 million rows (120 GB)). Note that this dataset does not have skew, so no spill happened. I saw a 2-4% improvement in CPU time over the version without this PR. This did not add up, as I was expecting some regression. I think allocating an array with capacity 128 at the start (instead of starting with the default size 16) is the sole reason for the perf gain: https://github.com/tejasapatil/spark/blob/SPARK-13450_smb_buffer_oom/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala#L43. I could remove that and rerun, but the change will effectively be deployed in this form and I wanted to see its effect on a large workload.
      
      ### Micro-benchmark
      
      Two types of benchmarking can be found in `ExternalAppendOnlyUnsafeRowArrayBenchmark`:
      
      [A] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `ArrayBuffer` when all rows fit in-memory and there is no spill
      
      ```
      Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
      ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X
      
      Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      ArrayBuffer                                 19200 / 19206         25.6          39.1       1.0X
      ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1          39.8       1.0X
      
      Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
      ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X
      ```
      
      [B] Comparing `ExternalAppendOnlyUnsafeRowArray` against raw `UnsafeExternalSorter` when there is spilling of data
      
      ```
      Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
      ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X
      
      Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
      ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X
      ```
      
      Author: Tejas Patil <tejasp@fb.com>
      
      Closes #16909 from tejasapatil/SPARK-13450_smb_buffer_oom.
      02c274ea
    • hyukjinkwon's avatar
      [SPARK-19872] [PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition · 7387126f
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to use the correct deserializer, `BatchedSerializer`, for RDD construction in coalesce/repartition when shuffle is enabled. Currently, it passes `UTF8Deserializer` as-is from the copied RDD instead of `BatchedSerializer`.
      
      With the file `text.txt` below:
      
      ```
      a
      b
      
      d
      e
      f
      g
      h
      i
      j
      k
      l
      
      ```
      
      - Before
      
      ```python
      >>> sc.textFile('text.txt').repartition(1).collect()
      ```
      
      ```
      UTF8Deserializer(True)
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File ".../spark/python/pyspark/rdd.py", line 811, in collect
          return list(_load_from_socket(port, self._jrdd_deserializer))
        File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
          yield self.loads(stream)
        File ".../spark/python/pyspark/serializers.py", line 544, in loads
          return s.decode("utf-8") if self.use_unicode else s
        File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
          return codecs.utf_8_decode(input, errors, True)
      UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
      ```
      
      - After
      
      ```python
      >>> sc.textFile('text.txt').repartition(1).collect()
      ```
      
      ```
      [u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
      ```
      
      ## How was this patch tested?
      
      Unit test in `python/pyspark/tests.py`.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17282 from HyukjinKwon/SPARK-19872.
      7387126f
    • Herman van Hovell's avatar
      [SPARK-19889][SQL] Make TaskContext callbacks thread safe · 9ff85be3
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      It is sometimes useful to use multiple threads in a task to parallelize work. These threads might register some completion/failure listeners to clean up when the task completes or fails. We currently cannot register such a callback and be sure that it will get called, because the context might be in the process of invoking its callbacks when the callback gets registered.
      
      This PR improves this by making sure that you cannot add a completion/failure listener from a different thread when the context is being marked as completed/failed in another thread. This is done by synchronizing these methods on the task context itself.
      
      Failure listeners were called only once. Completion listeners now follow the same pattern; this lifts the idempotency requirement for completion listeners and makes it easier to implement them. In some cases we can (accidentally) add a completion/failure listener after the fact; these listeners will be called immediately in order to make sure we can safely clean up after a task.
      
      As a result of this change we could make the `failure` and `completed` flags non-volatile. The `isCompleted()` method now uses synchronization to ensure that updates are visible across threads.
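
      A minimal sketch of the usage pattern this change makes safe, assuming a running `SparkContext`; the helper-thread setup is illustrative and not taken from the patch:
      
      ```scala
      import org.apache.spark.{SparkContext, TaskContext}
      
      def runStage(sc: SparkContext): Unit = {
        sc.parallelize(1 to 10, 2).foreachPartition { _ =>
          val ctx = TaskContext.get()
          // Listener registered by the main task thread.
          ctx.addTaskCompletionListener(_ => println(s"partition ${ctx.partitionId()} cleaned up"))
      
          // A helper thread spawned by the task can also register cleanup. After this change the
          // registration is synchronized with task completion, so the listener is guaranteed to run.
          val helper = new Thread(new Runnable {
            override def run(): Unit =
              ctx.addTaskCompletionListener(_ => println("helper resources released"))
          })
          helper.start()
          helper.join()
        }
      }
      ```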
      
      ## How was this patch tested?
      Added tests to `TaskContextSuite` for adding listeners to a completed/failed context.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17244 from hvanhovell/SPARK-19889.
      9ff85be3
    • jiangxingbo's avatar
      [SPARK-19877][SQL] Restrict the nested level of a view · ee36bc1c
      jiangxingbo authored
      ## What changes were proposed in this pull request?
      
      We should restrict the nested level of a view, to avoid stack overflow exception during the view resolution.
      
      ## How was this patch tested?
      
      Add new test case in `SQLViewSuite`.
      
      Author: jiangxingbo <jiangxb1987@gmail.com>
      
      Closes #17241 from jiangxb1987/view-depth.
      ee36bc1c
    • Liwei Lin's avatar
      [SPARK-19817][SS] Make it clear that `timeZone` is a general option in DataStreamReader/Writer · e1ac5534
      Liwei Lin authored
      ## What changes were proposed in this pull request?
      
      As the timezone setting can also affect partition values and it applies to all formats, we should make that clear.
      
      ## How was this patch tested?
      
      N/A
      
      Author: Liwei Lin <lwlin7@gmail.com>
      
      Closes #17299 from lw-lin/timezone.
      e1ac5534
  2. Mar 14, 2017
    • Xiao Li's avatar
      [SPARK-18112][SQL] Support reading data from Hive 2.1 metastore · f9a93b1b
      Xiao Li authored
      ### What changes were proposed in this pull request?
      This PR is to support reading data from Hive 2.1 metastore. Need to update shim class because of the Hive API changes caused by the following three Hive JIRAs:
      - [HIVE-12730 MetadataUpdater: provide a mechanism to edit the basic statistics of a table (or a partition)](https://issues.apache.org/jira/browse/HIVE-12730)
      - [Hive-13341 Stats state is not captured correctly: differentiate load table and create table](https://issues.apache.org/jira/browse/HIVE-13341)
      - [HIVE-13622 WriteSet tracking optimizations](https://issues.apache.org/jira/browse/HIVE-13622)
      
      There are three new fields added to Hive APIs.
      - `boolean hasFollowingStatsTask`. We always set it to `false`. This is to keep the existing behavior unchanged (starting from 0.13), no matter which Hive metastore client version users choose. If we set it to `true`, the basic table statistics are not collected by Hive. For example,
      
      ```SQL
      	CREATE TABLE tbl AS SELECT 1 AS a
      ```
      When setting `hasFollowingStatsTask` to `false`, the table properties are like
      ```
      	Properties: [numFiles=1, transient_lastDdlTime=1489513927, totalSize=2]
      ```
      When setting `hasFollowingStatsTask` to `true`, the table properties are like
      ```
      	Properties: [transient_lastDdlTime=1489513563]
      ```
      
      - `AcidUtils.Operation operation`. Obviously, we do not support ACID. Thus, we set it to `AcidUtils.Operation.NOT_ACID`.
      - `EnvironmentContext environmentContext`. So far, this is always set to `null`. This was introduced to support the DDL `alter table s update statistics set ('numRows'='NaN')`. Using this DDL, users can specify the statistics. So far, our Spark SQL does not need it, because we use different table properties to store our generated statistics values. However, when Spark SQL issues ALTER TABLE DDL statements, the Hive metastore always automatically invalidates the Hive-generated statistics.
      
      In the follow-up PR, we can fix it by explicitly adding a property to `environmentContext`.
      ```JAVA
      putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.USER)
      ```
      Another alternative is to set `DO_NOT_UPDATE_STATS` to `TRUE`. See the Hive JIRA: https://issues.apache.org/jira/browse/HIVE-15653. We will not address it in this PR.
      
      ### How was this patch tested?
      Added test cases to VersionsSuite.scala
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17232 from gatorsmile/Hive21.
      f9a93b1b
    • hyukjinkwon's avatar
      [SPARK-19828][R] Support array type in from_json in R · d1f6c64c
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      Since we cannot directly define an array type in R, this PR proposes to support array types in R via the string type notation used in `structField`, as below:
      
      ```R
      jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
      df <- as.DataFrame(list(list("people" = jsonArr)))
      collect(select(df, alias(from_json(df$people, "array<struct<name:string>>"), "arrcol")))
      ```
      
      prints
      
      ```R
            arrcol
      1 Bob, Alice
      ```
      
      ## How was this patch tested?
      
      Unit tests in `test_sparkSQL.R`.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17178 from HyukjinKwon/SPARK-19828.
      d1f6c64c
    • hyukjinkwon's avatar
      [SPARK-19918][SQL] Use TextFileFormat in implementation of TextInputJsonDataSource · 8fb2a02e
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to use the text datasource for JSON schema inference.
      
      This basically follows the similar approach in https://github.com/apache/spark/pull/15813. Using Dataset for the initial loading when inferring the schema has several advantages; please refer to SPARK-18362.
      
      It seems the JSON case was supposed to be fixed together but was taken out, according to https://github.com/apache/spark/pull/15813:
      
      > A similar problem also affects the JSON file format and this patch originally fixed that as well, but I've decided to split that change into a separate patch so as not to conflict with changes in another JSON PR.
      
      Also, the current code seems to affect some functionality because it does not use `FileScanRDD`. This problem is described in SPARK-19885 (though that was the CSV case).
      
      ## How was this patch tested?
      
      Existing tests should cover this; also manually tested via `spark.read.json(path)` and checking the UI.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17255 from HyukjinKwon/json-filescanrdd.
      8fb2a02e
    • Wenchen Fan's avatar
      [SPARK-19887][SQL] dynamic partition keys can be null or empty string · dacc382f
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`; when we read the data back, we should recognize this special directory name and treat it as null.
      
      This is the same behavior as Impala; see https://issues.apache.org/jira/browse/IMPALA-252
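
      A minimal sketch of the behavior described above (path and column names are illustrative), run in a spark-shell session:
      
      ```scala
      import spark.implicits._
      
      // Write a dynamic partition whose value is null.
      Seq((1, "a"), (2, null: String)).toDF("value", "part")
        .write.partitionBy("part").parquet("/tmp/spark_19887_demo")
      
      // Expected layout: .../part=a/ and .../part=__HIVE_DEFAULT_PARTITION__/
      // Reading the data back should turn the special directory name into null again.
      spark.read.parquet("/tmp/spark_19887_demo").filter($"part".isNull).show()
      ```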
      
      ## How was this patch tested?
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17277 from cloud-fan/partition.
      dacc382f
    • Takuya UESHIN's avatar
      [SPARK-19817][SQL] Make it clear that `timeZone` option is a general option in... · 7ded39c2
      Takuya UESHIN authored
      [SPARK-19817][SQL] Make it clear that `timeZone` option is a general option in DataFrameReader/Writer.
      
      ## What changes were proposed in this pull request?
      
      As the timezone setting can also affect partition values and it applies to all formats, we should make that clear.
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Takuya UESHIN <ueshin@databricks.com>
      
      Closes #17281 from ueshin/issues/SPARK-19817.
      7ded39c2
    • Nattavut Sutyanyong's avatar
      [SPARK-18966][SQL] NOT IN subquery with correlated expressions may return incorrect result · 6eac9682
      Nattavut Sutyanyong authored
      ## What changes were proposed in this pull request?
      
      This PR fixes the following problem:
      ````
      Seq((1, 2)).toDF("a1", "a2").createOrReplaceTempView("a")
      Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("b1", "b2").createOrReplaceTempView("b")
      
      // The expected result is 1 row of (1,2) as shown in the next statement.
      sql("select * from a where a1 not in (select b1 from b where b2 = a2)").show
      +---+---+
      | a1| a2|
      +---+---+
      +---+---+
      
      sql("select * from a where a1 not in (select b1 from b where b2 = 2)").show
      +---+---+
      | a1| a2|
      +---+---+
      |  1|  2|
      +---+---+
      ````
      There are a number of scenarios to consider:
      
      1. When the correlated predicate yields a match (i.e., B.B2 = A.A2)
      1.1. When the NOT IN expression yields a match (i.e., A.A1 = B.B1)
      1.2. When the NOT IN expression yields no match (i.e., A.A1 = B.B1 returns false)
      1.3. When A.A1 is null
      1.4. When B.B1 is null
      1.4.1. When A.A1 is not null
      1.4.2. When A.A1 is null
      
      2. When the correlated predicate yields no match (i.e.,B.B2 = A.A2 is false or unknown)
      2.1. When B.B2 is null and A.A2 is null
      2.2. When B.B2 is null and A.A2 is not null
      2.3. When the value of A.A2 does not match any of B.B2
      
      ````
       A.A1   A.A2      B.B1   B.B2
      -----  -----     -----  -----
          1      1         1      1    (1.1)
          2      1                     (1.2)
       null      1                     (1.3)
      
          1      3      null      3    (1.4.1)
       null      3                     (1.4.2)
      
          1   null         1   null    (2.1)
       null      2                     (2.2 & 2.3)
      ````
      
      We can divide the evaluation of the above correlated NOT IN subquery into two groups:
      
      Group 1: The rows in A when there is a match from the correlated predicate (A.A1 = B.B1)
      
      In this case, the result of the subquery is not empty and the semantics of the NOT IN depends solely on the evaluation of the equality comparison of the columns of NOT IN, i.e., A1 = B1, which says
      
      - If A.A1 is null, the row is filtered (1.3 and 1.4.2)
      - If A.A1 = B.B1, the row is filtered (1.1)
      - If B.B1 is null, any rows of A in the same group (A.A2 = B.B2) are filtered (1.4.1 & 1.4.2)
      - Otherwise, the row is qualified.
      
      Hence, in this group, the result is the row from (1.2).
      
      Group 2: The rows in A when there is no match from the correlated predicate (A.A2 = B.B2)
      
      In this case, all the rows in A, including the rows where A.A1 is null, are qualified because the subquery returns an empty set; by the semantics of NOT IN, all rows from the parent side qualify as the result set, that is, the rows from (2.1, 2.2 and 2.3).
      
      In conclusion, the correct result set of the above query is
      ````
       A.A1   A.A2
      -----  -----
          2      1    (1.2)
          1   null    (2.1)
       null      2    (2.2 & 2.3)
      ````
      ## How was this patch tested?
      unit tests, regression tests, and new test cases focusing on the problem being fixed.
      
      Author: Nattavut Sutyanyong <nsy.can@gmail.com>
      
      Closes #17294 from nsyca/18966.
      6eac9682
    • Herman van Hovell's avatar
      [SPARK-19933][SQL] Do not change output of a subquery · e04c05cf
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      The `RemoveRedundantAlias` rule can change the output attributes (the expression id's to be precise) of a query by eliminating the redundant alias producing them. This is no problem for a regular query, but can cause problems for correlated subqueries: The attributes produced by the subquery are used in the parent plan; changing them will break the parent plan.
      
      This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained.
      
      ## How was this patch tested?
      Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17278 from hvanhovell/SPARK-19933.
      e04c05cf
    • Takeshi Yamamuro's avatar
      [SPARK-19923][SQL] Remove unnecessary type conversions per call in Hive · 6325a2f8
      Takeshi Yamamuro authored
      ## What changes were proposed in this pull request?
      This PR removes unnecessary per-call type conversions in Hive: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala#L116
      
      ## How was this patch tested?
      Existing tests
      
      Author: Takeshi Yamamuro <yamamuro@apache.org>
      
      Closes #17264 from maropu/SPARK-19923.
      6325a2f8
    • jiangxingbo's avatar
      [SPARK-18961][SQL] Support `SHOW TABLE EXTENDED ... PARTITION` statement · a02a0b17
      jiangxingbo authored
      ## What changes were proposed in this pull request?
      
      We should support the statement `SHOW TABLE EXTENDED LIKE 'table_identifier' PARTITION(partition_spec)`, just as Hive does.
      When a partition is specified, the `SHOW TABLE EXTENDED` command should output information about the partitions instead of the tables.
      Note that this statement requires an exactly matched partition spec. For example:
      ```
      CREATE TABLE show_t1(a String, b Int) PARTITIONED BY (c String, d String);
      ALTER TABLE show_t1 ADD PARTITION (c='Us', d=1) PARTITION (c='Us', d=22);
      
      -- Output the extended information of Partition(c='Us', d=1)
      SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us', d=1);
      -- Throw an AnalysisException
      SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us');
      ```
      
      ## How was this patch tested?
      Add new test sqls in file `show-tables.sql`.
      Add new test case in `DDLSuite`.
      
      Author: jiangxingbo <jiangxb1987@gmail.com>
      
      Closes #16373 from jiangxb1987/show-partition-extended.
      a02a0b17
    • Menglong TAN's avatar
      [SPARK-11569][ML] Fix StringIndexer to handle null value properly · 85941ecf
      Menglong TAN authored
      ## What changes were proposed in this pull request?
      
      This PR enhances StringIndexer with NULL value handling.
      
      Before this PR, StringIndexer throws an exception when it encounters NULL values.
      With this PR:
      - handleInvalid=error: Throw an exception as before
      - handleInvalid=skip: Skip null values as well as unseen labels
      - handleInvalid=keep: Give null values an additional index as well as unseen labels
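
      A short sketch of the three modes listed above (column names and data are illustrative), assuming a SparkSession `spark`:
      
      ```scala
      import org.apache.spark.ml.feature.StringIndexer
      
      val df = spark.createDataFrame(Seq((0, "a"), (1, "b"), (2, null))).toDF("id", "label")
      
      val indexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("labelIndex")
        .setHandleInvalid("keep")   // "error": throw; "skip": drop null/unseen rows; "keep": extra index
      
      indexer.fit(df).transform(df).show()
      ```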
      
      BTW, I noticed someone was trying to solve the same problem ( #9920 ) but it seems to have had no progress or response for a long time. Would you mind giving me a chance to solve it? I'm eager to help. :-)
      
      ## How was this patch tested?
      
      new unit tests
      
      Author: Menglong TAN <tanmenglong@renrenche.com>
      Author: Menglong TAN <tanmenglong@gmail.com>
      
      Closes #17233 from crackcell/11569_StringIndexer_NULL.
      85941ecf
    • zero323's avatar
      [SPARK-19940][ML][MINOR] FPGrowthModel.transform should skip duplicated items · d4a637cd
      zero323 authored
      ## What changes were proposed in this pull request?
      
      This commit moves `distinct` to its intended place to avoid duplicated predictions and adds a unit test covering the issue.
      
      ## How was this patch tested?
      
      Unit tests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17283 from zero323/SPARK-19940.
      d4a637cd
    • Asher Krim's avatar
      [SPARK-19922][ML] small speedups to findSynonyms · 5e96a57b
      Asher Krim authored
      Currently generating synonyms using a large model (I've tested with 3m words) is very slow. These efficiencies have sped things up for us by ~17%.
      
      I wasn't sure if such small changes were worthy of a JIRA, but the guidelines seemed to suggest that is the preferred approach.
      
      ## What changes were proposed in this pull request?
      
      Address a few small issues in the findSynonyms logic:
      1) remove usage of ``Array.fill`` to zero out the ``cosineVec`` array. The default float value in Scala and Java is 0.0f, so explicitly setting the values to zero is not needed
      2) use Floats throughout. The conversion to Doubles before doing the ``priorityQueue`` is totally superfluous, since all the similarity computations are done using Floats anyway. Creating a second large array just serves to put extra strain on the GC
      3) convert the slow ``for(i <- cosVec.indices)`` to an ugly, but faster, ``while`` loop
      
      These efficiencies are really only apparent when working with a large model.
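
      An illustrative (not from the patch) example of the change in point 3: a `for` comprehension over indices goes through a closure per element, while a hand-written `while` loop does not.
      
      ```scala
      // Slower style: for comprehension over indices (desugars to a foreach taking a closure).
      def dotFor(a: Array[Float], b: Array[Float]): Float = {
        var sum = 0.0f
        for (i <- a.indices) { sum += a(i) * b(i) }
        sum
      }
      
      // Faster style, in the spirit of the patch: a plain while loop over a primitive index.
      def dotWhile(a: Array[Float], b: Array[Float]): Float = {
        var sum = 0.0f
        var i = 0
        while (i < a.length) { sum += a(i) * b(i); i += 1 }
        sum
      }
      ```
      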
      ## How was this patch tested?
      
      Existing unit tests + some in-house tests to time the difference
      
      cc jkbradley MLNick srowen
      
      Author: Asher Krim <krim.asher@gmail.com>
      Author: Asher Krim <krim.asher@gmail>
      
      Closes #17263 from Krimit/fasterFindSynonyms.
      5e96a57b
    • Herman van Hovell's avatar
      [SPARK-18874][SQL] Fix 2.10 build after moving the subquery rules to optimization · 1c7275ef
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      Commit https://github.com/apache/spark/commit/4ce970d71488c7de6025ef925f75b8b92a5a6a79 accidentally broke the Scala 2.10 build for Spark. This PR fixes this by simplifying the offending pattern match.
      
      ## How was this patch tested?
      Existing tests.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17288 from hvanhovell/SPARK-18874.
      1c7275ef
    • Herman van Hovell's avatar
      [SPARK-19850][SQL] Allow the use of aliases in SQL function calls · a0b92f73
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      We currently cannot use aliases in SQL function calls. This is inconvenient when you try to create a struct. For example, the SQL query `select struct(1, 2) st` will create a struct with column names `col1` and `col2`. This is even more problematic when we want to append a field to an existing struct. For example, if we want to add a field to struct `st`, we would issue the SQL query `select struct(st.*, 1) as st from src`, and the result will be a struct `st` with a column with the non-descriptive name `col3` (if `st` itself has 2 fields).
      
      This PR proposes to change this by allowing the use of aliased expression in function parameters. For example `select struct(1 as a, 2 as b) st`, will create a struct with columns `a` & `b`.
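
      A short sketch of the new behavior, assuming a SparkSession `spark`:
      
      ```scala
      // Before this change the struct fields get auto-generated names (col1, col2):
      spark.sql("SELECT struct(1, 2) AS st").printSchema()
      
      // With this change, aliases inside the function call name the fields (a, b):
      spark.sql("SELECT struct(1 AS a, 2 AS b) AS st").printSchema()
      ```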
      
      ## How was this patch tested?
      Added a test to `ExpressionParserSuite` and added a test file for `SQLQueryTestSuite`.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17245 from hvanhovell/SPARK-19850.
      a0b92f73
    • Reynold Xin's avatar
      [SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst · 0ee38a39
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch uses a type alias to keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf).
      
      The motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then have to duplicate config options that impact the optimizer/analyzer in sql/catalyst using CatalystConf.
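
      A minimal, self-contained sketch of the aliasing technique described above (names simplified; this is not the actual Spark source):
      
      ```scala
      // The real configuration class now lives in one place (sql/catalyst).
      class SQLConf {
        private val settings = scala.collection.mutable.Map[String, String]()
        def setConfString(key: String, value: String): Unit = settings(key) = value
        def getConfString(key: String, default: String): String = settings.getOrElse(key, default)
      }
      
      // Old entry points are kept for source compatibility.
      object compat {
        type CatalystConf = SQLConf                // the old name survives as a type alias
        class SimpleCatalystConf extends SQLConf   // the old concrete class now just extends SQLConf
      }
      ```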
      
      ## How was this patch tested?
      N/A
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #17285 from rxin/SPARK-19944.
      0ee38a39
    • Nattavut Sutyanyong's avatar
      [SPARK-18874][SQL] First phase: Deferring the correlated predicate pull up to Optimizer phase · 4ce970d7
      Nattavut Sutyanyong authored
      ## What changes were proposed in this pull request?
      Currently the Analyzer, as part of ResolveSubquery, pulls up the correlated predicates to their
      originating SubqueryExpression. The subquery plan is then transformed to remove the correlated
      predicates after they are moved up to the outer plan. In this PR, the task of pulling up
      correlated predicates is deferred to the Optimizer. This is the initial work that will allow us to
      support the forms of correlated subqueries that we don't support today. The design document
      from nsyca can be found at the following link:
      [DesignDoc](https://docs.google.com/document/d/1QDZ8JwU63RwGFS6KVF54Rjj9ZJyK33d49ZWbjFBaIgU/edit#)
      
      A brief description of the code changes (hopefully to aid with code review) can be found at the
      following link:
      [CodeChanges](https://docs.google.com/document/d/18mqjhL9V1An-tNta7aVE13HkALRZ5GZ24AATA-Vqqf0/edit#)
      
      ## How was this patch tested?
      The test case PRs were submitted earlier:
      [16337](https://github.com/apache/spark/pull/16337) [16759](https://github.com/apache/spark/pull/16759) [16841](https://github.com/apache/spark/pull/16841) [16915](https://github.com/apache/spark/pull/16915) [16798](https://github.com/apache/spark/pull/16798) [16712](https://github.com/apache/spark/pull/16712) [16710](https://github.com/apache/spark/pull/16710) [16760](https://github.com/apache/spark/pull/16760) [16802](https://github.com/apache/spark/pull/16802)
      
      Author: Dilip Biswal <dbiswal@us.ibm.com>
      
      Closes #16954 from dilipbiswal/SPARK-18874.
      4ce970d7
    • actuaryzhang's avatar
      [SPARK-19391][SPARKR][ML] Tweedie GLM API for SparkR · f6314eab
      actuaryzhang authored
      ## What changes were proposed in this pull request?
      Port Tweedie GLM  #16344  to SparkR
      
      felixcheung yanboliang
      
      ## How was this patch tested?
      new test in SparkR
      
      Author: actuaryzhang <actuaryzhang10@gmail.com>
      
      Closes #16729 from actuaryzhang/sparkRTweedie.
      f6314eab
    • Xiao Li's avatar
      [SPARK-19921][SQL][TEST] Enable end-to-end testing using different Hive metastore versions. · 415f9f34
      Xiao Li authored
      ### What changes were proposed in this pull request?
      
      To improve the quality of our Spark SQL in different Hive metastore versions, this PR is to enable end-to-end testing using different versions. This PR allows the test cases in sql/hive to pass the existing Hive client to create a SparkSession.
      - Since Derby does not allow concurrent connections, the pre-built Hive clients use a different database from TestHive's built-in 1.2.1 client.
      - Since our test cases in sql/hive can only create a single Spark context in the same JVM, the newly created SparkSession shares the same Spark context with the existing TestHive's corresponding SparkSession.
      
      ### How was this patch tested?
      Fixed the existing test cases.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17260 from gatorsmile/versionSuite.
      415f9f34
  3. Mar 13, 2017
    • Xiao Li's avatar
      [SPARK-19924][SQL] Handle InvocationTargetException for all Hive Shim · 4dc3a817
      Xiao Li authored
      ### What changes were proposed in this pull request?
      Since we are using shims for most Hive metastore APIs, the exceptions thrown by the underlying method of Method.invoke() are wrapped in `InvocationTargetException`. Instead of handling them one by one, we should handle all of them in `withClient`. If any of them is missed, the error message can look unfriendly. For example, below is the error from dropping a table.
      
      ```
      Expected exception org.apache.spark.sql.AnalysisException to be thrown, but java.lang.reflect.InvocationTargetException was thrown.
      ScalaTestFailureLocation: org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14 at (ExternalCatalogSuite.scala:193)
      org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.sql.AnalysisException to be thrown, but java.lang.reflect.InvocationTargetException was thrown.
      	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:496)
      	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
      	at org.scalatest.Assertions$class.intercept(Assertions.scala:1004)
      	at org.scalatest.FunSuite.intercept(FunSuite.scala:1555)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply$mcV$sp(ExternalCatalogSuite.scala:193)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply(ExternalCatalogSuite.scala:183)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply(ExternalCatalogSuite.scala:183)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      	at org.scalatest.Transformer.apply(Transformer.scala:22)
      	at org.scalatest.Transformer.apply(Transformer.scala:20)
      	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
      	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
      	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
      	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
      	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(ExternalCatalogSuite.scala:40)
      	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite.runTest(ExternalCatalogSuite.scala:40)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
      	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
      	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
      	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
      	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
      	at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
      	at org.scalatest.Suite$class.run(Suite.scala:1424)
      	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
      	at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
      	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
      	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
      	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
      	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
      	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
      	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
      	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
      	at scala.collection.immutable.List.foreach(List.scala:381)
      	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
      	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
      	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
      	at org.scalatest.tools.Runner$.run(Runner.scala:883)
      	at org.scalatest.tools.Runner.run(Runner.scala)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
      	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
      Caused by: java.lang.reflect.InvocationTargetException
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.spark.sql.hive.client.Shim_v0_14.dropTable(HiveShim.scala:736)
      	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropTable$1.apply$mcV$sp(HiveClientImpl.scala:451)
      	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropTable$1.apply(HiveClientImpl.scala:451)
      	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropTable$1.apply(HiveClientImpl.scala:451)
      	at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:287)
      	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:228)
      	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:227)
      	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:270)
      	at org.apache.spark.sql.hive.client.HiveClientImpl.dropTable(HiveClientImpl.scala:450)
      	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropTable$1.apply$mcV$sp(HiveExternalCatalog.scala:456)
      	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropTable$1.apply(HiveExternalCatalog.scala:454)
      	at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropTable$1.apply(HiveExternalCatalog.scala:454)
      	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:94)
      	at org.apache.spark.sql.hive.HiveExternalCatalog.dropTable(HiveExternalCatalog.scala:454)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14$$anonfun$apply$mcV$sp$8.apply$mcV$sp(ExternalCatalogSuite.scala:194)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14$$anonfun$apply$mcV$sp$8.apply(ExternalCatalogSuite.scala:194)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14$$anonfun$apply$mcV$sp$8.apply(ExternalCatalogSuite.scala:194)
      	at org.scalatest.Assertions$class.intercept(Assertions.scala:997)
      	... 57 more
      Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: NoSuchObjectException(message:db2.unknown_table table not found)
      	at org.apache.hadoop.hive.ql.metadata.Hive.dropTable(Hive.java:1038)
      	... 79 more
      Caused by: NoSuchObjectException(message:db2.unknown_table table not found)
      	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table_core(HiveMetaStore.java:1808)
      	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1778)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
      	at com.sun.proxy.$Proxy10.get_table(Unknown Source)
      	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1208)
      	at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.getTable(SessionHiveMetaStoreClient.java:131)
      	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropTable(HiveMetaStoreClient.java:952)
      	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropTable(HiveMetaStoreClient.java:904)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
      	at com.sun.proxy.$Proxy11.dropTable(Unknown Source)
      	at org.apache.hadoop.hive.ql.metadata.Hive.dropTable(Hive.java:1035)
      	... 79 more
      ```
      
      After unwrapping the exception, the message is like
      ```
      org.apache.hadoop.hive.ql.metadata.HiveException: NoSuchObjectException(message:db2.unknown_table table not found);
      org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: NoSuchObjectException(message:db2.unknown_table table not found);
      	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:100)
      	at org.apache.spark.sql.hive.HiveExternalCatalog.dropTable(HiveExternalCatalog.scala:460)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply$mcV$sp(ExternalCatalogSuite.scala:193)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply(ExternalCatalogSuite.scala:183)
      	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogSuite$$anonfun$14.apply(ExternalCatalogSuite.scala:183)
      	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
      ...
      ```
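
      A simplified sketch of the centralized unwrapping described above (not the exact `withClient` implementation):
      
      ```scala
      import java.lang.reflect.InvocationTargetException
      
      // Every shim-backed metastore call goes through a wrapper like this, so the reflective
      // wrapper exception is unwrapped in one place. (Spark rethrows the unwrapped cause as an
      // AnalysisException, as shown in the stack trace above.)
      def withClient[T](body: => T): T = {
        try body catch {
          case e: InvocationTargetException =>
            val cause = e.getCause
            throw new RuntimeException(s"${cause.getClass.getCanonicalName}: ${cause.getMessage}", cause)
        }
      }
      ```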
      
      ### How was this patch tested?
      Covered by the existing test case in `test("drop table when database/table does not exist")` in `ExternalCatalogSuite`.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17265 from gatorsmile/InvocationTargetException.
      4dc3a817
    • Joseph K. Bradley's avatar
      [MINOR][ML] Improve MLWriter overwrite error message · 72c66dbb
      Joseph K. Bradley authored
      ## What changes were proposed in this pull request?
      
      Give proper syntax for Java and Python in addition to Scala.
      
      ## How was this patch tested?
      
      Manually.
      
      Author: Joseph K. Bradley <joseph@databricks.com>
      
      Closes #17215 from jkbradley/write-err-msg.
      72c66dbb
    • Wenchen Fan's avatar
      [SPARK-19916][SQL] simplify bad file handling · 05887fc3
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      We should have a single central place to try/catch the exceptions for corrupted files.
      
      ## How was this patch tested?
      
      existing test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17253 from cloud-fan/bad-file.
      05887fc3
  4. Mar 12, 2017
    • Tejas Patil's avatar
      [SPARK-17495][SQL] Support date, timestamp and interval types in Hive hash · 94566885
      Tejas Patil authored
      ## What changes were proposed in this pull request?
      
      - Timestamp hashing is done as per [TimestampWritable.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java#L406) in Hive
      - Interval hashing is done as per [HiveIntervalDayTime.hashCode()](https://github.com/apache/hive/blob/ff67cdda1c538dc65087878eeba3e165cf3230f4/storage-api/src/java/org/apache/hadoop/hive/common/type/HiveIntervalDayTime.java#L178). Note that there are inherent differences in how Hive and Spark store intervals under the hood which limits the ability to be in completely sync with hive's hashing function. I have explained this in the method doc.
      - Date type was already supported. This PR adds test for that.
      
      ## How was this patch tested?
      
      Added unit tests
      
      Author: Tejas Patil <tejasp@fb.com>
      
      Closes #17062 from tejasapatil/SPARK-17495_time_related_types.
      94566885
    • uncleGen's avatar
      [SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets · 0a4d06a7
      uncleGen authored
      When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON where the topic(s) happen to have uppercase characters. When StartingOffsets is constructed, the original string value from options is transformed toLowerCase to make matching on "earliest" and "latest" case insensitive. However, the toLowerCase JSON is passed to SpecificOffsets for the terminal condition, so topic names may not be what the user intended by the time assignments are made with the underlying KafkaConsumer.
      
      KafkaSourceProvider.scala:
      ```
      val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
          case Some("latest") => LatestOffsets
          case Some("earliest") => EarliestOffsets
          case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
          case None => LatestOffsets
        }
      ```
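
      A sketch of the direction of the fix (assumed, simplified): lowercase the option value only for matching the special keywords, and keep the original JSON for `SpecificOffsets`.
      
      ```scala
      val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
        case Some(value) if value.toLowerCase == "latest" => LatestOffsets
        case Some(value) if value.toLowerCase == "earliest" => EarliestOffsets
        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))  // original case preserved
        case None => LatestOffsets
      }
      ```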
      
      Thanks to cbowden for reporting.
      
      Jenkins
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17209 from uncleGen/SPARK-19853.
      0a4d06a7
    • Xin Ren's avatar
      [SPARK-19282][ML][SPARKR] RandomForest Wrapper and GBT Wrapper return param "maxDepth" to R models · 9f8ce482
      Xin Ren authored
      ## What changes were proposed in this pull request?
      
      RandomForest R Wrapper and GBT R Wrapper return param `maxDepth` to R models.
      
      Below 4 R wrappers are changed:
      * `RandomForestClassificationWrapper`
      * `RandomForestRegressionWrapper`
      * `GBTClassificationWrapper`
      * `GBTRegressionWrapper`
      
      ## How was this patch tested?
      
      Test manually on my local machine.
      
      Author: Xin Ren <iamshrek@126.com>
      
      Closes #17207 from keypointt/SPARK-19282.
      9f8ce482
    • xiaojian.fxj's avatar
      [SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the... · 2f5187bd
      xiaojian.fxj authored
      [SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block
      
      Cleaning up an application may take a long time on the worker, and it can block the worker from sending heartbeats to the master because the worker extends ThreadSafeRpcEndpoint. If the heartbeat from a worker is blocked by the ApplicationFinished message, the master will think the worker is dead. If the worker has a driver, the driver will be scheduled by the master again.
      It is better to reuse the existing cleanupThreadExecutor to clean up the directories of finished applications asynchronously, to avoid this blocking.
      
      Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
      
      Closes #17189 from hustfxj/worker-hearbeat.
      2f5187bd
    • uncleGen's avatar
      [DOCS][SS] fix structured streaming python example · e29a74d5
      uncleGen authored
      ## What changes were proposed in this pull request?
      
      - SS python example: `TypeError: 'xxx' object is not callable`
      - some other doc issue.
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17257 from uncleGen/docs-ss-python.
      e29a74d5
  5. Mar 10, 2017
    • windpiger's avatar
      [SPARK-19723][SQL] create datasource table with a non-existent location should work · f6fdf92d
      windpiger authored
      ## What changes were proposed in this pull request?
      
      This is a follow-up to [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583)
      
      As we discussed in that [PR](https://github.com/apache/spark/pull/16938)
      
      The following DDL for a datasource table with a non-existent location should work:
      ```
      CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
      ```
      Currently it throws an exception that the path does not exist for a datasource table.
      
      ## How was this patch tested?
      unit test added
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17055 from windpiger/CTDataSourcePathNotExists.
      f6fdf92d
    • Wenchen Fan's avatar
      [SPARK-19893][SQL] should not run DataFrame set operations with map type · fb9beda5
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      In Spark SQL, the map type can't be used in equality tests/comparisons, and `Intersect`/`Except`/`Distinct` need equality tests for all columns, so we should not allow the map type in `Intersect`/`Except`/`Distinct`.
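
      A short sketch of the newly rejected pattern, assuming a spark-shell session with implicits imported:
      
      ```scala
      import spark.implicits._
      
      val df = Seq((1, Map("k" -> "v")), (2, Map("k" -> "w"))).toDF("id", "m")
      
      // Map columns have no well-defined equality, so after this change these operations
      // fail analysis instead of silently producing ill-defined results:
      // df.distinct()
      // df.intersect(df)
      // df.except(df)
      ```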
      
      ## How was this patch tested?
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17236 from cloud-fan/map.
      fb9beda5
    • Cheng Lian's avatar
      [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables · ffee4f1c
      Cheng Lian authored
      ## What changes were proposed in this pull request?
      
      `Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables.
      
      This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables).
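
      A small usage sketch (table name is illustrative): with this fix, `inputFiles` again reports the underlying files of a Hive SerDe table.
      
      ```scala
      // Assuming `spark` is a Hive-enabled SparkSession and `sales` is a Hive SerDe table:
      spark.table("sales").inputFiles.foreach(println)
      ```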
      
      ## How was this patch tested?
      
      New test case added in `HiveDDLSuite`.
      
      Author: Cheng Lian <lian@databricks.com>
      
      Closes #17247 from liancheng/spark-19905-hive-table-input-files.
      ffee4f1c
    • Budde's avatar
      [SPARK-19611][SQL] Preserve metastore field order when merging inferred schema · bc303514
      Budde authored
      ## What changes were proposed in this pull request?
      
      The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in #16944 may
      not preserve the same field order as the metastore schema in some cases, which can cause
      queries to fail. This change ensures that the metastore field order is preserved.
      
      ## How was this patch tested?
      
      A test for ensuring that the metastore order is preserved was added to ```HiveSchemaInferenceSuite```.
      The particular failure use case from #16944 was tested manually as well.
      
      Author: Budde <budde@amazon.com>
      
      Closes #17249 from budde/PreserveMetastoreFieldOrder.
      bc303514
    • Yong Tang's avatar
      [SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and SPARK_JAVA_OPTS · 8f0490e2
      Yong Tang authored
      ## What changes were proposed in this pull request?
      
      This fix removes deprecated support for the `SPARK_YARN_USER_ENV` config, as mentioned in SPARK-17979.
      It also removes deprecated support for the following (see the configuration sketch after this list):
      ```
      SPARK_YARN_USER_ENV
      SPARK_JAVA_OPTS
      SPARK_CLASSPATH
      SPARK_WORKER_INSTANCES
      ```
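      
      For reference, a hedged sketch of the usual replacements for these environment variables (the property names are standard Spark configs; the values are only illustrative):
      ```scala
      import org.apache.spark.SparkConf
      
      // Illustrative replacements for the removed environment variables:
      val conf = new SparkConf()
        .set("spark.yarn.appMasterEnv.MY_VAR", "value")        // instead of SPARK_YARN_USER_ENV
        .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")  // instead of SPARK_JAVA_OPTS
        .set("spark.executor.extraClassPath", "/opt/libs/*")   // instead of SPARK_CLASSPATH
        .set("spark.executor.instances", "4")                  // instead of SPARK_WORKER_INSTANCES
      ```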
      
      Related JIRA:
      [SPARK-14453]: https://issues.apache.org/jira/browse/SPARK-14453
      [SPARK-12344]: https://issues.apache.org/jira/browse/SPARK-12344
      [SPARK-15781]: https://issues.apache.org/jira/browse/SPARK-15781
      
      ## How was this patch tested?
      
      Existing tests should pass.
      
      Author: Yong Tang <yong.tang.github@outlook.com>
      
      Closes #17212 from yongtang/SPARK-17979.
      8f0490e2
    • Carson Wang's avatar
      [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan · dd9049e0
      Carson Wang authored
      ## What changes were proposed in this pull request?
      When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For a Join, the same exchange coordinator is used for its two Exchanges, but the physical plan shows two different coordinator ids, which is confusing.
      
      This PR is to fix the incorrect exchange coordinator id in the physical plan. The coordinator object instead of the `Option[ExchangeCoordinator]` should be used to generate the identity hash code of the same coordinator.
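      
      A minimal illustration of why hashing the wrapper gives two different ids (types simplified; this is not the actual Exchange code):
      ```scala
      // Two Option wrappers around the same coordinator object have different identities,
      // so hashing the Option produces two ids; hashing the coordinator itself gives one.
      class Coordinator
      val coordinator = new Coordinator
      val a: Option[Coordinator] = Some(coordinator)
      val b: Option[Coordinator] = Some(coordinator)
      
      System.identityHashCode(a) == System.identityHashCode(b)          // usually false
      System.identityHashCode(a.get) == System.identityHashCode(b.get)  // always true
      ```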
      
      ## How was this patch tested?
      Before the patch, the physical plan shows two different exchange coordinator ids for a Join.
      ```
      == Physical Plan ==
      *Project [key1#3L, value2#12L]
      +- *SortMergeJoin [key1#3L], [key2#11L], Inner
         :- *Sort [key1#3L ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864]
         :     +- *Project [(id#0L % 500) AS key1#3L]
         :        +- *Filter isnotnull((id#0L % 500))
         :           +- *Range (0, 1000, step=1, splits=Some(10))
         +- *Sort [key2#11L ASC NULLS FIRST], false, 0
            +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864]
               +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L]
                  +- *Filter isnotnull((id#8L % 500))
                     +- *Range (0, 1000, step=1, splits=Some(10))
      ```
      After the patch, the two exchange coordinator ids are the same.
      
      Author: Carson Wang <carson.wang@intel.com>
      
      Closes #16952 from carsonwang/FixCoordinatorId.
      dd9049e0
    • Kazuaki Ishizaki's avatar
      [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range() · fcb68e0f
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
      This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).
      
      This PR changes generated code in the following two points.
      1. Replace a while-loop using long instance variables with a for-loop using int local variables
      2. Suppress generation of the `shouldStop()` method when it is unnecessary (e.g. when `append()` is not generated).
      
      These changes facilitate optimizations in the JIT compiler by feeding it simpler Java code. Performance is improved by 7.6x.
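      
      As a side note, one way to inspect the Java code Catalyst generates for such a query is the `debugCodegen()` helper from `org.apache.spark.sql.execution.debug` (assuming a recent Spark build that ships it), which is where listings like the ones further below come from:
      ```scala
      import org.apache.spark.sql.execution.debug._
      
      // Prints the whole-stage-codegen Java source for the query plan.
      spark.range(1 << 20).selectExpr("count(id)").debugCodegen()
      ```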
      
      Benchmark program:
      ```scala
      val N = 1 << 29
      val iters = 2
      val benchmark = new Benchmark("range.count", N * iters)
      benchmark.addCase(s"with this PR") { i =>
        var n = 0
        var len = 0
        while (n < iters) {
          len += sparkSession.range(N).selectExpr("count(id)").collect.length
          n += 1
        }
      }
      benchmark.run
      ```
      
      Performance result without this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      w/o this PR                                   1349 / 1356        796.2           1.3       1.0X
      ```
      
      Performance result with this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      with this PR                                   177 /  271       6065.3           0.2       1.0X
      ```
      
      Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed.
      
      Generated code without this PR
      ```java
      
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
      /* 013 */   private boolean range_initRange;
      /* 014 */   private long range_number;
      /* 015 */   private TaskContext range_taskContext;
      /* 016 */   private InputMetrics range_inputMetrics;
      /* 017 */   private long range_batchEnd;
      /* 018 */   private long range_numElementsTodo;
      /* 019 */   private scala.collection.Iterator range_input;
      /* 020 */   private UnsafeRow range_result;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
      /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
      /* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 025 */   private UnsafeRow agg_result;
      /* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 028 */
      /* 029 */   public GeneratedIterator(Object[] references) {
      /* 030 */     this.references = references;
      /* 031 */   }
      /* 032 */
      /* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 034 */     partitionIndex = index;
      /* 035 */     this.inputs = inputs;
      /* 036 */     agg_initAgg = false;
      /* 037 */
      /* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 040 */     range_initRange = false;
      /* 041 */     range_number = 0L;
      /* 042 */     range_taskContext = TaskContext.get();
      /* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
      /* 044 */     range_batchEnd = 0;
      /* 045 */     range_numElementsTodo = 0L;
      /* 046 */     range_input = inputs[0];
      /* 047 */     range_result = new UnsafeRow(1);
      /* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
      /* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
      /* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 052 */     agg_result = new UnsafeRow(1);
      /* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 055 */
      /* 056 */   }
      /* 057 */
      /* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 059 */     // initialize aggregation buffer
      /* 060 */     agg_bufIsNull = false;
      /* 061 */     agg_bufValue = 0L;
      /* 062 */
      /* 063 */     // initialize Range
      /* 064 */     if (!range_initRange) {
      /* 065 */       range_initRange = true;
      /* 066 */       initRange(partitionIndex);
      /* 067 */     }
      /* 068 */
      /* 069 */     while (true) {
      /* 070 */       while (range_number != range_batchEnd) {
      /* 071 */         long range_value = range_number;
      /* 072 */         range_number += 1L;
      /* 073 */
      /* 074 */         // do aggregate
      /* 075 */         // common sub-expressions
      /* 076 */
      /* 077 */         // evaluate aggregate function
      /* 078 */         boolean agg_isNull1 = false;
      /* 079 */
      /* 080 */         long agg_value1 = -1L;
      /* 081 */         agg_value1 = agg_bufValue + 1L;
      /* 082 */         // update aggregation buffer
      /* 083 */         agg_bufIsNull = false;
      /* 084 */         agg_bufValue = agg_value1;
      /* 085 */
      /* 086 */         if (shouldStop()) return;
      /* 087 */       }
      /* 088 */
      /* 089 */       if (range_taskContext.isInterrupted()) {
      /* 090 */         throw new TaskKilledException();
      /* 091 */       }
      /* 092 */
      /* 093 */       long range_nextBatchTodo;
      /* 094 */       if (range_numElementsTodo > 1000L) {
      /* 095 */         range_nextBatchTodo = 1000L;
      /* 096 */         range_numElementsTodo -= 1000L;
      /* 097 */       } else {
      /* 098 */         range_nextBatchTodo = range_numElementsTodo;
      /* 099 */         range_numElementsTodo = 0;
      /* 100 */         if (range_nextBatchTodo == 0) break;
      /* 101 */       }
      /* 102 */       range_numOutputRows.add(range_nextBatchTodo);
      /* 103 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
      /* 104 */
      /* 105 */       range_batchEnd += range_nextBatchTodo * 1L;
      /* 106 */     }
      /* 107 */
      /* 108 */   }
      /* 109 */
      /* 110 */   private void initRange(int idx) {
      /* 111 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
      /* 112 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
      /* 113 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
      /* 114 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
      /* 115 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
      /* 117 */
      /* 118 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
      /* 119 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 120 */       range_number = Long.MAX_VALUE;
      /* 121 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 122 */       range_number = Long.MIN_VALUE;
      /* 123 */     } else {
      /* 124 */       range_number = st.longValue();
      /* 125 */     }
      /* 126 */     range_batchEnd = range_number;
      /* 127 */
      /* 128 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      /* 129 */     .multiply(step).add(start);
      /* 130 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 131 */       partitionEnd = Long.MAX_VALUE;
      /* 132 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 133 */       partitionEnd = Long.MIN_VALUE;
      /* 134 */     } else {
      /* 135 */       partitionEnd = end.longValue();
      /* 136 */     }
      /* 137 */
      /* 138 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      /* 139 */       java.math.BigInteger.valueOf(range_number));
      /* 140 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
      /* 141 */     if (range_numElementsTodo < 0) {
      /* 142 */       range_numElementsTodo = 0;
      /* 143 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      /* 144 */       range_numElementsTodo++;
      /* 145 */     }
      /* 146 */   }
      /* 147 */
      /* 148 */   protected void processNext() throws java.io.IOException {
      /* 149 */     while (!agg_initAgg) {
      /* 150 */       agg_initAgg = true;
      /* 151 */       long agg_beforeAgg = System.nanoTime();
      /* 152 */       agg_doAggregateWithoutKey();
      /* 153 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 154 */
      /* 155 */       // output the result
      /* 156 */
      /* 157 */       agg_numOutputRows.add(1);
      /* 158 */       agg_rowWriter.zeroOutNullBytes();
      /* 159 */
      /* 160 */       if (agg_bufIsNull) {
      /* 161 */         agg_rowWriter.setNullAt(0);
      /* 162 */       } else {
      /* 163 */         agg_rowWriter.write(0, agg_bufValue);
      /* 164 */       }
      /* 165 */       append(agg_result);
      /* 166 */     }
      /* 167 */   }
      /* 168 */ }
      ```
      
      Generated code with this PR
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
      /* 013 */   private boolean range_initRange;
      /* 014 */   private long range_number;
      /* 015 */   private TaskContext range_taskContext;
      /* 016 */   private InputMetrics range_inputMetrics;
      /* 017 */   private long range_batchEnd;
      /* 018 */   private long range_numElementsTodo;
      /* 019 */   private scala.collection.Iterator range_input;
      /* 020 */   private UnsafeRow range_result;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
      /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
      /* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 025 */   private UnsafeRow agg_result;
      /* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 028 */
      /* 029 */   public GeneratedIterator(Object[] references) {
      /* 030 */     this.references = references;
      /* 031 */   }
      /* 032 */
      /* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 034 */     partitionIndex = index;
      /* 035 */     this.inputs = inputs;
      /* 036 */     agg_initAgg = false;
      /* 037 */
      /* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 040 */     range_initRange = false;
      /* 041 */     range_number = 0L;
      /* 042 */     range_taskContext = TaskContext.get();
      /* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
      /* 044 */     range_batchEnd = 0;
      /* 045 */     range_numElementsTodo = 0L;
      /* 046 */     range_input = inputs[0];
      /* 047 */     range_result = new UnsafeRow(1);
      /* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
      /* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
      /* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 052 */     agg_result = new UnsafeRow(1);
      /* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 055 */
      /* 056 */   }
      /* 057 */
      /* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 059 */     // initialize aggregation buffer
      /* 060 */     agg_bufIsNull = false;
      /* 061 */     agg_bufValue = 0L;
      /* 062 */
      /* 063 */     // initialize Range
      /* 064 */     if (!range_initRange) {
      /* 065 */       range_initRange = true;
      /* 066 */       initRange(partitionIndex);
      /* 067 */     }
      /* 068 */
      /* 069 */     while (true) {
      /* 070 */       long range_range = range_batchEnd - range_number;
      /* 071 */       if (range_range != 0L) {
      /* 072 */         int range_localEnd = (int)(range_range / 1L);
      /* 073 */         for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
      /* 074 */           long range_value = ((long)range_localIdx * 1L) + range_number;
      /* 075 */
      /* 076 */           // do aggregate
      /* 077 */           // common sub-expressions
      /* 078 */
      /* 079 */           // evaluate aggregate function
      /* 080 */           boolean agg_isNull1 = false;
      /* 081 */
      /* 082 */           long agg_value1 = -1L;
      /* 083 */           agg_value1 = agg_bufValue + 1L;
      /* 084 */           // update aggregation buffer
      /* 085 */           agg_bufIsNull = false;
      /* 086 */           agg_bufValue = agg_value1;
      /* 087 */
      /* 088 */           // shouldStop check is eliminated
      /* 089 */         }
      /* 090 */         range_number = range_batchEnd;
      /* 091 */       }
      /* 092 */
      /* 093 */       if (range_taskContext.isInterrupted()) {
      /* 094 */         throw new TaskKilledException();
      /* 095 */       }
      /* 096 */
      /* 097 */       long range_nextBatchTodo;
      /* 098 */       if (range_numElementsTodo > 1000L) {
      /* 099 */         range_nextBatchTodo = 1000L;
      /* 100 */         range_numElementsTodo -= 1000L;
      /* 101 */       } else {
      /* 102 */         range_nextBatchTodo = range_numElementsTodo;
      /* 103 */         range_numElementsTodo = 0;
      /* 104 */         if (range_nextBatchTodo == 0) break;
      /* 105 */       }
      /* 106 */       range_numOutputRows.add(range_nextBatchTodo);
      /* 107 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
      /* 108 */
      /* 109 */       range_batchEnd += range_nextBatchTodo * 1L;
      /* 110 */     }
      /* 111 */
      /* 112 */   }
      /* 113 */
      /* 114 */   private void initRange(int idx) {
      /* 115 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
      /* 116 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
      /* 117 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
      /* 118 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
      /* 119 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
      /* 120 */     long partitionEnd;
      /* 121 */
      /* 122 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
      /* 123 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 124 */       range_number = Long.MAX_VALUE;
      /* 125 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 126 */       range_number = Long.MIN_VALUE;
      /* 127 */     } else {
      /* 128 */       range_number = st.longValue();
      /* 129 */     }
      /* 130 */     range_batchEnd = range_number;
      /* 131 */
      /* 132 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      /* 133 */     .multiply(step).add(start);
      /* 134 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 135 */       partitionEnd = Long.MAX_VALUE;
      /* 136 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 137 */       partitionEnd = Long.MIN_VALUE;
      /* 138 */     } else {
      /* 139 */       partitionEnd = end.longValue();
      /* 140 */     }
      /* 141 */
      /* 142 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      /* 143 */       java.math.BigInteger.valueOf(range_number));
      /* 144 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
      /* 145 */     if (range_numElementsTodo < 0) {
      /* 146 */       range_numElementsTodo = 0;
      /* 147 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      /* 148 */       range_numElementsTodo++;
      /* 149 */     }
      /* 150 */   }
      /* 151 */
      /* 152 */   protected void processNext() throws java.io.IOException {
      /* 153 */     while (!agg_initAgg) {
      /* 154 */       agg_initAgg = true;
      /* 155 */       long agg_beforeAgg = System.nanoTime();
      /* 156 */       agg_doAggregateWithoutKey();
      /* 157 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 158 */
      /* 159 */       // output the result
      /* 160 */
      /* 161 */       agg_numOutputRows.add(1);
      /* 162 */       agg_rowWriter.zeroOutNullBytes();
      /* 163 */
      /* 164 */       if (agg_bufIsNull) {
      /* 165 */         agg_rowWriter.setNullAt(0);
      /* 166 */       } else {
      /* 167 */         agg_rowWriter.write(0, agg_bufValue);
      /* 168 */       }
      /* 169 */       append(agg_result);
      /* 170 */     }
      /* 171 */   }
      /* 172 */ }
      ```
      
      Part of the `shouldStop()` suppression was originally developed by inouehrs.
      
      ## How was this patch tested?
      
      Add new tests into `DataFrameRangeSuite`
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #17122 from kiszk/SPARK-19786.
      fcb68e0f
    • Tyson Condie's avatar
      [SPARK-19891][SS] Await Batch Lock notified on stream execution exit · 501b7111
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
      We need to notify the await batch lock when the stream exits early, e.g., when an exception has been thrown.
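      
      A conceptual sketch of the fix (names are illustrative, not the actual `StreamExecution` internals):
      ```scala
      import java.util.concurrent.locks.ReentrantLock
      
      // Ensure threads blocked on the batch lock are woken up even when the stream
      // loop exits early via an exception.
      val awaitBatchLock = new ReentrantLock()
      val awaitBatchLockCondition = awaitBatchLock.newCondition()
      
      def runStreamLoop(body: => Unit): Unit = {
        try {
          body
        } finally {
          awaitBatchLock.lock()
          try awaitBatchLockCondition.signalAll()
          finally awaitBatchLock.unlock()
        }
      }
      ```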
      
      ## How was this patch tested?
      
      Current tests that throw exceptions at runtime will finish faster as a result of this update.
      
      zsxwing
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Tyson Condie <tcondie@gmail.com>
      
      Closes #17231 from tcondie/kafka-writer.
      501b7111