Skip to content
Snippets Groups Projects
  1. Dec 20, 2016
    • Liang-Chi Hsieh's avatar
      [SPARK-18281] [SQL] [PYSPARK] Remove timeout for reading data through socket for local iterator · cd297c39
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      There is a timeout failure when using `rdd.toLocalIterator()` or `df.toLocalIterator()` for a PySpark RDD and DataFrame:
      
          df = spark.createDataFrame([[1],[2],[3]])
          it = df.toLocalIterator()
          row = next(it)
      
          df2 = df.repartition(1000)  # create many empty partitions which increase materialization time so causing timeout
          it2 = df2.toLocalIterator()
          row = next(it2)
      
      The cause of this issue is, we open a socket to serve the data from JVM side. We set timeout for connection and reading through the socket in Python side. In Python we use a generator to read the data, so we only begin to connect the socket once we start to ask data from it. If we don't consume it immediately, there is connection timeout.
      
      In the other side, the materialization time for RDD partitions is unpredictable. So we can't set a timeout for reading data through the socket. Otherwise, it is very possibly to fail.
      
      ## How was this patch tested?
      
      Added tests into PySpark.
      
      Please review http://spark.apache.org/contributing.html
      
       before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16263 from viirya/fix-pyspark-localiterator.
      
      (cherry picked from commit 95c95b71)
      Signed-off-by: default avatarDavies Liu <davies.liu@gmail.com>
      cd297c39
    • Josh Rosen's avatar
      [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors · 2971ae56
      Josh Rosen authored
      ## What changes were proposed in this pull request?
      
      Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.
      
      This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.
      
      This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.
      
      ## How was this patch tested?
      
      Tested via a new test case in `JobCancellationSuite`, plus manual testing.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #16189 from JoshRosen/cancellation.
      2971ae56
  2. Dec 19, 2016
    • Josh Rosen's avatar
      [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter · f07e989c
      Josh Rosen authored
      
      ## What changes were proposed in this pull request?
      
      In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).
      
      This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.
      
      ## How was this patch tested?
      
      Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #16340 from JoshRosen/sql-task-interruption.
      
      (cherry picked from commit 5857b9ac)
      Signed-off-by: default avatarHerman van Hovell <hvanhovell@databricks.com>
      f07e989c
    • Wenchen Fan's avatar
      [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase · c1a26b45
      Wenchen Fan authored
      
      ## What changes were proposed in this pull request?
      
      It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface.
      
      What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence.
      
      This PR fixes this and use `Hive.databaseExists` to check database existence.
      
      ## How was this patch tested?
      
      N/A
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #16332 from cloud-fan/minor.
      
      (cherry picked from commit 7a75ee1c)
      Signed-off-by: default avatarYin Huai <yhuai@databricks.com>
      c1a26b45
    • xuanyuanking's avatar
      [SPARK-18700][SQL] Add StripedLock for each table's relation in cache · fc1b2566
      xuanyuanking authored
      ## What changes were proposed in this pull request?
      
      As the scenario describe in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700
      
      ), when cachedDataSourceTables invalided, the coming few queries will fetch all FileStatus in listLeafFiles function. In the condition of table has many partitions, these jobs will occupy much memory of driver finally may cause driver OOM.
      
      In this patch, add StripedLock for each table's relation in cache not for the whole cachedDataSourceTables, each table's load cache operation protected by it.
      
      ## How was this patch tested?
      
      Add a multi-thread access table test in `PartitionedTablePerfStatsSuite` and check it only loading once using metrics in `HiveCatalogMetrics`
      
      Author: xuanyuanking <xyliyuanjian@gmail.com>
      
      Closes #16135 from xuanyuanking/SPARK-18700.
      
      (cherry picked from commit 24482858)
      Signed-off-by: default avatarHerman van Hovell <hvanhovell@databricks.com>
      fc1b2566
  3. Dec 18, 2016
    • gatorsmile's avatar
      [SPARK-18703][SPARK-18675][SQL][BACKPORT-2.1] CTAS for hive serde table should... · 3080f995
      gatorsmile authored
      [SPARK-18703][SPARK-18675][SQL][BACKPORT-2.1] CTAS for hive serde table should work for all hive versions AND Drop Staging Directories and Data Files
      
      ### What changes were proposed in this pull request?
      
      This PR is to backport https://github.com/apache/spark/pull/16104 and https://github.com/apache/spark/pull/16134.
      
      ----------
      [[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions](https://github.com/apache/spark/pull/16104)
      
      Before hive 1.1, when inserting into a table, hive will create the staging directory under a common scratch directory. After the writing is finished, hive will simply empty the table directory and move the staging directory to it.
      
      After hive 1.1, hive will create the staging directory under the table directory, and when moving staging directory to table directory, hive will still empty the table directory, but will exclude the staging directory there.
      
      In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we will always create the staging directory under the table directory, no matter what the hive version is. This causes problems if the hive version is prior to 1.1, because the staging directory will be removed by hive when hive is trying to empty the table directory.
      
      This PR copies the code from hive 0.13, so that we have 2 branches to create staging directory. If hive version is prior to 1.1, we'll go to the old style branch(i.e. create the staging directory under a common scratch directory), else, go to the new style branch(i.e. create the staging directory under the table directory)
      
      ----------
      [[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables](https://github.com/apache/spark/pull/16134)
      
      Below are the files/directories generated for three inserts againsts a Hive table:
      ```
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
      ```
      
      The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped.
      
      Only the last two files are needed, as shown below.
      ```
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
      /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
      ```
      The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped.
      
      This PR is to drop the created staging files and temporary data files after each insert/CTAS.
      
      ### How was this patch tested?
      Added test cases.
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16325 from gatorsmile/backport-18703&18675.
      3080f995
    • Yuming Wang's avatar
      [SPARK-18827][CORE] Fix cannot read broadcast on disk · a5da8db8
      Yuming Wang authored
      ## What changes were proposed in this pull request?
      `NoSuchElementException` will throw since https://github.com/apache/spark/pull/15056
      
       if a broadcast cannot cache in memory. The reason is that that change cannot cover `!unrolled.hasNext` in `next()` function.
      
      This change is to cover the `!unrolled.hasNext` and check `hasNext` before calling `next` in `blockManager.getLocalValues` to make it  more robust.
      
      We can cache and read broadcast even it cannot fit in memory from this pull request.
      
      Exception log:
      ```
      16/12/10 10:10:04 INFO UnifiedMemoryManager: Will not store broadcast_131 as the required space (1048576 bytes) exceeds our memory limit (122764 bytes)
      16/12/10 10:10:04 WARN MemoryStore: Failed to reserve initial memory threshold of 1024.0 KB for computing block broadcast_131 in memory.
      16/12/10 10:10:04 WARN MemoryStore: Not enough space to cache broadcast_131 in memory! (computed 384.0 B so far)
      16/12/10 10:10:04 INFO MemoryStore: Memory use = 95.6 KB (blocks) + 0.0 B (scratch space shared across 0 tasks(s)) = 95.6 KB. Storage limit = 119.9 KB.
      16/12/10 10:10:04 ERROR Utils: Exception encountered
      java.util.NoSuchElementException
      	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
      	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
      	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
      	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
      	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
      	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
      	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      16/12/10 10:10:04 ERROR Executor: Exception in task 1.0 in stage 86.0 (TID 134423)
      java.io.IOException: java.util.NoSuchElementException
      	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
      	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
      	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
      	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
      	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:86)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.util.NoSuchElementException
      	at org.apache.spark.util.collection.PrimitiveVector$$anon$1.next(PrimitiveVector.scala:58)
      	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:700)
      	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$2.apply(TorrentBroadcast.scala:210)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
      	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
      	... 12 more
      ```
      
      ## How was this patch tested?
      
      Add unit test
      
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #16252 from wangyum/SPARK-18827.
      
      (cherry picked from commit 1e5c51f3)
      Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
      Unverified
      a5da8db8
    • gatorsmile's avatar
      [SPARK-18918][DOC] Missing </td> in Configuration page · 4b8a643f
      gatorsmile authored
      ### What changes were proposed in this pull request?
      The configuration page looks messy now, as shown in the nightly build:
      https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/configuration.html
      
      Starting from the following location:
      
      ![screenshot 2016-12-18 00 26 33](https://cloud.githubusercontent.com/assets/11567269/21292396/ace4719c-c4b8-11e6-8dfd-d9ab95be43d5.png)
      
      ### How was this patch tested?
      Attached is the screenshot generated in my local computer after the fix.
      [Configuration - Spark 2.2.0 Documentation.pdf](https://github.com/apache/spark/files/659315/Configuration.-.Spark.2.2.0.Documentation.pdf
      
      )
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16327 from gatorsmile/docFix.
      
      (cherry picked from commit c0c9e1d2)
      Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
      Unverified
      4b8a643f
  4. Dec 17, 2016
  5. Dec 16, 2016
    • Shixiong Zhu's avatar
      [SPARK-18904][SS][TESTS] Merge two FileStreamSourceSuite files · d2a131a8
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      Merge two FileStreamSourceSuite files into one file.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16315 from zsxwing/FileStreamSourceSuite.
      
      (cherry picked from commit 4faa8a3e)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      d2a131a8
    • Dongjoon Hyun's avatar
      [SPARK-18897][SPARKR] Fix SparkR SQL Test to drop test table · df589be5
      Dongjoon Hyun authored
      
      ## What changes were proposed in this pull request?
      
      SparkR tests, `R/run-tests.sh`, succeeds only once because `test_sparkSQL.R` does not clean up the test table, `people`.
      
      As a result, the rows in `people` table are accumulated at every run and the test cases fail.
      
      The following is the failure result for the second run.
      
      ```r
      Failed -------------------------------------------------------------------------
      1. Failure: create DataFrame from RDD (test_sparkSQL.R#204) -------------------
      collect(sql("SELECT age from people WHERE name = 'Bob'"))$age not equal to c(16).
      Lengths differ: 2 vs 1
      
      2. Failure: create DataFrame from RDD (test_sparkSQL.R#206) -------------------
      collect(sql("SELECT height from people WHERE name ='Bob'"))$height not equal to c(176.5).
      Lengths differ: 2 vs 1
      ```
      
      ## How was this patch tested?
      
      Manual. Run `run-tests.sh` twice and check if it passes without failures.
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #16310 from dongjoon-hyun/SPARK-18897.
      
      (cherry picked from commit 1169db44)
      Signed-off-by: default avatarShivaram Venkataraman <shivaram@cs.berkeley.edu>
      df589be5
    • Takeshi YAMAMURO's avatar
      [SPARK-18108][SQL] Fix a schema inconsistent bug that makes a parquet reader fail to read data · d8ef0be8
      Takeshi YAMAMURO authored
      
      ## What changes were proposed in this pull request?
      A vectorized parquet reader fails to read column data if data schema and partition schema overlap with each other and inferred types in the partition schema differ from ones in the data schema. An example code to reproduce this bug is as follows;
      
      ```
      scala> case class A(a: Long, b: Int)
      scala> val as = Seq(A(1, 2))
      scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
      scala> val df = spark.read.parquet("/data/")
      scala> df.printSchema
      root
       |-- a: long (nullable = true)
       |-- b: integer (nullable = true)
      scala> df.collect
      java.lang.NullPointerException
              at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
              at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
              at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      ```
      The root cause is that a logical layer (`HadoopFsRelation`) and a physical layer (`VectorizedParquetRecordReader`) have a different assumption on partition schema; the logical layer trusts the data schema to infer the type the overlapped partition columns, and, on the other hand, the physical layer trusts partition schema which is inferred from path string. To fix this bug, this pr simply updates `HadoopFsRelation.schema` to respect the partition columns position in data schema and respect the partition columns type in partition schema.
      
      ## How was this patch tested?
      Add tests in `ParquetPartitionDiscoverySuite`
      
      Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>
      
      Closes #16030 from maropu/SPARK-18108.
      
      (cherry picked from commit dc2a4d4a)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      d8ef0be8
    • Shixiong Zhu's avatar
      [SPARK-18850][SS] Make StreamExecution and progress classes serializable · a73201da
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      This PR adds StreamingQueryWrapper to make StreamExecution and progress classes serializable because it is too easy for it to get captured with normal usage. If StreamingQueryWrapper gets captured in a closure but no place calls its methods, it should not fail the Spark tasks. However if its methods are called, then this PR will throw a better message.
      
      ## How was this patch tested?
      
      `test("StreamingQuery should be Serializable but cannot be used in executors")`
      `test("progress classes should be Serializable")`
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16272 from zsxwing/SPARK-18850.
      
      (cherry picked from commit d7f3058e)
      Signed-off-by: default avatarTathagata Das <tathagata.das1565@gmail.com>
      a73201da
  6. Dec 15, 2016
  7. Dec 14, 2016
    • Felix Cheung's avatar
      [SPARK-18849][ML][SPARKR][DOC] vignettes final check update · 2a8de2e1
      Felix Cheung authored
      ## What changes were proposed in this pull request?
      
      doc cleanup
      
      ## How was this patch tested?
      
      ~~vignettes is not building for me. I'm going to kick off a full clean build and try again and attach output here for review.~~
      Output html here: https://felixcheung.github.io/sparkr-vignettes.html
      
      
      
      Author: Felix Cheung <felixcheung_m@hotmail.com>
      
      Closes #16286 from felixcheung/rvignettespass.
      
      (cherry picked from commit 7d858bc5)
      Signed-off-by: default avatarShivaram Venkataraman <shivaram@cs.berkeley.edu>
      2a8de2e1
    • Dongjoon Hyun's avatar
      [SPARK-18875][SPARKR][DOCS] Fix R API doc generation by adding `DESCRIPTION` file · d399a297
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
      Since Apache Spark 1.4.0, R API document page has a broken link on `DESCRIPTION file` because Jekyll plugin script doesn't copy the file. This PR aims to fix that.
      
      - Official Latest Website: http://spark.apache.org/docs/latest/api/R/index.html
      - Apache Spark 2.1.0-rc2: http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-docs/api/R/index.html
      
      
      
      ## How was this patch tested?
      
      Manual.
      
      ```bash
      cd docs
      SKIP_SCALADOC=1 jekyll build
      ```
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #16292 from dongjoon-hyun/SPARK-18875.
      
      (cherry picked from commit ec0eae48)
      Signed-off-by: default avatarShivaram Venkataraman <shivaram@cs.berkeley.edu>
      d399a297
    • Reynold Xin's avatar
      [SPARK-18869][SQL] Add TreeNode.p that returns BaseType · b14fc391
      Reynold Xin authored
      
      ## What changes were proposed in this pull request?
      After the bug fix in SPARK-18854, TreeNode.apply now returns TreeNode[_] rather than a more specific type. It would be easier for interactive debugging to introduce a function that returns the BaseType.
      
      ## How was this patch tested?
      N/A - this is a developer only feature used for interactive debugging. As long as it compiles, it should be good to go. I tested this in spark-shell.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #16288 from rxin/SPARK-18869.
      
      (cherry picked from commit 5d510c69)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      b14fc391
    • Wenchen Fan's avatar
      [SPARK-18856][SQL] non-empty partitioned table should not report zero size · cb2c8428
      Wenchen Fan authored
      
      ## What changes were proposed in this pull request?
      
      In `DataSource`, if the table is not analyzed, we will use 0 as the default value for table size. This is dangerous, we may broadcast a large table and cause OOM. We should use `defaultSizeInBytes` instead.
      
      ## How was this patch tested?
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #16280 from cloud-fan/bug.
      
      (cherry picked from commit d6f11a12)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      cb2c8428
    • wm624@hotmail.com's avatar
      [SPARK-18865][SPARKR] SparkR vignettes MLP and LDA updates · 0d94201e
      wm624@hotmail.com authored
      
      ## What changes were proposed in this pull request?
      
      When do the QA work, I found that the following issues:
      
      1). `spark.mlp` doesn't include an example;
      2). `spark.mlp` and `spark.lda` have redundant parameter explanations;
      3). `spark.lda` document misses default values for some parameters.
      
      I also changed the `spark.logit` regParam in the examples, as we discussed in #16222.
      
      ## How was this patch tested?
      
      Manual test
      
      Author: wm624@hotmail.com <wm624@hotmail.com>
      
      Closes #16284 from wangmiao1981/ks.
      
      (cherry picked from commit 32438853)
      Signed-off-by: default avatarFelix Cheung <felixcheung@apache.org>
      0d94201e
    • Reynold Xin's avatar
      [SPARK-18854][SQL] numberedTreeString and apply(i) inconsistent for subqueries · 280c35af
      Reynold Xin authored
      
      ## What changes were proposed in this pull request?
      This is a bug introduced by subquery handling. numberedTreeString (which uses generateTreeString under the hood) numbers trees including innerChildren (used to print subqueries), but apply (which uses getNodeNumbered) ignores innerChildren. As a result, apply(i) would return the wrong plan node if there are subqueries.
      
      This patch fixes the bug.
      
      ## How was this patch tested?
      Added a test case in SubquerySuite.scala to test both the depth-first traversal of numbering as well as making sure the two methods are consistent.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #16277 from rxin/SPARK-18854.
      
      (cherry picked from commit ffdd1fcd)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      280c35af
    • Joseph K. Bradley's avatar
      [SPARK-18795][ML][SPARKR][DOC] Added KSTest section to SparkR vignettes · d0d9c572
      Joseph K. Bradley authored
      ## What changes were proposed in this pull request?
      
      Added short section for KSTest.
      Also added logreg model to list of ML models in vignette.  (This will be reorganized under SPARK-18849)
      
      ![screen shot 2016-12-14 at 1 37 31 pm](https://cloud.githubusercontent.com/assets/5084283/21202140/7f24e240-c202-11e6-9362-458208bb9159.png
      
      )
      
      ## How was this patch tested?
      
      Manually tested example locally.
      Built vignettes locally.
      
      Author: Joseph K. Bradley <joseph@databricks.com>
      
      Closes #16283 from jkbradley/ksTest-vignette.
      
      (cherry picked from commit 78627425)
      Signed-off-by: default avatarJoseph K. Bradley <joseph@databricks.com>
      d0d9c572
    • Shixiong Zhu's avatar
      [SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty · c4de90fc
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError.
      
      This PR just makes it return null instead.
      
      ## How was this patch tested?
      
      `test("lastProgress should be null when recentProgress is empty")`
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16273 from zsxwing/SPARK-18852.
      
      (cherry picked from commit 1ac6567b)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      c4de90fc
    • Reynold Xin's avatar
      [SPARK-18853][SQL] Project (UnaryNode) is way too aggressive in estimating statistics · e8866f9f
      Reynold Xin authored
      
      ## What changes were proposed in this pull request?
      This patch reduces the default number element estimation for arrays and maps from 100 to 1. The issue with the 100 number is that when nested (e.g. an array of map), 100 * 100 would be used as the default size. This sounds like just an overestimation which doesn't seem that bad (since it is usually better to overestimate than underestimate). However, due to the way we assume the size output for Project (new estimated column size / old estimated column size), this overestimation can become underestimation. It is actually in general in this case safer to assume 1 default element.
      
      ## How was this patch tested?
      This should be covered by existing tests.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #16274 from rxin/SPARK-18853.
      
      (cherry picked from commit 5d799473)
      Signed-off-by: default avatarHerman van Hovell <hvanhovell@databricks.com>
      e8866f9f
    • hyukjinkwon's avatar
      [SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side... · af12a21c
      hyukjinkwon authored
      [SPARK-18753][SQL] Keep pushed-down null literal as a filter in Spark-side post-filter for FileFormat datasources
      
      ## What changes were proposed in this pull request?
      
      Currently, `FileSourceStrategy` does not handle the case when the pushed-down filter is `Literal(null)` and removes it at the post-filter in Spark-side.
      
      For example, the codes below:
      
      ```scala
      val df = Seq(Tuple1(Some(true)), Tuple1(None), Tuple1(Some(false))).toDF()
      df.filter($"_1" === "true").explain(true)
      ```
      
      shows it keeps `null` properly.
      
      ```
      == Parsed Logical Plan ==
      'Filter ('_1 = true)
      +- LocalRelation [_1#17]
      
      == Analyzed Logical Plan ==
      _1: boolean
      Filter (cast(_1#17 as double) = cast(true as double))
      +- LocalRelation [_1#17]
      
      == Optimized Logical Plan ==
      Filter (isnotnull(_1#17) && null)
      +- LocalRelation [_1#17]
      
      == Physical Plan ==
      *Filter (isnotnull(_1#17) && null)       << Here `null` is there
      +- LocalTableScan [_1#17]
      ```
      
      However, when we read it back from Parquet,
      
      ```scala
      val path = "/tmp/testfile"
      df.write.parquet(path)
      spark.read.parquet(path).filter($"_1" === "true").explain(true)
      ```
      
      `null` is removed at the post-filter.
      
      ```
      == Parsed Logical Plan ==
      'Filter ('_1 = true)
      +- Relation[_1#11] parquet
      
      == Analyzed Logical Plan ==
      _1: boolean
      Filter (cast(_1#11 as double) = cast(true as double))
      +- Relation[_1#11] parquet
      
      == Optimized Logical Plan ==
      Filter (isnotnull(_1#11) && null)
      +- Relation[_1#11] parquet
      
      == Physical Plan ==
      *Project [_1#11]
      +- *Filter isnotnull(_1#11)       << Here `null` is missing
         +- *FileScan parquet [_1#11] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/tmp/testfile], PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
      ```
      
      This PR fixes it to keep it properly. In more details,
      
      ```scala
      val partitionKeyFilters =
        ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
      ```
      
      This keeps this `null` in `partitionKeyFilters` as `Literal` always don't have `children` and `references` is being empty  which is always the subset of `partitionSet`.
      
      And then in
      
      ```scala
      val afterScanFilters = filterSet -- partitionKeyFilters
      ```
      
      `null` is always removed from the post filter. So, if the referenced fields are empty, it should be applied into data columns too.
      
      After this PR, it becomes as below:
      
      ```
      == Parsed Logical Plan ==
      'Filter ('_1 = true)
      +- Relation[_1#276] parquet
      
      == Analyzed Logical Plan ==
      _1: boolean
      Filter (cast(_1#276 as double) = cast(true as double))
      +- Relation[_1#276] parquet
      
      == Optimized Logical Plan ==
      Filter (isnotnull(_1#276) && null)
      +- Relation[_1#276] parquet
      
      == Physical Plan ==
      *Project [_1#276]
      +- *Filter (isnotnull(_1#276) && null)
         +- *FileScan parquet [_1#276] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/private/var/folders/9j/gf_c342d7d150mwrxvkqnc180000gn/T/spark-a5d59bdb-5b..., PartitionFilters: [null], PushedFilters: [IsNotNull(_1)], ReadSchema: struct<_1:boolean>
      ```
      
      ## How was this patch tested?
      
      Unit test in `FileSourceStrategySuite`
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16184 from HyukjinKwon/SPARK-18753.
      
      (cherry picked from commit 89ae26dc)
      Signed-off-by: default avatarCheng Lian <lian@databricks.com>
      af12a21c
    • Cheng Lian's avatar
      [SPARK-18730] Post Jenkins test report page instead of the full console output page to GitHub · 16d4bd4a
      Cheng Lian authored
      
      ## What changes were proposed in this pull request?
      
      Currently, the full console output page of a Spark Jenkins PR build can be as large as several megabytes. It takes a relatively long time to load and may even freeze the browser for quite a while.
      
      This PR makes the build script to post the test report page link to GitHub instead. The test report page is way more concise and is usually the first page I'd like to check when investigating a Jenkins build failure.
      
      Note that for builds that a test report is not available (ongoing builds and builds that fail before test execution), the test report link automatically redirects to the build page.
      
      ## How was this patch tested?
      
      N/A.
      
      Author: Cheng Lian <lian@databricks.com>
      
      Closes #16163 from liancheng/jenkins-test-report.
      
      (cherry picked from commit ba4aab9b)
      Signed-off-by: default avatarReynold Xin <rxin@databricks.com>
      16d4bd4a
    • Nattavut Sutyanyong's avatar
      [SPARK-18814][SQL] CheckAnalysis rejects TPCDS query 32 · f999312e
      Nattavut Sutyanyong authored
      
      ## What changes were proposed in this pull request?
      Move the checking of GROUP BY column in correlated scalar subquery from CheckAnalysis
      to Analysis to fix a regression caused by SPARK-18504.
      
      This problem can be reproduced with a simple script now.
      
      Seq((1,1)).toDF("pk","pv").createOrReplaceTempView("p")
      Seq((1,1)).toDF("ck","cv").createOrReplaceTempView("c")
      sql("select * from p,c where p.pk=c.ck and c.cv = (select avg(c1.cv) from c c1 where c1.ck = p.pk)").show
      
      The requirements are:
      1. We need to reference the same table twice in both the parent and the subquery. Here is the table c.
      2. We need to have a correlated predicate but to a different table. Here is from c (as c1) in the subquery to p in the parent.
      3. We will then "deduplicate" c1.ck in the subquery to `ck#<n1>#<n2>` at `Project` above `Aggregate` of `avg`. Then when we compare `ck#<n1>#<n2>` and the original group by column `ck#<n1>` by their canonicalized form, which is #<n2> != #<n1>. That's how we trigger the exception added in SPARK-18504.
      
      ## How was this patch tested?
      
      SubquerySuite and a simplified version of TPCDS-Q32
      
      Author: Nattavut Sutyanyong <nsy.can@gmail.com>
      
      Closes #16246 from nsyca/18814.
      
      (cherry picked from commit cccd6439)
      Signed-off-by: default avatarHerman van Hovell <hvanhovell@databricks.com>
      f999312e
  8. Dec 13, 2016
Loading