  1. Mar 06, 2017
    • Tyson Condie's avatar
      [SPARK-19719][SS] Kafka writer for both structured streaming and batch queries · fd6c6d5c
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
      Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
      ### Streaming Kafka Sink
      - When addBatch is called
        - If batchId is greater than the last written batch
          - Write the batch to Kafka
            - The topic is taken from the `topic` option if set (overriding any topic in the record); otherwise it is taken from the record's topic field.
        - Else ignore the batch
      
      ### Batch Kafka Sink
      - KafkaSourceProvider will implement CreatableRelationProvider
      - CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka
      - The topic is taken from the `topic` option if set (overriding any topic in the record); otherwise it is taken from the record's topic field.
      - Save modes Append and ErrorIfExists are supported with identical semantics; other save modes result in an AnalysisException
      
      tdas zsxwing
      
      ## How was this patch tested?
      
      ### The following unit tests will be included
      - write to stream with topic field: valid stream write with data that includes an existing topic in the schema
      - write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
      - write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
      - write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
      - write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
      - write batch to kafka: a simple batch write to Kafka, which goes through the same code path as the streaming scenario, so validity checks are not redone here.
      
      ### Examples
      ```scala
      // Structured Streaming
      val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
       .selectExpr("value as key", "value as value")
       .writeStream
       .format("kafka")
       .option("checkpointLocation", checkpointDir)
       .outputMode(OutputMode.Append)
       .option("kafka.bootstrap.servers", brokerAddress)
       .option("topic", topic)
       .queryName("kafkaStream")
       .start()
      
      // Batch
      val df = spark
       .sparkContext
       .parallelize(Seq("1", "2", "3", "4", "5"))
       .map(v => (topic, v))
       .toDF("topic", "value")
      
      df.write
       .format("kafka")
       .option("kafka.bootstrap.servers",brokerAddress)
       .option("topic", topic)
       .save()
      ```
      
      Author: Tyson Condie <tcondie@gmail.com>
      
      Closes #17043 from tcondie/kafka-writer.
      fd6c6d5c
  2. Mar 05, 2017
    • uncleGen's avatar
      [SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not... · ca7a7e8a
      uncleGen authored
      [SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string.
      
      ## What changes were proposed in this pull request?
      
      https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/
      
      
      
      ```
      sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code
      passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
      Last failure message: 8 did not equal 2.
      	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
      	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
      	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
      	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
      	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
      	at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite
      .scala:172)
      	at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
      ```
      
      the check condition is:
      
      ```
      val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
           _.toString.contains(clock.getTimeMillis.toString)
      }
      // Checkpoint files are written twice for every batch interval. So assert that both
      // are written to make sure that both of them have been written.
      assert(checkpointFilesOfLatestTime.size === 2)
      ```
      
      the path string itself may contain `clock.getTimeMillis.toString`, e.g. `3500`:
      
      ```
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
      file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                             ▲▲▲▲
      ```
      
      so we should check only the file name, not the whole path, as sketched below.
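
      A minimal sketch of the corrected filter (illustrative only, reusing the helpers from the snippet above): the batch time is matched against the file name rather than the full path, so directory names like `spark-20035007-...` can no longer produce false positives.

      ```scala
      val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { path =>
        // Path#getName returns only the final component, e.g. "checkpoint-3500",
        // so a time value embedded in the directory name is ignored.
        path.getName.contains(clock.getTimeMillis.toString)
      }
      assert(checkpointFilesOfLatestTime.size === 2)
      ```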
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17167 from uncleGen/flaky-CheckpointSuite.
      
      (cherry picked from commit 207067ea)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      ca7a7e8a
  3. Mar 03, 2017
    • Shixiong Zhu's avatar
      [SPARK-19816][SQL][TESTS] Fix an issue that DataFrameCallbackSuite doesn't recover the log level · 664c9795
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      "DataFrameCallbackSuite.execute callback functions when a DataFrame action failed" sets the log level to "fatal" but doesn't recover it. Hence, tests running after it won't output any logs except fatal logs.
      
      This PR uses `testQuietly` instead to avoid changing the log level.
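
      For context, a minimal sketch of the save-and-restore pattern behind a quiet test helper (illustrative only, not Spark's exact `testQuietly` implementation), using the log4j 1.x API that Spark 2.1 ships with:

      ```scala
      import org.apache.log4j.{Level, LogManager}

      // Run a block with most logging suppressed, then restore the previous level
      // so that tests running afterwards keep their normal log output.
      def quietly[T](body: => T): T = {
        val rootLogger = LogManager.getRootLogger
        val previousLevel = rootLogger.getLevel
        rootLogger.setLevel(Level.ERROR)
        try body finally rootLogger.setLevel(previousLevel)
      }
      ```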
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #17156 from zsxwing/SPARK-19816.
      
      (cherry picked from commit fbc40580)
      Signed-off-by: default avatarYin Huai <yhuai@databricks.com>
      664c9795
    • Burak Yavuz's avatar
      [SPARK-19774] StreamExecution should call stop() on sources when a stream fails · da04d45c
      Burak Yavuz authored
      
      ## What changes were proposed in this pull request?
      
      We currently call stop() on a Structured Streaming Source only when the stream is shut down, i.e. when a user calls streamingQuery.stop(). We should also stop all sources when the stream fails; otherwise we may leak resources, e.g. connections to Kafka.
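
      A minimal sketch of the intended shutdown path (illustrative; `sources` and `runBatches()` stand in for the stream's registered sources and its execution loop, not the actual `StreamExecution` members):

      ```scala
      import scala.util.control.NonFatal

      try {
        runBatches()                 // normal execution; may throw when the stream fails
      } finally {
        // Stop every source whether the stream terminated normally or with an error,
        // so resources such as Kafka connections are not leaked.
        sources.foreach { source =>
          try source.stop()
          catch { case NonFatal(e) => println(s"Failed to stop $source: $e") }
        }
      }
      ```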
      
      ## How was this patch tested?
      
      Unit tests in `StreamingQuerySuite`.
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #17107 from brkyvz/close-source.
      
      (cherry picked from commit 9314c083)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      da04d45c
    • Zhe Sun's avatar
      [SPARK-19797][DOC] ML pipeline document correction · accbed7c
      Zhe Sun authored
      ## What changes were proposed in this pull request?
      The description of pipelines in this paragraph is incorrect: https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works
      
      
      
      > If the Pipeline had more **stages**, it would call the LogisticRegressionModel’s transform() method on the DataFrame before passing the DataFrame to the next stage.
      
      Reason: a Transformer could also be a stage, but only another Estimator triggers a transform() call that passes the data to the next stage. The description in the document misleads ML pipeline users.
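
      For context, a minimal pipeline that mixes Transformer and Estimator stages, along the lines of the example in the ML guide (illustrative only):

      ```scala
      import org.apache.spark.ml.Pipeline
      import org.apache.spark.ml.classification.LogisticRegression
      import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

      // Tokenizer and HashingTF are Transformers; LogisticRegression is an Estimator.
      val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
      val lr = new LogisticRegression().setMaxIter(10)
      val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

      // During fit(), an intermediate model's transform() is only needed to feed a
      // later Estimator stage, which is what the documentation fix clarifies.
      // val model = pipeline.fit(trainingDF)
      ```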
      
      ## How was this patch tested?
      This is a tiny modification of **docs/ml-pipelines.md**. I built the docs with jekyll and checked the compiled page.
      
      Author: Zhe Sun <ymwdalex@gmail.com>
      
      Closes #17137 from ymwdalex/SPARK-19797-ML-pipeline-document-correction.
      
      (cherry picked from commit 0bac3e4c)
      Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
      accbed7c
  4. Mar 02, 2017
  5. Mar 01, 2017
    • Michael Gummelt's avatar
      [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio … · 27347b5f
      Michael Gummelt authored
      …on registered cores rather than accepted cores
      
      See JIRA
      
      Unit tests, Mesos/Spark integration tests
      
      cc skonto susanxhuynh
      
      Author: Michael Gummelt <mgummelt@mesosphere.io>
      
      Closes #17045 from mgummelt/SPARK-19373-registered-resources.
      
      
      Author: Michael Gummelt <mgummelt@mesosphere.io>
      
      Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1.
      27347b5f
    • Stan Zhai's avatar
      [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded... · bbe0d8ca
      Stan Zhai authored
      [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
      
      ## What changes were proposed in this pull request?
      This PR fixes the `FoldablePropagation` rule in the Optimizer phase so that constant alias columns of an `INNER JOIN` query are no longer folded.
      
      For the following queries:
      
      ```
      val sqlA =
        """
          |create temporary view ta as
          |select a, 'a' as tag from t1 union all
          |select a, 'b' as tag from t2
        """.stripMargin
      
      val sqlB =
        """
          |create temporary view tb as
          |select a, 'a' as tag from t3 union all
          |select a, 'b' as tag from t4
        """.stripMargin
      
      val sql =
        """
          |select tb.* from ta inner join tb on
          |ta.a = tb.a and
          |ta.tag = tb.tag
        """.stripMargin
      ```
      
      The tag column is a constant alias column; it is folded by `FoldablePropagation` like this:
      
      ```
      TRACE SparkOptimizer:
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
       Project [a#4, tag#14]                              Project [a#4, tag#14]
      !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) && (a = a))
          :- Union                                           :- Union
          :  :- Project [a#0, a AS tag#8]                    :  :- Project [a#0, a AS tag#8]
          :  :  +- LocalRelation [a#0]                       :  :  +- LocalRelation [a#0]
          :  +- Project [a#2, b AS tag#9]                    :  +- Project [a#2, b AS tag#9]
          :     +- LocalRelation [a#2]                       :     +- LocalRelation [a#2]
          +- Union                                           +- Union
             :- Project [a#4, a AS tag#14]                      :- Project [a#4, a AS tag#14]
             :  +- LocalRelation [a#4]                          :  +- LocalRelation [a#4]
             +- Project [a#6, b AS tag#15]                      +- Project [a#6, b AS tag#15]
                +- LocalRelation [a#6]                             +- LocalRelation [a#6]
      ```
      
      Finally, the result of the Operator Optimizations batch is:
      
      ```
      Project [a#4, tag#14]                              Project [a#4, tag#14]
      !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
      !   :- SubqueryAlias ta, `ta`                          :- Union
      !   :  +- Union                                        :  :- LocalRelation [a#0]
      !   :     :- Project [a#0, a AS tag#8]                 :  +- LocalRelation [a#2]
      !   :     :  +- SubqueryAlias t1, `t1`                 +- Union
      !   :     :     +- Project [a#0]                          :- LocalRelation [a#4, tag#14]
      !   :     :        +- SubqueryAlias grouping              +- LocalRelation [a#6, tag#15]
      !   :     :           +- LocalRelation [a#0]
      !   :     +- Project [a#2, b AS tag#9]
      !   :        +- SubqueryAlias t2, `t2`
      !   :           +- Project [a#2]
      !   :              +- SubqueryAlias grouping
      !   :                 +- LocalRelation [a#2]
      !   +- SubqueryAlias tb, `tb`
      !      +- Union
      !         :- Project [a#4, a AS tag#14]
      !         :  +- SubqueryAlias t3, `t3`
      !         :     +- Project [a#4]
      !         :        +- SubqueryAlias grouping
      !         :           +- LocalRelation [a#4]
      !         +- Project [a#6, b AS tag#15]
      !            +- SubqueryAlias t4, `t4`
      !               +- Project [a#6]
      !                  +- SubqueryAlias grouping
      !                     +- LocalRelation [a#6]
      ```
      
      The condition `tag#8 = tag#14` of the INNER JOIN has been removed, which leads to incorrect inner join results.
      
      After fix:
      
      ```
      === Result of Batch LocalRelation ===
       GlobalLimit 21                                           GlobalLimit 21
       +- LocalLimit 21                                         +- LocalLimit 21
          +- Project [a#4, tag#11]                                 +- Project [a#4, tag#11]
             +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))         +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11))
      !         :- SubqueryAlias ta                                      :- Union
      !         :  +- Union                                              :  :- LocalRelation [a#0, tag#8]
      !         :     :- Project [a#0, a AS tag#8]                       :  +- LocalRelation [a#2, tag#9]
      !         :     :  +- SubqueryAlias t1                             +- Union
      !         :     :     +- Project [a#0]                                :- LocalRelation [a#4, tag#11]
      !         :     :        +- SubqueryAlias grouping                    +- LocalRelation [a#6, tag#12]
      !         :     :           +- LocalRelation [a#0]
      !         :     +- Project [a#2, b AS tag#9]
      !         :        +- SubqueryAlias t2
      !         :           +- Project [a#2]
      !         :              +- SubqueryAlias grouping
      !         :                 +- LocalRelation [a#2]
      !         +- SubqueryAlias tb
      !            +- Union
      !               :- Project [a#4, a AS tag#11]
      !               :  +- SubqueryAlias t3
      !               :     +- Project [a#4]
      !               :        +- SubqueryAlias grouping
      !               :           +- LocalRelation [a#4]
      !               +- Project [a#6, b AS tag#12]
      !                  +- SubqueryAlias t4
      !                     +- Project [a#6]
      !                        +- SubqueryAlias grouping
      !                           +- LocalRelation [a#6]
      ```
      
      ## How was this patch tested?
      
      Added sql-tests/inputs/inner-join.sql.
      All tests passed.
      
      Author: Stan Zhai <zhaishidan@haizhi.com>
      
      Closes #17099 from stanzhai/fix-inner-join.
      
      (cherry picked from commit 5502a9cf)
      Signed-off-by: default avatarXiao Li <gatorsmile@gmail.com>
      bbe0d8ca
    • Jeff Zhang's avatar
      [SPARK-19572][SPARKR] Allow to disable hive in sparkR shell · f719cccd
      Jeff Zhang authored
      
      ## What changes were proposed in this pull request?
      SPARK-15236 did this for the Scala shell; this ticket does it for the sparkR shell. This benefits not only sparkR itself but also downstream projects like Livy, which uses shell.R for its interactive sessions. For now, Livy has no control over whether Hive is enabled or not.
      
      ## How was this patch tested?
      
      Tested manually: ran `bin/sparkR --master local --conf spark.sql.catalogImplementation=in-memory` and verified that Hive is not enabled.
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #16907 from zjffdu/SPARK-19572.
      
      (cherry picked from commit 73158805)
      Signed-off-by: default avatarFelix Cheung <felixcheung@apache.org>
      f719cccd
  6. Feb 28, 2017
    • Michael McCune's avatar
      [SPARK-19769][DOCS] Update quickstart instructions · d887f758
      Michael McCune authored
      
      ## What changes were proposed in this pull request?
      
      This change addresses the renaming of the `simple.sbt` build file to
      `build.sbt`. Newer versions of the sbt tool no longer find the old file
      name and look for `build.sbt` instead. The quickstart instructions for
      self-contained applications are updated accordingly.
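
      A minimal `build.sbt` along the lines of the quickstart's self-contained application (the Scala and Spark versions below are illustrative placeholders):

      ```scala
      name := "Simple Project"

      version := "1.0"

      scalaVersion := "2.11.8"

      // spark-sql pulls in the SparkSession API used by the quickstart example.
      libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0"
      ```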
      
      ## How was this patch tested?
      
      As this is a relatively minor change of a few words, the markdown was checked for syntax and spelling. Site was built with `SKIP_API=1 jekyll serve` for testing purposes.
      
      Author: Michael McCune <msm@redhat.com>
      
      Closes #17101 from elmiko/spark-19769.
      
      (cherry picked from commit bf5987cb)
      Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
      d887f758
    • Roberto Agostino Vitillo's avatar
      [SPARK-19677][SS] Committing a delta file atop an existing one should not fail on HDFS · 947c0cd9
      Roberto Agostino Vitillo authored
      ## What changes were proposed in this pull request?
      
      HDFSBackedStateStoreProvider fails to rename files on HDFS but not on the local filesystem. According to the [implementation notes](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) of `rename()`, the behavior of the local filesystem and HDFS varies:
      
      > Destination exists and is a file
      > Renaming a file atop an existing file is specified as failing, raising an exception.
      >    - Local FileSystem : the rename succeeds; the destination file is replaced by the source file.
      >    - HDFS : The rename fails, no exception is raised. Instead the method call simply returns false.
      
      This patch ensures that `rename()` isn't called if the destination file already exists. It's still semantically correct because Structured Streaming requires that rerunning a batch should generate the same output.
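
      A minimal sketch of the guard described above (illustrative, not the exact `HDFSBackedStateStoreProvider` code), assuming a Hadoop `FileSystem` handle and the temporary/final delta file paths:

      ```scala
      import java.io.IOException
      import org.apache.hadoop.fs.{FileSystem, Path}

      // Skip the rename when the destination delta file already exists: re-running a
      // batch must produce the same output, so the existing file can be kept as-is.
      def commitDeltaFile(fs: FileSystem, tempFile: Path, finalFile: Path): Unit = {
        if (fs.exists(finalFile)) {
          fs.delete(tempFile, false)  // drop the temporary copy
        } else if (!fs.rename(tempFile, finalFile)) {
          throw new IOException(s"Failed to rename $tempFile to $finalFile")
        }
      }
      ```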
      
      ## How was this patch tested?
      
      This patch was tested by running `StateStoreSuite`.
      
      Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>
      
      Closes #17012 from vitillo/fix_rename.
      
      (cherry picked from commit 9734a928)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      947c0cd9
    • windpiger's avatar
      [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate... · 4b4c3bf3
      windpiger authored
      [SPARK-19748][SQL] refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache
      
      ## What changes were proposed in this pull request?
      
      If we refresh an InMemoryFileIndex that has a FileStatusCache, it first uses the FileStatusCache to regenerate cachedLeafFiles etc., and only then calls FileStatusCache.invalidateAll.
      
      This order is wrong, so the refresh does not take effect:
      
      ```
        override def refresh(): Unit = {
          refresh0()
          fileStatusCache.invalidateAll()
        }
      
        private def refresh0(): Unit = {
          val files = listLeafFiles(rootPaths)
          cachedLeafFiles =
            new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
          cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
          cachedPartitionSpec = null
        }
      ```
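
      A sketch of the corrected order under the same structure: the FileStatusCache is invalidated before the in-memory variables are regenerated, so `refresh0()` sees fresh listings.

      ```scala
        override def refresh(): Unit = {
          fileStatusCache.invalidateAll()  // drop stale cached file statuses first
          refresh0()                       // then rebuild cachedLeafFiles etc. from fresh listings
        }
      ```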
      ## How was this patch tested?
      unit test added
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.
      
      (cherry picked from commit a350bc16)
      Signed-off-by: default avatarWenchen Fan <wenchen@databricks.com>
      4b4c3bf3
  7. Feb 26, 2017
    • Eyal Zituny's avatar
      [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle... · 04fbb9e0
      Eyal Zituny authored
      [SPARK-19594][STRUCTURED STREAMING] StreamingQueryListener fails to handle QueryTerminatedEvent if more than one listener exists
      
      ## What changes were proposed in this pull request?
      
      Currently, if multiple streaming query listeners exist, only one of them is invoked when a QueryTerminatedEvent is triggered; the rest of the listeners ignore the event.
      This is because the streaming query listener bus holds a set of running query ids, and when a termination event is triggered, the terminated query id is removed from the set as soon as the first listener has handled the event.
      In this PR, the query id is removed from the set only after all the listeners have handled the event.
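
      A minimal sketch of the described fix (illustrative; `activeQueryIds` and `postToAll` are stand-ins for the listener bus internals):

      ```scala
      import org.apache.spark.sql.streaming.StreamingQueryListener

      // Dispatch the event to every listener first, and only then forget the query
      // id, so a QueryTerminatedEvent is not dropped for listeners after the first.
      def postTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
        try {
          postToAll(event)
        } finally {
          activeQueryIds.synchronized {
            activeQueryIds -= event.id
          }
        }
      }
      ```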
      
      ## How was this patch tested?
      
      a test with multiple listeners has been added to StreamingQueryListenerSuite
      
      Author: Eyal Zituny <eyal.zituny@equalum.io>
      
      Closes #16991 from eyalzit/master.
      
      (cherry picked from commit 9f8e3921)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      04fbb9e0
  8. Feb 25, 2017
    • Bryan Cutler's avatar
      [SPARK-14772][PYTHON][ML] Fixed Params.copy method to match Scala implementation · 20a43295
      Bryan Cutler authored
      ## What changes were proposed in this pull request?
      Fixed the PySpark Params.copy method to behave like the Scala implementation.  The main issue was that it did not account for the _defaultParamMap and merged it into the explicitly created param map.
      
      ## How was this patch tested?
      Added new unit test to verify the copy method behaves correctly for copying uid, explicitly created params, and default params.
      
      Author: Bryan Cutler <cutlerb@gmail.com>
      
      Closes #17048 from BryanCutler/pyspark-ml-param_copy-Scala_sync-SPARK-14772-2_1.
      20a43295
    • Boaz Mohar's avatar
      [MINOR][DOCS] Fixes two problems in the SQL programming guide page · 97866e19
      Boaz Mohar authored
      
      ## What changes were proposed in this pull request?
      
      Removed duplicated lines in the SQL Python example and fixed a typo.
      
      ## How was this patch tested?
      
      Searched for other typos in the page to minimize the number of PRs.
      
      Author: Boaz Mohar <boazmohar@gmail.com>
      
      Closes #17066 from boazmohar/doc-fix.
      
      (cherry picked from commit 061bcfb8)
      Signed-off-by: default avatarXiao Li <gatorsmile@gmail.com>
      97866e19
  9. Feb 24, 2017
    • jerryshao's avatar
      [SPARK-19038][YARN] Avoid overwriting keytab configuration in yarn-client · ed9aaa31
      jerryshao authored
      
      ## What changes were proposed in this pull request?
      
      Because yarn#client resets the `spark.yarn.keytab` configuration to point to the location of the distributed file, if the user still uses the old `SparkConf` to create a `SparkSession` with Hive enabled, it will read the keytab from the path in the distributed cache. This is OK for yarn cluster mode, but in yarn client mode, where the driver runs outside the container, fetching the keytab will fail.
      
      So here we should avoid resetting this configuration in `yarn#client` and only overwrite it for the AM, so that `spark.yarn.keytab` resolves to the correct keytab path whether we run in client mode (keytab in the local fs) or cluster mode (keytab in the distributed cache).
      
      ## How was this patch tested?
      
      Verified in security cluster.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #16923 from jerryshao/SPARK-19038.
      
      (cherry picked from commit a920a436)
      Signed-off-by: default avatarMarcelo Vanzin <vanzin@cloudera.com>
      ed9aaa31
    • jerryshao's avatar
      [SPARK-19707][CORE] Improve the invalid path check for sc.addJar · 6da6a27f
      jerryshao authored
      
      ## What changes were proposed in this pull request?
      
      Currently in Spark there are two issues when we add jars with an invalid path:
      
      * If the jar path is an empty string (`--jar ",dummy.jar"`), Spark resolves it to the current directory path and adds it to the classpath / file server, which is unwanted. This happens when we submit Spark applications programmatically. From my understanding, Spark should defensively filter out such empty paths.
      * If the jar path is invalid (the file doesn't exist), `addJar` doesn't check it and still adds it to the file server; the exception is delayed until the job runs. This local path could be checked beforehand, with no need to wait until the task runs. We have a similar check in `addFile`, but `addJar` lacks such a mechanism (see the sketch after this list).
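
      A minimal sketch of the kind of defensive check described above (illustrative, not the exact `SparkContext.addJar` code):

      ```scala
      import java.io.File

      // Reject empty entries outright and verify that a schemeless local jar exists
      // before handing it to the file server, instead of failing later at task time.
      def validateJarPath(path: String): Option[String] = {
        if (path == null || path.trim.isEmpty) {
          None                                     // empty entries like ",dummy.jar" produce ""
        } else if (!path.contains(":") && !new File(path).isFile) {
          None                                     // local path with no scheme that does not exist
        } else {
          Some(path)
        }
      }
      ```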
      
      ## How was this patch tested?
      
      Add unit test and local manual verification.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #17038 from jerryshao/SPARK-19707.
      
      (cherry picked from commit b0a8c16f)
      Signed-off-by: default avatarMarcelo Vanzin <vanzin@cloudera.com>
      6da6a27f
    • Takeshi Yamamuro's avatar
      [SPARK-19691][SQL][BRANCH-2.1] Fix ClassCastException when calculating percentile of decimal column · 66a7ca28
      Takeshi Yamamuro authored
      ## What changes were proposed in this pull request?
      This is a backport of the two following commits: https://github.com/apache/spark/commit/93aa4271596a30752dc5234d869c3ae2f6e8e723
      
      This PR fixes the ClassCastException below:
      ```
      scala> spark.range(10).selectExpr("cast (id as decimal) as x").selectExpr("percentile(x, 0.5)").collect()
       java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be cast to java.lang.Number
      	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:141)
      	at org.apache.spark.sql.catalyst.expressions.aggregate.Percentile.update(Percentile.scala:58)
      	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.update(interfaces.scala:514)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$anonfun$applyOrElse$1.apply(AggregationIterator.scala:171)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:187)
      	at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateProcessRow$1.apply(AggregationIterator.scala:181)
      	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
      	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
      	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:109)
      	at
      ```
      This fix simply converts Catalyst values (i.e., `Decimal`) into Scala ones by using `CatalystTypeConverters`.
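
      A hedged sketch of the conversion involved (illustrative; `CatalystTypeConverters` and `Decimal` are internal Catalyst classes): converting the Catalyst `Decimal` to its external representation yields a `java.math.BigDecimal`, which is a `java.lang.Number` and can therefore be used where the aggregate expects one.

      ```scala
      import org.apache.spark.sql.catalyst.CatalystTypeConverters
      import org.apache.spark.sql.types.{Decimal, DecimalType}

      // Catalyst's internal Decimal is not a java.lang.Number, so casting it fails;
      // converting to the external type first avoids the ClassCastException.
      val internal = Decimal("1.5")
      val external = CatalystTypeConverters.convertToScala(internal, DecimalType(10, 1))
      val asNumber = external.asInstanceOf[Number]
      ```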
      
      ## How was this patch tested?
      Added a test in `DataFrameSuite`.
      
      Author: Takeshi Yamamuro <yamamuro@apache.org>
      
      Closes #17046 from maropu/SPARK-19691-BACKPORT2.1.
      66a7ca28
  10. Feb 23, 2017
  11. Feb 22, 2017
    • Marcelo Vanzin's avatar
      [SPARK-19652][UI] Do auth checks for REST API access (branch-2.1). · 21afc453
      Marcelo Vanzin authored
      The REST API has a security filter that performs auth checks
      based on the UI root's security manager. That works fine when
      the UI root is the app's UI, but not when it's the history server.
      
      In the SHS case, all users would be allowed to see all applications
      through the REST API, even if the UI itself wouldn't be available
      to them.
      
      This change adds auth checks for each app access through the API
      too, so that only authorized users can see the app's data.
      
      The change also modifies the existing security filter to use
      `HttpServletRequest.getRemoteUser()`, which is used in other
      places. That is not necessarily the same as the principal's
      name; for example, when using Hadoop's SPNEGO auth filter,
      the remote user strips the realm information, which then matches
      the user name registered as the owner of the application.
      
      I also renamed the UIRootFromServletContext trait to a more generic
      name since I'm using it to store more context information now.
      
      Tested manually with an authentication filter enabled.
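
      A minimal sketch of the per-application check described above (illustrative; it assumes the existing `SecurityManager.checkUIViewPermissions` method and a servlet request):

      ```scala
      import javax.servlet.http.HttpServletRequest
      import org.apache.spark.SecurityManager

      // Resolve the remote user from the request (e.g. as set by a SPNEGO filter)
      // and ask the app's security manager whether that user may view this app.
      def isAuthorized(req: HttpServletRequest, securityManager: SecurityManager): Boolean = {
        securityManager.checkUIViewPermissions(req.getRemoteUser)
      }
      ```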
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #17019 from vanzin/SPARK-19652_2.1.
      21afc453
  12. Feb 21, 2017
  13. Feb 20, 2017
  14. Feb 17, 2017
    • Roberto Agostino Vitillo's avatar
      [SPARK-19517][SS] KafkaSource fails to initialize partition offsets · b083ec51
      Roberto Agostino Vitillo authored
      ## What changes were proposed in this pull request?
      
      This patch fixes a bug in `KafkaSource` with the (de)serialization of the length of the JSON string that contains the initial partition offsets.
      
      ## How was this patch tested?
      
      I ran the test suite for spark-sql-kafka-0-10.
      
      Author: Roberto Agostino Vitillo <ra.vitillo@gmail.com>
      
      Closes #16857 from vitillo/kafka_source_fix.
      b083ec51
    • Davies Liu's avatar
      [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap · 6e3abed8
      Davies Liu authored
      
      ## What changes were proposed in this pull request?
      
      Radix sort requires that half of the array is free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not hold more items than 1/2 of its capacity. It turned out this did not hold: the current implementation of append() could leave 1 more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).
      
      This PR fixes the off-by-one bug in BytesToBytesMap.
      
      This PR also fixes a bug, introduced by #15722, where the array would never grow if it failed to grow once (staying at its initial capacity).
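
      An illustrative sketch of the invariant being restored (not the actual `BytesToBytesMap` code, which is Java): growth must be triggered before the number of keys can exceed half the array's capacity, since radix sort needs the other half as scratch space.

      ```scala
      // Stand-in bookkeeping only: `capacity` models the pointer array size and
      // radix sort needs capacity / 2 entries free as temporary space.
      final case class MapState(numKeys: Int, capacity: Int) {
        private def growthThreshold: Int = capacity / 2

        // Check (and grow) *before* adding the record, not after, so the array can
        // never end up holding growthThreshold + 1 items.
        def append(): MapState = {
          require(numKeys < growthThreshold, "must grow before exceeding half the capacity")
          copy(numKeys = numKeys + 1)
        }
      }
      ```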
      
      ## How was this patch tested?
      
      Added regression test.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #16844 from davies/off_by_one.
      
      (cherry picked from commit 3d0c3af0)
      Signed-off-by: default avatarDavies Liu <davies.liu@gmail.com>
      6e3abed8
    • Stan Zhai's avatar
      [SPARK-19622][WEBUI] Fix a http error in a paged table when using a `Go` button to search. · 55958bcd
      Stan Zhai authored
      ## What changes were proposed in this pull request?
      
      The search function of the paged table is not available because we do not strip the hash data from the request path.
      
      ![](https://issues.apache.org/jira/secure/attachment/12852996/screenshot-1.png)
      
      ## How was this patch tested?
      
      Tested manually with my browser.
      
      Author: Stan Zhai <zhaishidan@haizhi.com>
      
      Closes #16953 from stanzhai/fix-webui-paged-table.
      
      (cherry picked from commit 021062af)
      Signed-off-by: default avatarSean Owen <sowen@cloudera.com>
      55958bcd
  15. Feb 15, 2017
    • Felix Cheung's avatar
      [SPARK-19399][SPARKR][BACKPORT-2.1] fix tests broken by merge · 252dd05f
      Felix Cheung authored
      ## What changes were proposed in this pull request?
      
      fix test broken by git merge for #16739
      
      ## How was this patch tested?
      
      manual
      
      Author: Felix Cheung <felixcheung_m@hotmail.com>
      
      Closes #16950 from felixcheung/fixrtest.
      252dd05f
    • Shixiong Zhu's avatar
      [SPARK-19603][SS] Fix StreamingQuery explain command · db7adb61
      Shixiong Zhu authored
      
      ## What changes were proposed in this pull request?
      
      `StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false.
      
      This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan.
      
      Examples of the explain outputs:
      
      - streaming DataFrame.explain()
      ```
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)])
      +- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#513.toString, obj#516: java.lang.String
                                 +- StreamingRelation MemoryStream[value#513], [value#513]
      ```
      
      - StreamingQuery.explain(extended = false)
      ```
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)])
      +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
            +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                                 +- LocalTableScan [value#543]
      ```
      
      - StreamingQuery.explain(extended = true)
      ```
      == Parsed Logical Plan ==
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Analyzed Logical Plan ==
      value: string, count(1): bigint
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Optimized Logical Plan ==
      Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
            +- DeserializeToObject value#543.toString, obj#516: java.lang.String
               +- LocalRelation [value#543]
      
      == Physical Plan ==
      *HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
      +- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
         +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
            +- StateStoreRestore [value#518], OperatorStateId(...,0,0)
               +- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
                  +- Exchange hashpartitioning(value#518, 5)
                     +- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
                           +- *MapElements <function1>, obj#517: java.lang.String
                              +- *DeserializeToObject value#543.toString, obj#516: java.lang.String
                                 +- LocalTableScan [value#543]
      ```
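
      For reference, a small usage sketch of how these outputs are obtained (illustrative; it uses the test-only `MemoryStream` source, as in the plans above):

      ```scala
      import org.apache.spark.sql.execution.streaming.MemoryStream
      import spark.implicits._
      implicit val sqlContext = spark.sqlContext  // MemoryStream needs an implicit SQLContext

      val inputStream = MemoryStream[String]
      val query = inputStream.toDS()
        .groupBy("value").count()
        .writeStream
        .outputMode("complete")
        .format("memory")
        .queryName("counts")
        .start()

      inputStream.addData("a", "b", "a")
      query.processAllAvailable()

      query.explain()      // physical plan of the last executed batch
      query.explain(true)  // parsed, analyzed, optimized and physical plans
      ```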
      
      ## How was this patch tested?
      
      The updated unit test.
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16934 from zsxwing/SPARK-19603.
      
      (cherry picked from commit fc02ef95)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      db7adb61
    • Yin Huai's avatar
      [SPARK-19604][TESTS] Log the start of every Python test · b9ab4c0e
      Yin Huai authored
      
      ## What changes were proposed in this pull request?
      Right now, we only log at info level after we finish the tests of a Python test file. We should also log the start of a test, so if a test is hanging we can tell which test file is running.
      
      ## How was this patch tested?
      This is a change for python tests.
      
      Author: Yin Huai <yhuai@databricks.com>
      
      Closes #16935 from yhuai/SPARK-19604.
      
      (cherry picked from commit f6c3bba2)
      Signed-off-by: default avatarYin Huai <yhuai@databricks.com>
      b9ab4c0e
    • Shixiong Zhu's avatar
      [SPARK-19599][SS] Clean up HDFSMetadataLog · 88c43f4f
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog.
      
      This PR includes the following changes:
      - ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that prevents us from removing the workaround codes.
      - Remove unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly.
      - Remove catching FileNotFoundException.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16932 from zsxwing/metadata-cleanup.
      
      (cherry picked from commit 21b4ba2d)
      Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
      88c43f4f
    • Felix Cheung's avatar
      [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column · 6c353990
      Felix Cheung authored
      
      Add coalesce on DataFrame for reducing the number of partitions without a shuffle, and coalesce on Column
      
      manual, unit tests
      
      Author: Felix Cheung <felixcheung_m@hotmail.com>
      
      Closes #16739 from felixcheung/rcoalesce.
      
      (cherry picked from commit 671bc08e)
      Signed-off-by: default avatarFelix Cheung <felixcheung@apache.org>
      6c353990
  16. Feb 14, 2017
  17. Feb 13, 2017
    • Marcelo Vanzin's avatar
      [SPARK-19520][STREAMING] Do not encrypt data written to the WAL. · 7fe3543f
      Marcelo Vanzin authored
      
      Spark's I/O encryption uses an ephemeral key for each driver instance.
      So driver B cannot decrypt data written by driver A since it doesn't
      have the correct key.
      
      The write ahead log is used for recovery, thus needs to be readable by
      a different driver. So it cannot be encrypted by Spark's I/O encryption
      code.
      
      The BlockManager APIs used by the WAL code to write the data automatically
      encrypt data, so changes are needed so that callers can opt out of
      encryption.
      
      Aside from that, the "putBytes" API in the BlockManager does not do
      encryption, so a separate situation arose where the WAL would write
      unencrypted data to the BM and, when those blocks were read, decryption
      would fail. So the WAL code needs to ask the BM to encrypt that data
      when encryption is enabled; this code is not optimal since it results
      in a (temporary) second copy of the data block in memory, but should be
      OK for now until a more performant solution is added. The non-encryption
      case should not be affected.
      
      Tested with new unit tests, and by running streaming apps that do
      recovery using the WAL data with I/O encryption turned on.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #16862 from vanzin/SPARK-19520.
      
      (cherry picked from commit 0169360e)
      Signed-off-by: default avatarMarcelo Vanzin <vanzin@cloudera.com>
      7fe3543f
    • Josh Rosen's avatar
      [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() · 5db23473
      Josh Rosen authored
      
      This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.
      
      In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has bad impacts on task cancellation when `interruptOnCancel = true`.
      
      As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:
      
      ```
      java.lang.Object.wait(Native Method)
      java.lang.Object.wait(Object.java:460)
      io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
      io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
      org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
      org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
      org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
      org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
      org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
      org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
      org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
      org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      [...]
      ```
      
      As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call.
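
      A hedged sketch of the difference in Netty terms (illustrative; `cf` stands for the `ChannelFuture` returned by the connect attempt):

      ```scala
      import java.io.IOException
      import io.netty.channel.ChannelFuture

      // Interruptible wait: a cancelled task's interrupt breaks out of the connect
      // attempt instead of blocking until the (possibly long) connection timeout.
      @throws[InterruptedException]
      def waitForConnection(cf: ChannelFuture, timeoutMs: Long): Unit = {
        if (!cf.await(timeoutMs)) {   // unlike awaitUninterruptibly(), this can be interrupted
          throw new IOException(s"Connecting timed out ($timeoutMs ms)")
        }
        if (!cf.isSuccess) {
          throw new IOException("Failed to connect", cf.cause())
        }
      }
      ```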
      
      This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility).
      
      An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.
      
      Note that there are three other `awaitUninterruptibly()` in the codebase, but those calls have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.
      
      Manually.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #16866 from JoshRosen/SPARK-19529.
      
      (cherry picked from commit 1c4d10b1)
      Signed-off-by: default avatarCheng Lian <lian@databricks.com>
      5db23473