Skip to content
Snippets Groups Projects
  1. Feb 07, 2017
    • Tathagata Das's avatar
      [SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations · aeb80348
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      
      `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`
      
      *Requirements*
      - Users should be able to specify a function that can do the following
      - Access the input row corresponding to a key
      - Access the previous state corresponding to a key
      - Optionally, update or remove the state
      - Output any number of new rows (or none at all)
      
      *Proposed API*
      ```
      // ------------ New methods on KeyValueGroupedDataset ------------
      class KeyValueGroupedDataset[K, V] {
      	// Scala friendly
      	def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
              def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
      	// Java friendly
             def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
             def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
      }
      
      // ------------------- New Java-friendly function classes -------------------
      public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
        R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
      }
      public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
        Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
      }
      
      // ---------------------- Wrapper class for state data ----------------------
      trait State[S] {
      	def exists(): Boolean
        	def get(): S 			// throws Exception is state does not exist
      	def getOption(): Option[S]
      	def update(newState: S): Unit
      	def remove(): Unit		// exists() will be false after this
      }
      ```
      
      Key Semantics of the State class
      - The state can be null.
      - If the state.remove() is called, then state.exists() will return false, and getOption will returm None.
      - After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...).
      - None of the operations are thread-safe. This is to avoid memory barriers.
      
      *Usage*
      ```
      val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => {
          val newCount = words.size + runningCount.getOption.getOrElse(0L)
          runningCount.update(newCount)
         (word, newCount)
      }
      
      dataset					                        // type is Dataset[String]
        .groupByKey[String](w => w)        	                // generates KeyValueGroupedDataset[String, String]
        .mapGroupsWithState[Long, (String, Long)](stateFunc)	// returns Dataset[(String, Long)]
      ```
      
      ## How was this patch tested?
      New unit tests.
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #16758 from tdas/mapWithState.
      aeb80348
    • gatorsmile's avatar
      [SPARK-19397][SQL] Make option names of LIBSVM and TEXT case insensitive · e33aaa2a
      gatorsmile authored
      ### What changes were proposed in this pull request?
      Prior to Spark 2.1, the option names are case sensitive for all the formats. Since Spark 2.1, the option key names become case insensitive except the format `Text` and `LibSVM `. This PR is to fix these issues.
      
      Also, add a check to know whether the input option vector type is legal for `LibSVM`.
      
      ### How was this patch tested?
      Added test cases
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16737 from gatorsmile/libSVMTextOptions.
      e33aaa2a
    • Tyson Condie's avatar
      [SPARK-18682][SS] Batch Source for Kafka · 8df44440
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
      Today, you can start a stream that reads from kafka. However, given kafka's configurable retention period, it seems like sometimes you might just want to read all of the data that is available now. As such we should add a version that works with spark.read as well.
      The options should be the same as the streaming kafka source, with the following differences:
      startingOffsets should default to earliest, and should not allow latest (which would always be empty).
      endingOffsets should also be allowed and should default to latest. the same assign json format as startingOffsets should also be accepted.
      It would be really good, if things like .limit(n) were enough to prevent all the data from being read (this might just work).
      
      ## How was this patch tested?
      
      KafkaRelationSuite was added for testing batch queries via KafkaUtils.
      
      Author: Tyson Condie <tcondie@gmail.com>
      
      Closes #16686 from tcondie/SPARK-18682.
      8df44440
    • Herman van Hovell's avatar
      [SPARK-18609][SPARK-18841][SQL] Fix redundant Alias removal in the optimizer · 73ee7394
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      The optimizer tries to remove redundant alias only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies removes such a project and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE)  and the duplicated part contains the alias only project, in this case the rewrite will break the tree.
      
      This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan.
      
      The current tree transformation infrastructure works very well if the transformation at hand requires little or a global contextual information. In this case we need to know both the attributes that were not to be moved, and we also needed to know which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involves transversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan`to make this much more straightforward; this basically allows you to manually traverse the tree.
      
      This PR subsumes the following PRs by windpiger:
      Closes https://github.com/apache/spark/pull/16267
      Closes https://github.com/apache/spark/pull/16255
      
      ## How was this patch tested?
      I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and I have added integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #16757 from hvanhovell/SPARK-18609.
      73ee7394
    • Reynold Xin's avatar
      [SPARK-19495][SQL] Make SQLConf slightly more extensible · b7277e03
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      This pull request makes SQLConf slightly more extensible by removing the visibility limitations on the build* functions.
      
      ## How was this patch tested?
      N/A - there are no logic changes and everything should be covered by existing unit tests.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #16835 from rxin/SPARK-19495.
      b7277e03
    • anabranch's avatar
      [SPARK-16609] Add to_date/to_timestamp with format functions · 7a7ce272
      anabranch authored
      ## What changes were proposed in this pull request?
      
      This pull request adds two new user facing functions:
      - `to_date` which accepts an expression and a format and returns a date.
      - `to_timestamp` which accepts an expression and a format and returns a timestamp.
      
      For example, Given a date in format: `2016-21-05`. (YYYY-dd-MM)
      
      ### Date Function
      *Previously*
      ```
      to_date(unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp"))
      ```
      *Current*
      ```
      to_date(lit("2016-21-05"), "yyyy-dd-MM")
      ```
      
      ### Timestamp Function
      *Previously*
      ```
      unix_timestamp(lit("2016-21-05"), "yyyy-dd-MM").cast("timestamp")
      ```
      *Current*
      ```
      to_timestamp(lit("2016-21-05"), "yyyy-dd-MM")
      ```
      ### Tasks
      
      - [X] Add `to_date` to Scala Functions
      - [x] Add `to_date` to Python Functions
      - [x] Add `to_date` to SQL Functions
      - [X] Add `to_timestamp` to Scala Functions
      - [x] Add `to_timestamp` to Python Functions
      - [x] Add `to_timestamp` to SQL Functions
      - [x] Add function to R
      
      ## How was this patch tested?
      
      - [x] Add Functions to `DateFunctionsSuite`
      - Test new `ParseToTimestamp` Expression (*not necessary*)
      - Test new `ParseToDate` Expression (*not necessary*)
      - [x] Add test for R
      - [x] Add test for Python in test.py
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: anabranch <wac.chambers@gmail.com>
      Author: Bill Chambers <bill@databricks.com>
      Author: anabranch <bill@databricks.com>
      
      Closes #16138 from anabranch/SPARK-16609.
      7a7ce272
    • Ala Luszczak's avatar
      [SPARK-19447] Fixing input metrics for range operator. · 6ed285c6
      Ala Luszczak authored
      ## What changes were proposed in this pull request?
      
      This change introduces a new metric "number of generated rows". It is used exclusively for Range, which is a leaf in the query tree, yet doesn't read any input data, and therefore cannot report "recordsRead".
      
      Additionally the way in which the metrics are reported by the JIT-compiled version of Range was changed. Previously, it was immediately reported that all the records were produced. This could be confusing for a user monitoring execution progress in the UI. Now, the metric is updated gradually.
      
      In order to avoid negative impact on Range performance, the code generation was reworked. The values are now produced in batches in the tighter inner loop, while the metrics are updated in the outer loop.
      
      The change also contains a number of unit tests, which should help ensure the correctness of metrics for various input sources.
      
      ## How was this patch tested?
      
      Unit tests.
      
      Author: Ala Luszczak <ala@databricks.com>
      
      Closes #16829 from ala/SPARK-19447.
      6ed285c6
    • gagan taneja's avatar
      [SPARK-19118][SQL] Percentile support for frequency distribution table · e99e34d0
      gagan taneja authored
      ## What changes were proposed in this pull request?
      
      I have a frequency distribution table with following entries
      Age,    No of person
      21, 10
      22, 15
      23, 18
      ..
      ..
      30, 14
      Moreover it is common to have data in frequency distribution format to further calculate Percentile, Median. With current implementation
      It would be very difficult and complex to find the percentile.
      Therefore i am proposing enhancement to current Percentile and Approx Percentile implementation to take frequency distribution column into consideration
      
      ## How was this patch tested?
      1) Enhanced /sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala to cover the additional functionality
      2) Run some performance benchmark test with 20 million row in local environment and did not see any performance degradation
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: gagan taneja <tanejagagan@gagans-MacBook-Pro.local>
      
      Closes #16497 from tanejagagan/branch-18940.
      e99e34d0
    • hyukjinkwon's avatar
      [SPARK-16101][SQL] Refactoring CSV schema inference path to be consistent with JSON · 3d314d08
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR refactors CSV schema inference path to be consistent with JSON data source and moves some filtering codes having the similar/same logics into `CSVUtils`.
      
       It makes the methods in classes have consistent arguments with JSON ones. (this PR renames `.../json/InferSchema.scala` → `.../json/JsonInferSchema.scala`)
      
      `CSVInferSchema` and `JsonInferSchema`
      
      ``` scala
      private[csv] object CSVInferSchema {
        ...
      
        def infer(
            csv: Dataset[String],
            caseSensitive: Boolean,
            options: CSVOptions): StructType = {
        ...
      ```
      
      ``` scala
      private[sql] object JsonInferSchema {
        ...
      
        def infer(
            json: RDD[String],
            columnNameOfCorruptRecord: String,
            configOptions: JSONOptions): StructType = {
        ...
      ```
      
      These allow schema inference from `Dataset[String]` directly, meaning the similar functionalities that use `JacksonParser`/`JsonInferSchema` for JSON can be easily implemented by `UnivocityParser`/`CSVInferSchema` for CSV.
      
      This completes refactoring CSV datasource and they are now pretty consistent.
      
      ## How was this patch tested?
      
      Existing tests should cover this and
      
      ```
      ./dev/change-scala-version.sh 2.10
      ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
      ```
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16680 from HyukjinKwon/SPARK-16101-schema-inference.
      3d314d08
    • zuotingbing's avatar
      [SPARK-19260] Spaces or "%20" in path parameter are not correctly handled with… · 8fd178d2
      zuotingbing authored
      JIRA Issue: https://issues.apache.org/jira/browse/SPARK-19260
      
      ## What changes were proposed in this pull request?
      
      1. “spark.history.fs.logDirectory” supports with space character and “%20” characters.
      2. As usually, if the run classpath includes hdfs-site.xml and core-site.xml files, the supplied path eg."/test" which does not contain a scheme should be taken as a HDFS path rather than a local path since the path parameter is a Hadoop dir.
      
      ## How was this patch tested?
      Update Unit Test and take some manual tests
      
      local:
      .sbin/start-history-server.sh "file:/a b"
      .sbin/start-history-server.sh "/abc%20c" (without hdfs-site.xml,core-site.xml)
      .sbin/start-history-server.sh "/a b" (without hdfs-site.xml,core-site.xml)
      .sbin/start-history-server.sh "/a b/a bc%20c" (without hdfs-site.xml,core-site.xml)
      
      hdfs:
      .sbin/start-history-server.sh "hdfs:/namenode:9000/a b"
      .sbin/start-history-server.sh "/a b" (with hdfs-site.xml,core-site.xml)
      .sbin/start-history-server.sh "/a b/a bc%20c" (with hdfs-site.xml,core-site.xml)
      
      Author: zuotingbing <zuo.tingbing9@zte.com.cn>
      
      Closes #16614 from zuotingbing/SPARK-19260.
      Unverified
      8fd178d2
    • Aseem Bansal's avatar
      [SPARK-19444][ML][DOCUMENTATION] Fix imports not being present in documentation · aee2bd2c
      Aseem Bansal authored
      ## What changes were proposed in this pull request?
      
      SPARK-19444 imports not being present in documentation
      
      ## How was this patch tested?
      
      Manual
      
      ## Disclaimer
      
      Contribution is original work and I license the work to the project under the project’s open source license
      
      Author: Aseem Bansal <anshbansal@users.noreply.github.com>
      
      Closes #16789 from anshbansal/patch-1.
      Unverified
      aee2bd2c
    • Eyal Farago's avatar
      [SPARK-18601][SQL] Simplify Create/Get complex expression pairs in optimizer · a97edc2c
      Eyal Farago authored
      ## What changes were proposed in this pull request?
      It often happens that a complex object (struct/map/array) is created only to get elements from it in an subsequent expression. We can add an optimizer rule for this.
      
      ## How was this patch tested?
      unit-tests
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Eyal Farago <eyal@nrgene.com>
      Author: eyal farago <eyal.farago@gmail.com>
      
      Closes #16043 from eyalfa/SPARK-18601.
      a97edc2c
    • Imran Rashid's avatar
      [SPARK-18967][SCHEDULER] compute locality levels even if delay = 0 · d9043092
      Imran Rashid authored
      ## What changes were proposed in this pull request?
      
      Before this change, with delay scheduling off, spark would effectively
      ignore locality preferences for bulk scheduling.  With this change,
      locality preferences are used when multiple offers are made
      simultaneously.
      
      ## How was this patch tested?
      
      Test case added which fails without this change.  All unit tests run via jenkins.
      
      Author: Imran Rashid <irashid@cloudera.com>
      
      Closes #16376 from squito/locality_without_delay.
      d9043092
  2. Feb 06, 2017
    • uncleGen's avatar
      [SPARK-19407][SS] defaultFS is used FileSystem.get instead of getting it from uri scheme · 7a0a630e
      uncleGen authored
      ## What changes were proposed in this pull request?
      
      ```
      Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
      	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
      	at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
      	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
      	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
      	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
      	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
      	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
      	at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
      	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
      ```
      
      Can easily replicate on spark standalone cluster by providing checkpoint location uri scheme anything other than "file://" and not overriding in config.
      
      WorkAround  --conf spark.hadoop.fs.defaultFS=s3a://somebucket or set it in sparkConf or spark-default.conf
      
      ## How was this patch tested?
      
      existing ut
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #16815 from uncleGen/SPARK-19407.
      7a0a630e
    • zero323's avatar
      [SPARK-19467][ML][PYTHON] Remove cyclic imports from pyspark.ml.pipeline · fab0d62a
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Remove cyclic imports between `pyspark.ml.pipeline` and `pyspark.ml`.
      
      ## How was this patch tested?
      
      Existing unit tests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #16814 from zero323/SPARK-19467.
      fab0d62a
    • gatorsmile's avatar
      [SPARK-19441][SQL] Remove IN type coercion from PromoteStrings · d6dc603e
      gatorsmile authored
      ### What changes were proposed in this pull request?
      The removed codes for `IN` are not reachable, because the previous rule `InConversion` already resolves the type coercion issues.
      
      ### How was this patch tested?
      N/A
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16783 from gatorsmile/typeCoercionIn.
      d6dc603e
    • Herman van Hovell's avatar
      [SPARK-19472][SQL] Parser should not mistake CASE WHEN(...) for a function call · cb2677b8
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      The SQL parser can mistake a `WHEN (...)` used in `CASE` for a function call. This happens in cases like the following:
      ```sql
      select case when (1) + case when 1 > 0 then 1 else 0 end = 2 then 1 else 0 end
      from tb
      ```
      This PR fixes this by re-organizing the case related parsing rules.
      
      ## How was this patch tested?
      Added a regression test to the `ExpressionParserSuite`.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #16821 from hvanhovell/SPARK-19472.
      cb2677b8
    • Jin Xing's avatar
      [SPARK-19398] Change one misleading log in TaskSetManager. · d33021b3
      Jin Xing authored
      ## What changes were proposed in this pull request?
      
      Log below is misleading:
      
      ```
      if (successful(index)) {
        logInfo(
          s"Task ${info.id} in stage ${taskSet.id} (TID $tid) failed, " +
          "but another instance of the task has already succeeded, " +
          "so not re-queuing the task to be re-executed.")
      }
      ```
      
      If fetch failed, the task is marked as successful in `TaskSetManager:: handleFailedTask`. Then log above will be printed. The `successful` just means task will not be scheduled any longer, not a real success.
      
      ## How was this patch tested?
      Existing unit tests can cover this.
      
      Author: jinxing <jinxing@meituan.com>
      
      Closes #16738 from jinxing64/SPARK-19398.
      d33021b3
    • Wenchen Fan's avatar
      [SPARK-19080][SQL] simplify data source analysis · aff53021
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      The current way of resolving `InsertIntoTable` and `CreateTable` is convoluted: sometimes we replace them with concrete implementation commands during analysis, sometimes during planning phase.
      
      And the error checking logic is also a mess: we may put it in extended analyzer rules, or extended checking rules, or `CheckAnalysis`.
      
      This PR simplifies the data source analysis:
      
      1.  `InsertIntoTable` and `CreateTable` are always unresolved and need to be replaced by concrete implementation commands during analysis.
      2. The error checking logic is mainly in 2 rules: `PreprocessTableCreation` and `PreprocessTableInsertion`.
      
      ## How was this patch tested?
      
      existing test.
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #16269 from cloud-fan/ddl.
      aff53021
    • hyukjinkwon's avatar
      [SPARK-17213][SQL][FOLLOWUP] Re-enable Parquet filter tests for binary and string · 0f16ff5b
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to enable the tests for Parquet filter pushdown with binary and string.
      
      This was disabled in https://github.com/apache/spark/pull/16106 due to Parquet's issue but it is now revived in https://github.com/apache/spark/pull/16791 after upgrading Parquet to 1.8.2.
      
      ## How was this patch tested?
      
      Manually tested `ParquetFilterSuite` via IDE.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16817 from HyukjinKwon/SPARK-17213.
      0f16ff5b
    • erenavsarogullari's avatar
      [SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via... · 7beb227c
      erenavsarogullari authored
      [SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via scheduler.allocation.file
      
      ## What changes were proposed in this pull request?
      
      If `spark.scheduler.allocation.file` has invalid `minShare` or/and `weight` values, these cause :
      - `NumberFormatException` due to `toInt` function
      - `SparkContext` can not be initialized.
      - It does not show meaningful error message to user.
      
      In a nutshell, this functionality can be more robust by selecting one of the following flows :
      
      **1-** Currently, if `schedulingMode` has an invalid value, a warning message is logged and default value is set as `FIFO`. Same pattern can be used for `minShare`(default: 0) and `weight`(default: 1) as well
      **2-** Meaningful error message can be shown to the user for all invalid cases.
      
      PR offers :
      - `schedulingMode` handles just empty values. It also needs to be supported for **whitespace**, **non-uppercase**(fair, FaIr etc...) or `SchedulingMode.NONE` cases by setting default value(`FIFO`)
      - `minShare` and `weight` handle just empty values. They also need to be supported for **non-integer** cases by setting default values.
      - Some refactoring of `PoolSuite`.
      
      **Code to Reproduce :**
      
      ```
      val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
      conf.set("spark.scheduler.mode", "FAIR")
      conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
      val sc = new SparkContext(conf)
      ```
      
      **fairscheduler-invalid-data.xml :**
      
      ```
      <allocations>
          <pool name="production">
              <schedulingMode>FIFO</schedulingMode>
              <weight>invalid_weight</weight>
              <minShare>2</minShare>
          </pool>
      </allocations>
      ```
      
      **Stacktrace :**
      
      ```
      Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
          at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
          at java.lang.Integer.parseInt(Integer.java:580)
          at java.lang.Integer.parseInt(Integer.java:615)
          at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
          at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
          at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
          at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
      ```
      ## How was this patch tested?
      
      Added Unit Test Case.
      
      Author: erenavsarogullari <erenavsarogullari@gmail.com>
      
      Closes #15237 from erenavsarogullari/SPARK-17663.
      7beb227c
    • Cheng Lian's avatar
      [SPARK-19409][SPARK-17213] Cleanup Parquet workarounds/hacks due to bugs of old Parquet versions · 7730426c
      Cheng Lian authored
      ## What changes were proposed in this pull request?
      
      We've already upgraded parquet-mr to 1.8.2. This PR does some further cleanup by removing a workaround of PARQUET-686 and a hack due to PARQUET-363 and PARQUET-278. All three Parquet issues are fixed in parquet-mr 1.8.2.
      
      ## How was this patch tested?
      
      Existing unit tests.
      
      Author: Cheng Lian <lian@databricks.com>
      
      Closes #16791 from liancheng/parquet-1.8.2-cleanup.
      7730426c
  3. Feb 05, 2017
    • gatorsmile's avatar
      [SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a... · 65b10ffb
      gatorsmile authored
      [SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a Hive Table With an Empty Schema
      
      ### What changes were proposed in this pull request?
      So far, we allow users to create a table with an empty schema: `CREATE TABLE tab1`. This could break many code paths if we enable it. Thus, we should follow Hive to block it.
      
      For Hive serde tables, some serde libraries require the specified schema and record it in the metastore. To get the list, we need to check `hive.serdes.using.metastore.for.schema,` which contains a list of serdes that require user-specified schema. The default values are
      
      - org.apache.hadoop.hive.ql.io.orc.OrcSerde
      - org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      - org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
      - org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe
      - org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
      - org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
      - org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
      - org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
      
      ### How was this patch tested?
      Added test cases for both Hive and data source tables
      
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16636 from gatorsmile/fixEmptyTableSchema.
      65b10ffb
    • Zheng RuiFeng's avatar
      [SPARK-19421][ML][PYSPARK] Remove numClasses and numFeatures methods in LinearSVC · 317fa750
      Zheng RuiFeng authored
      ## What changes were proposed in this pull request?
      Methods `numClasses` and `numFeatures` in LinearSVCModel are already usable by inheriting `JavaClassificationModel`
      we should not explicitly add them.
      
      ## How was this patch tested?
      existing tests
      
      Author: Zheng RuiFeng <ruifengz@foxmail.com>
      
      Closes #16727 from zhengruifeng/nits_in_linearSVC.
      317fa750
    • Asher Krim's avatar
      [SPARK-19247][ML] Save large word2vec models · b3e89802
      Asher Krim authored
      ## What changes were proposed in this pull request?
      
      * save word2vec models as distributed files rather than as one large datum. Backwards compatibility with the previous save format is maintained by checking for the "wordIndex" column
      * migrate the fix for loading large models (SPARK-11994) to ml word2vec
      
      ## How was this patch tested?
      
      Tested loading the new and old formats locally
      
      srowen yanboliang MLnick
      
      Author: Asher Krim <akrim@hubspot.com>
      
      Closes #16607 from Krimit/saveLargeModels.
      b3e89802
    • actuaryzhang's avatar
      [SPARK-19452][SPARKR] Fix bug in the name assignment method · b94f4b6f
      actuaryzhang authored
      ## What changes were proposed in this pull request?
      The names method fails to check for validity of the assignment values. This can be fixed by calling colnames within names.
      
      ## How was this patch tested?
      new tests.
      
      Author: actuaryzhang <actuaryzhang10@gmail.com>
      
      Closes #16794 from actuaryzhang/sparkRNames.
      b94f4b6f
  4. Feb 04, 2017
    • Liang-Chi Hsieh's avatar
      [SPARK-19425][SQL] Make ExtractEquiJoinKeys support UDT columns · 0674e7eb
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      DataFrame.except doesn't work for UDT columns. It is because `ExtractEquiJoinKeys` will run `Literal.default` against UDT. However, we don't handle UDT in `Literal.default` and an exception will throw like:
      
          java.lang.RuntimeException: no default for type
          org.apache.spark.ml.linalg.VectorUDT3bfc3ba7
            at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
            at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:117)
            at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:110)
      
      More simple fix is just let `Literal.default` handle UDT by its sql type. So we can use more efficient join type on UDT.
      
      Besides `except`, this also fixes other similar scenarios, so in summary this fixes:
      
      * `except` on two Datasets with UDT
      * `intersect` on two Datasets with UDT
      * `Join` with the join conditions using `<=>` on UDT columns
      
      ## How was this patch tested?
      
      Jenkins tests.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16765 from viirya/df-except-for-udt.
      0674e7eb
    • hyukjinkwon's avatar
      [SPARK-19446][SQL] Remove unused findTightestCommonType in TypeCoercion · 2f3c20bb
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to
      
      - remove unused `findTightestCommonType` in `TypeCoercion` as suggested in https://github.com/apache/spark/pull/16777#discussion_r99283834
      - rename `findTightestCommonTypeOfTwo ` to `findTightestCommonType`.
      - fix comments accordingly
      
      The usage was removed while refactoring/fixing in several JIRAs such as SPARK-16714, SPARK-16735 and SPARK-16646
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16786 from HyukjinKwon/SPARK-19446.
      2f3c20bb
  5. Feb 03, 2017
    • Reynold Xin's avatar
      [SPARK-10063] Follow-up: remove dead code related to an old output committer. · 22d4aae8
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      DirectParquetOutputCommitter was removed from Spark as it was deemed unsafe to use. We however still have some code to generate warning. This patch removes those code as well.
      
      ## How was this patch tested?
      N/A
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #16796 from rxin/remove-direct.
      22d4aae8
    • actuaryzhang's avatar
      [SPARK-19386][SPARKR][FOLLOWUP] fix error in vignettes · 050c20cc
      actuaryzhang authored
      ## What changes were proposed in this pull request?
      
      Current version has error in vignettes:
      ```
      model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
      summary(kmeansModel)
      ```
      
      `kmeansModel` does not exist...
      
      felixcheung wangmiao1981
      
      Author: actuaryzhang <actuaryzhang10@gmail.com>
      
      Closes #16799 from actuaryzhang/sparkRVignettes.
      050c20cc
    • krishnakalyan3's avatar
      [SPARK-19386][SPARKR][DOC] Bisecting k-means in SparkR documentation · 48aafeda
      krishnakalyan3 authored
      ## What changes were proposed in this pull request?
      Update programming guide, example and vignette with Bisecting k-means.
      
      Author: krishnakalyan3 <krishnakalyan3@gmail.com>
      
      Closes #16767 from krishnakalyan3/bisecting-kmeans.
      48aafeda
    • Liang-Chi Hsieh's avatar
      [SPARK-19244][CORE] Sort MemoryConsumers according to their memory usage when spilling · 2f523fa0
      Liang-Chi Hsieh authored
      ## What changes were proposed in this pull request?
      
      In `TaskMemoryManager `, when we acquire memory by calling `acquireExecutionMemory` and we can't acquire required memory, we will try to spill other memory consumers.
      
      Currently, we simply iterates the memory consumers in a hash set. Normally each time the consumer will be iterated in the same order.
      
      The first issue is that we might spill additional consumers. For example, if consumer 1 uses 10MB, consumer 2 uses 50MB, then consumer 3 acquires 100MB but we can only get 60MB and spilling is needed. We might spill both consumer 1 and consumer 2. But we actually just need to spill consumer 2 and get the required 100MB.
      
      The second issue is that if we spill consumer 1 in first time spilling. After a while, consumer 1 now uses 5MB. Then consumer 4 may acquire some memory and spilling is needed again. Because we iterate the memory consumers in the same order, we will spill consumer 1 again. So for consumer 1, we will produce many small spilling files.
      
      This patch modifies the way iterating the memory consumers. It sorts the memory consumers by their memory usage. So the consumer using more memory will spill first. Once it is spilled, even it acquires few memory again, in next time spilling happens it will not be the consumers to spill again if there are other consumers using more memory than it.
      
      ## How was this patch tested?
      
      Jenkins tests.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16603 from viirya/sort-memoryconsumer-when-spill.
      2f523fa0
    • Dongjoon Hyun's avatar
      [SPARK-18909][SQL] The error messages in `ExpressionEncoder.toRow/fromRow` are too verbose · 52d4f619
      Dongjoon Hyun authored
      ## What changes were proposed in this pull request?
      
      In `ExpressionEncoder.toRow` and `fromRow`, we catch the exception and output `treeString` of serializer/deserializer expressions in the error message. However, encoder can be very complex and the serializer/deserializer expressions can be very large trees and blow up the log files(e.g. generate over 500mb logs for this single error message.) As a first attempt, this PR try to use `simpleString` instead.
      
      **BEFORE**
      
      ```scala
      scala> :paste
      // Entering paste mode (ctrl-D to finish)
      
      case class TestCaseClass(value: Int)
      import spark.implicits._
      Seq(TestCaseClass(1)).toDS().collect()
      
      // Exiting paste mode, now interpreting.
      
      java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
      newInstance(class TestCaseClass)
      +- assertnotnull(input[0, int, false], - field (class: "scala.Int", name: "value"), - root class: "TestCaseClass")
         +- input[0, int, false]
      
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:303)
      ...
      ```
      
      **AFTER**
      
      ```scala
      ...
      // Exiting paste mode, now interpreting.
      
      java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
      newInstance(class TestCaseClass)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:303)
      ...
      ```
      
      ## How was this patch tested?
      
      Manual.
      
      Author: Dongjoon Hyun <dongjoon@apache.org>
      
      Closes #16701 from dongjoon-hyun/SPARK-18909-EXPR-ERROR.
      52d4f619
    • Sean Owen's avatar
      [BUILD] Close stale PRs · 20b4ca14
      Sean Owen authored
      Closes #15736
      Closes #16309
      Closes #16485
      Closes #16502
      Closes #16196
      Closes #16498
      Closes #12380
      Closes #16764
      
      Closes #14394
      Closes #14204
      Closes #14027
      Closes #13690
      Closes #16279
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #16778 from srowen/CloseStalePRs.
      Unverified
      20b4ca14
    • Liang-Chi Hsieh's avatar
      [SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged... · bf493686
      Liang-Chi Hsieh authored
      [SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged Parquet schema for filter predicate pushdown
      
      ## What changes were proposed in this pull request?
      
      There is a metadata introduced before to mark the optional columns in merged Parquet schema for filter predicate pushdown. As we upgrade to Parquet 1.8.2 which includes the fix for the pushdown of optional columns, we don't need this metadata now.
      
      ## How was this patch tested?
      
      Jenkins tests.
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Liang-Chi Hsieh <viirya@gmail.com>
      
      Closes #16756 from viirya/remove-optional-metadata.
      bf493686
    • jinxing's avatar
      [SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite. · c86a57f4
      jinxing authored
      ## What changes were proposed in this pull request?
      
      The current code in `HeartbeatReceiverSuite`, executorId is set as below:
      ```
        private val executorId1 = "executor-1"
        private val executorId2 = "executor-2"
      ```
      
      The executorId is sent to driver when register as below:
      
      ```
      test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
        ...
        fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
            RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
        ...
      }
      ```
      
      Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below:
      ```
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else {
        ...
        executorDataMap.put(executorId, data)
        if (currentExecutorIdCounter < executorId.toInt) {
          currentExecutorIdCounter = executorId.toInt
        }
        ...
      ```
      
      `executorId.toInt` will cause NumberformatException.
      
      This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true.
      
      **To fix**
      Rectify executorId and replace `askWithRetry` with `askSync`, refer to https://github.com/apache/spark/pull/16690
      ## How was this patch tested?
      This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
      
      Author: jinxing <jinxing@meituan.com>
      
      Closes #16779 from jinxing64/SPARK-19437.
      c86a57f4
  6. Feb 02, 2017
  7. Feb 01, 2017
    • Shixiong Zhu's avatar
      [SPARK-19432][CORE] Fix an unexpected failure when connecting timeout · 8303e20c
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      When connecting timeout, `ask` may fail with a confusing message:
      
      ```
      17/02/01 23:15:19 INFO Worker: Connecting to master ...
      java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set.
              at scala.Predef$.require(Predef.scala:224)
              at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70)
              at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232)
              at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231)
              at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138)
              at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
              at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
      ```
      
      It's better to provide a meaningful message.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16773 from zsxwing/connect-timeout.
      8303e20c
    • Zheng RuiFeng's avatar
      [SPARK-14352][SQL] approxQuantile should support multi columns · b0985764
      Zheng RuiFeng authored
      ## What changes were proposed in this pull request?
      
      1, add the multi-cols support based on current private api
      2, add the multi-cols support to pyspark
      ## How was this patch tested?
      
      unit tests
      
      Author: Zheng RuiFeng <ruifengz@foxmail.com>
      Author: Ruifeng Zheng <ruifengz@foxmail.com>
      
      Closes #12135 from zhengruifeng/quantile4multicols.
      b0985764
    • jinxing's avatar
      [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple... · c5fcb7f6
      jinxing authored
      [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.
      
      ## What changes were proposed in this pull request?
      
      `ReceiverSupervisorImpl` on executor side reports block's meta back to `ReceiverTracker` on driver side. In current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages are processed multiple times.
      
      **To reproduce**:
      
      1. Check if it is the first time receiving `AddBlock` in `ReceiverTracker`, if so sleep long enough(say 200 seconds), thus the first RPC call will be timeout in `askWithRetry`, then `AddBlock` will be resent.
      2. Rebuild Spark and run following job:
      ```
        def streamProcessing(): Unit = {
          val conf = new SparkConf()
            .setAppName("StreamingTest")
            .setMaster(masterUrl)
          val ssc = new StreamingContext(conf, Seconds(200))
          val stream = ssc.socketTextStream("localhost", 1234)
          stream.print()
          ssc.start()
          ssc.awaitTermination()
        }
      ```
      **To fix**:
      
      It makes sense to provide a blocking version `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218). Because Netty RPC layer will not drop messages. `askWithRetry` is a leftover from akka days. It imposes restrictions on the caller(e.g. idempotency) and other things that people generally don't pay that much attention to when using it.
      
      ## How was this patch tested?
      Test manually. The scenario described above doesn't happen with this patch.
      
      Author: jinxing <jinxing@meituan.com>
      
      Closes #16690 from jinxing64/SPARK-19347.
      c5fcb7f6
Loading