  1. Mar 12, 2017
• [SPARK-19853][SS] uppercase kafka topics fail when startingOffsets are SpecificOffsets · 0a4d06a7
      uncleGen authored
When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON and the topic(s) happen to contain uppercase characters. When startingOffsets is parsed, the original string value from the options is lowercased (via toLowerCase) to make matching on "earliest" and "latest" case insensitive. However, the lowercased JSON is then passed to SpecificOffsets in the fall-through case, so topic names may not be what the user intended by the time assignments are made with the underlying KafkaConsumer.
      
      KafkaSourceProvider.scala:
      ```
      val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
          case Some("latest") => LatestOffsets
          case Some("earliest") => EarliestOffsets
          case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
          case None => LatestOffsets
        }
      ```
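A minimal sketch of one possible fix (not necessarily the exact change merged in #17209), mirroring the block above: lowercase the option value only for matching "earliest"/"latest", and hand the original, case-preserving JSON to SpecificOffsets.
```scala
val startingOffsets =
  caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
    // compare case-insensitively, but keep the original string around
    case Some(value) if value.toLowerCase == "latest" => LatestOffsets
    case Some(value) if value.toLowerCase == "earliest" => EarliestOffsets
    // pass the untouched JSON so topic names keep their original case
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
```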
      
Thanks to cbowden for reporting.
      
      Jenkins
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17209 from uncleGen/SPARK-19853.
      0a4d06a7
• [SPARK-19282][ML][SPARKR] RandomForest Wrapper and GBT Wrapper return param "maxDepth" to R models · 9f8ce482
      Xin Ren authored
      ## What changes were proposed in this pull request?
      
      RandomForest R Wrapper and GBT R Wrapper return param `maxDepth` to R models.
      
The following four R wrappers are changed:
      * `RandomForestClassificationWrapper`
      * `RandomForestRegressionWrapper`
      * `GBTClassificationWrapper`
      * `GBTRegressionWrapper`
      
      ## How was this patch tested?
      
Tested manually on my local machine.
      
      Author: Xin Ren <iamshrek@126.com>
      
      Closes #17207 from keypointt/SPARK-19282.
      9f8ce482
• [SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the... · 2f5187bd
      xiaojian.fxj authored
      [SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block
      
Cleaning up a finished application may take a long time on the worker, and because the worker is a ThreadSafeRpcEndpoint, this blocks the worker from sending heartbeats to the master. If a worker's heartbeat is blocked by the ApplicationFinished message, the master will consider the worker dead. If the worker has a driver, the driver will be scheduled by the master again.
It is better to reuse the existing cleanupThreadExecutor to clean up the directories of finished applications asynchronously, so the worker is not blocked.
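A self-contained sketch of the idea, with hypothetical names (the actual Worker code merged in #17189 may differ): hand the slow directory removal to a long-lived single-thread executor so the RPC endpoint that also sends heartbeats never blocks on it.
```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object WorkerCleanupSketch {
  // reuse one long-lived executor for all cleanup work
  private val cleanupThreadExecutor: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // called from the message handler; returns immediately so heartbeats keep flowing
  def onApplicationFinished(appDir: File): Unit = Future {
    deleteRecursively(appDir) // may take a long time for large application directories
  }(cleanupThreadExecutor)
}
```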
      
      Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
      
      Closes #17189 from hustfxj/worker-hearbeat.
      2f5187bd
• [DOCS][SS] fix structured streaming python example · e29a74d5
      uncleGen authored
      ## What changes were proposed in this pull request?
      
- fix the SS Python example that raises `TypeError: 'xxx' object is not callable`
- fix some other doc issues
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17257 from uncleGen/docs-ss-python.
      e29a74d5
  2. Mar 10, 2017
• [SPARK-19723][SQL] create datasource table with an non-existent location should work · f6fdf92d
      windpiger authored
      ## What changes were proposed in this pull request?
      
      This JIRA is a follow up work after [SPARK-19583](https://issues.apache.org/jira/browse/SPARK-19583)
      
      As we discussed in that [PR](https://github.com/apache/spark/pull/16938)
      
The following DDL for a datasource table with a non-existent location should work:
      ```
      CREATE TABLE ... (PARTITIONED BY ...) LOCATION path
      ```
Currently it throws an exception that the path does not exist for a datasource table.
      
      ## How was this patch tested?
      unit test added
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17055 from windpiger/CTDataSourcePathNotExists.
      f6fdf92d
• [SPARK-19893][SQL] should not run DataFrame set oprations with map type · fb9beda5
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
In Spark SQL, the map type can't be used in equality tests/comparisons. Since `Intersect`/`Except`/`Distinct` need an equality test on all columns, we should not allow the map type in `Intersect`/`Except`/`Distinct`.
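A hedged illustration of the kind of query this check now rejects at analysis time (the exact error message is an assumption):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("map-set-ops").getOrCreate()

val df1 = spark.sql("SELECT map(1, 'a') AS m")
val df2 = spark.sql("SELECT map(1, 'b') AS m")

// Each of these should now fail with an AnalysisException because column `m` is a map type:
// df1.intersect(df2)
// df1.except(df2)
// df1.distinct()
```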
      
      ## How was this patch tested?
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17236 from cloud-fan/map.
      fb9beda5
• [SPARK-19905][SQL] Bring back Dataset.inputFiles for Hive SerDe tables · ffee4f1c
      Cheng Lian authored
      ## What changes were proposed in this pull request?
      
      `Dataset.inputFiles` works by matching `FileRelation`s in the query plan. In Spark 2.1, Hive SerDe tables are represented by `MetastoreRelation`, which inherits from `FileRelation`. However, in Spark 2.2, Hive SerDe tables are now represented by `CatalogRelation`, which doesn't inherit from `FileRelation` anymore, due to the unification of Hive SerDe tables and data source tables. This change breaks `Dataset.inputFiles` for Hive SerDe tables.
      
      This PR tries to fix this issue by explicitly matching `CatalogRelation`s that are Hive SerDe tables in `Dataset.inputFiles`. Note that we can't make `CatalogRelation` inherit from `FileRelation` since not all `CatalogRelation`s are file based (e.g., JDBC data source tables).
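A short usage reminder of the behavior this patch restores (the table name is illustrative, and a SparkSession with Hive support is assumed):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// For a Hive SerDe table, inputFiles should again return its underlying data files,
// just as it does for data source tables.
val files: Array[String] = spark.table("some_hive_serde_table").inputFiles
files.foreach(println)
```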
      
      ## How was this patch tested?
      
      New test case added in `HiveDDLSuite`.
      
      Author: Cheng Lian <lian@databricks.com>
      
      Closes #17247 from liancheng/spark-19905-hive-table-input-files.
      ffee4f1c
• [SPARK-19611][SQL] Preserve metastore field order when merging inferred schema · bc303514
      Budde authored
      ## What changes were proposed in this pull request?
      
      The ```HiveMetastoreCatalog.mergeWithMetastoreSchema()``` method added in #16944 may
      not preserve the same field order as the metastore schema in some cases, which can cause
      queries to fail. This change ensures that the metastore field order is preserved.
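A hedged, simplified sketch of the intended merge behavior (not the exact code in #17249): iterate over the metastore schema so its field order wins, substituting the inferred, case-sensitive field where one matches by lower-cased name.
```scala
import org.apache.spark.sql.types.{StructField, StructType}

def mergeWithMetastoreSchemaSketch(metastore: StructType, inferred: StructType): StructType = {
  val inferredByName: Map[String, StructField] =
    inferred.fields.map(f => f.name.toLowerCase -> f).toMap
  // drive the iteration from the metastore schema so its field order is preserved
  StructType(metastore.fields.map { f =>
    inferredByName.getOrElse(f.name.toLowerCase, f)
  })
}
```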
      
      ## How was this patch tested?
      
A test ensuring that the metastore field order is preserved was added to ```HiveSchemaInferenceSuite```.
The particular failure use case from #16944 was tested manually as well.
      
      Author: Budde <budde@amazon.com>
      
      Closes #17249 from budde/PreserveMetastoreFieldOrder.
      bc303514
• [SPARK-17979][SPARK-14453] Remove deprecated SPARK_YARN_USER_ENV and SPARK_JAVA_OPTS · 8f0490e2
      Yong Tang authored
This fix removes deprecated support for the config `SPARK_YARN_USER_ENV`, as mentioned in SPARK-17979.
      This fix also removes deprecated support for the following:
      ```
      SPARK_YARN_USER_ENV
      SPARK_JAVA_OPTS
      SPARK_CLASSPATH
      SPARK_WORKER_INSTANCES
      ```
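A hedged illustration of the supported replacements for the removed variables (the mappings follow Spark's long-standing deprecation warnings; the values are placeholders):
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.appMasterEnv.MY_VAR", "value")          // instead of SPARK_YARN_USER_ENV
  .set("spark.driver.extraJavaOptions", "-Dkey=value")     // instead of SPARK_JAVA_OPTS
  .set("spark.executor.extraJavaOptions", "-Dkey=value")
  .set("spark.driver.extraClassPath", "/extra/jars/*")     // instead of SPARK_CLASSPATH
  .set("spark.executor.extraClassPath", "/extra/jars/*")
  .set("spark.executor.instances", "4")                    // instead of SPARK_WORKER_INSTANCES (on YARN)
```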
      
      Related JIRA:
      [SPARK-14453]: https://issues.apache.org/jira/browse/SPARK-14453
      [SPARK-12344]: https://issues.apache.org/jira/browse/SPARK-12344
      [SPARK-15781]: https://issues.apache.org/jira/browse/SPARK-15781
      
      Existing tests should pass.
      
      Author: Yong Tang <yong.tang.github@outlook.com>
      
      Closes #17212 from yongtang/SPARK-17979.
      8f0490e2
• [SPARK-19620][SQL] Fix incorrect exchange coordinator id in the physical plan · dd9049e0
      Carson Wang authored
      ## What changes were proposed in this pull request?
When adaptive execution is enabled, an exchange coordinator is used in the Exchange operators. For a Join, the same exchange coordinator is used for its two Exchanges, but the physical plan shows two different coordinator ids, which is confusing.
      
This PR fixes the incorrect exchange coordinator id in the physical plan. The identity hash code should be generated from the coordinator object itself rather than from the `Option[ExchangeCoordinator]` wrapping it, so that the same coordinator always yields the same id.
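A hedged, self-contained sketch of the difference (types simplified, not the exact code in #16952):
```scala
class CoordinatorSketch

val coordinator = new CoordinatorSketch
// Each Exchange of a join may hold its own Option wrapper around the shared coordinator.
val leftOpt: Option[CoordinatorSketch] = Some(coordinator)
val rightOpt: Option[CoordinatorSketch] = Some(coordinator)

// Hashing the Option wrappers yields two different ids for the same coordinator:
val buggyLeftId = System.identityHashCode(leftOpt)
val buggyRightId = System.identityHashCode(rightOpt)

// Hashing the coordinator object itself yields one consistent id:
val fixedLeftId = leftOpt.map(c => System.identityHashCode(c)).getOrElse(0)
val fixedRightId = rightOpt.map(c => System.identityHashCode(c)).getOrElse(0)
// fixedLeftId == fixedRightId
```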
      
      ## How was this patch tested?
      Before the patch, the physical plan shows two different exchange coordinator id for Join.
      ```
      == Physical Plan ==
      *Project [key1#3L, value2#12L]
      +- *SortMergeJoin [key1#3L], [key2#11L], Inner
         :- *Sort [key1#3L ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 1804587700) hashpartitioning(key1#3L, 10), coordinator[target post-shuffle partition size: 67108864]
         :     +- *Project [(id#0L % 500) AS key1#3L]
         :        +- *Filter isnotnull((id#0L % 500))
         :           +- *Range (0, 1000, step=1, splits=Some(10))
         +- *Sort [key2#11L ASC NULLS FIRST], false, 0
            +- Exchange(coordinator id: 793927319) hashpartitioning(key2#11L, 10), coordinator[target post-shuffle partition size: 67108864]
               +- *Project [(id#8L % 500) AS key2#11L, id#8L AS value2#12L]
                  +- *Filter isnotnull((id#8L % 500))
                     +- *Range (0, 1000, step=1, splits=Some(10))
      ```
      After the patch, two exchange coordinator id are the same.
      
      Author: Carson Wang <carson.wang@intel.com>
      
      Closes #16952 from carsonwang/FixCoordinatorId.
      dd9049e0
• [SPARK-19786][SQL] Facilitate loop optimizations in a JIT compiler regarding range() · fcb68e0f
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
      This PR improves performance of operations with `range()` by changing Java code generated by Catalyst. This PR is inspired by the [blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).
      
This PR changes the generated code in the following two ways:
1. Replace a while-loop using long instance variables with a for-loop using int local variables
2. Suppress generation of the `shouldStop()` method when it is unnecessary (e.g. when `append()` is not generated)

These changes facilitate loop optimizations in the JIT compiler by feeding it simplified Java code. The performance is improved by 7.6x.
      
      Benchmark program:
```scala
      val N = 1 << 29
      val iters = 2
      val benchmark = new Benchmark("range.count", N * iters)
      benchmark.addCase(s"with this PR") { i =>
        var n = 0
        var len = 0
        while (n < iters) {
          len += sparkSession.range(N).selectExpr("count(id)").collect.length
          n += 1
        }
      }
      benchmark.run
      ```
      
      Performance result without this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      w/o this PR                                   1349 / 1356        796.2           1.3       1.0X
      ```
      
      Performance result with this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      range.count:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      with this PR                                   177 /  271       6065.3           0.2       1.0X
      ```
      
      Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed.
      
      Generated code without this PR
      ```java
      
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
      /* 013 */   private boolean range_initRange;
      /* 014 */   private long range_number;
      /* 015 */   private TaskContext range_taskContext;
      /* 016 */   private InputMetrics range_inputMetrics;
      /* 017 */   private long range_batchEnd;
      /* 018 */   private long range_numElementsTodo;
      /* 019 */   private scala.collection.Iterator range_input;
      /* 020 */   private UnsafeRow range_result;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
      /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
      /* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 025 */   private UnsafeRow agg_result;
      /* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 028 */
      /* 029 */   public GeneratedIterator(Object[] references) {
      /* 030 */     this.references = references;
      /* 031 */   }
      /* 032 */
      /* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 034 */     partitionIndex = index;
      /* 035 */     this.inputs = inputs;
      /* 036 */     agg_initAgg = false;
      /* 037 */
      /* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 040 */     range_initRange = false;
      /* 041 */     range_number = 0L;
      /* 042 */     range_taskContext = TaskContext.get();
      /* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
      /* 044 */     range_batchEnd = 0;
      /* 045 */     range_numElementsTodo = 0L;
      /* 046 */     range_input = inputs[0];
      /* 047 */     range_result = new UnsafeRow(1);
      /* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
      /* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
      /* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 052 */     agg_result = new UnsafeRow(1);
      /* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 055 */
      /* 056 */   }
      /* 057 */
      /* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 059 */     // initialize aggregation buffer
      /* 060 */     agg_bufIsNull = false;
      /* 061 */     agg_bufValue = 0L;
      /* 062 */
      /* 063 */     // initialize Range
      /* 064 */     if (!range_initRange) {
      /* 065 */       range_initRange = true;
      /* 066 */       initRange(partitionIndex);
      /* 067 */     }
      /* 068 */
      /* 069 */     while (true) {
      /* 070 */       while (range_number != range_batchEnd) {
      /* 071 */         long range_value = range_number;
      /* 072 */         range_number += 1L;
      /* 073 */
      /* 074 */         // do aggregate
      /* 075 */         // common sub-expressions
      /* 076 */
      /* 077 */         // evaluate aggregate function
      /* 078 */         boolean agg_isNull1 = false;
      /* 079 */
      /* 080 */         long agg_value1 = -1L;
      /* 081 */         agg_value1 = agg_bufValue + 1L;
      /* 082 */         // update aggregation buffer
      /* 083 */         agg_bufIsNull = false;
      /* 084 */         agg_bufValue = agg_value1;
      /* 085 */
      /* 086 */         if (shouldStop()) return;
      /* 087 */       }
      /* 088 */
      /* 089 */       if (range_taskContext.isInterrupted()) {
      /* 090 */         throw new TaskKilledException();
      /* 091 */       }
      /* 092 */
      /* 093 */       long range_nextBatchTodo;
      /* 094 */       if (range_numElementsTodo > 1000L) {
      /* 095 */         range_nextBatchTodo = 1000L;
      /* 096 */         range_numElementsTodo -= 1000L;
      /* 097 */       } else {
      /* 098 */         range_nextBatchTodo = range_numElementsTodo;
      /* 099 */         range_numElementsTodo = 0;
      /* 100 */         if (range_nextBatchTodo == 0) break;
      /* 101 */       }
      /* 102 */       range_numOutputRows.add(range_nextBatchTodo);
      /* 103 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
      /* 104 */
      /* 105 */       range_batchEnd += range_nextBatchTodo * 1L;
      /* 106 */     }
      /* 107 */
      /* 108 */   }
      /* 109 */
      /* 110 */   private void initRange(int idx) {
      /* 111 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
      /* 112 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
      /* 113 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
      /* 114 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
      /* 115 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
      /* 117 */
      /* 118 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
      /* 119 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 120 */       range_number = Long.MAX_VALUE;
      /* 121 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 122 */       range_number = Long.MIN_VALUE;
      /* 123 */     } else {
      /* 124 */       range_number = st.longValue();
      /* 125 */     }
      /* 126 */     range_batchEnd = range_number;
      /* 127 */
      /* 128 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      /* 129 */     .multiply(step).add(start);
      /* 130 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 131 */       partitionEnd = Long.MAX_VALUE;
      /* 132 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 133 */       partitionEnd = Long.MIN_VALUE;
      /* 134 */     } else {
      /* 135 */       partitionEnd = end.longValue();
      /* 136 */     }
      /* 137 */
      /* 138 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      /* 139 */       java.math.BigInteger.valueOf(range_number));
      /* 140 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
      /* 141 */     if (range_numElementsTodo < 0) {
      /* 142 */       range_numElementsTodo = 0;
      /* 143 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      /* 144 */       range_numElementsTodo++;
      /* 145 */     }
      /* 146 */   }
      /* 147 */
      /* 148 */   protected void processNext() throws java.io.IOException {
      /* 149 */     while (!agg_initAgg) {
      /* 150 */       agg_initAgg = true;
      /* 151 */       long agg_beforeAgg = System.nanoTime();
      /* 152 */       agg_doAggregateWithoutKey();
      /* 153 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 154 */
      /* 155 */       // output the result
      /* 156 */
      /* 157 */       agg_numOutputRows.add(1);
      /* 158 */       agg_rowWriter.zeroOutNullBytes();
      /* 159 */
      /* 160 */       if (agg_bufIsNull) {
      /* 161 */         agg_rowWriter.setNullAt(0);
      /* 162 */       } else {
      /* 163 */         agg_rowWriter.write(0, agg_bufValue);
      /* 164 */       }
      /* 165 */       append(agg_result);
      /* 166 */     }
      /* 167 */   }
      /* 168 */ }
      ```
      
      Generated code with this PR
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numOutputRows;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric range_numGeneratedRows;
      /* 013 */   private boolean range_initRange;
      /* 014 */   private long range_number;
      /* 015 */   private TaskContext range_taskContext;
      /* 016 */   private InputMetrics range_inputMetrics;
      /* 017 */   private long range_batchEnd;
      /* 018 */   private long range_numElementsTodo;
      /* 019 */   private scala.collection.Iterator range_input;
      /* 020 */   private UnsafeRow range_result;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder;
      /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter;
      /* 023 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 024 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 025 */   private UnsafeRow agg_result;
      /* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 027 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 028 */
      /* 029 */   public GeneratedIterator(Object[] references) {
      /* 030 */     this.references = references;
      /* 031 */   }
      /* 032 */
      /* 033 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 034 */     partitionIndex = index;
      /* 035 */     this.inputs = inputs;
      /* 036 */     agg_initAgg = false;
      /* 037 */
      /* 038 */     this.range_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 039 */     this.range_numGeneratedRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 040 */     range_initRange = false;
      /* 041 */     range_number = 0L;
      /* 042 */     range_taskContext = TaskContext.get();
      /* 043 */     range_inputMetrics = range_taskContext.taskMetrics().inputMetrics();
      /* 044 */     range_batchEnd = 0;
      /* 045 */     range_numElementsTodo = 0L;
      /* 046 */     range_input = inputs[0];
      /* 047 */     range_result = new UnsafeRow(1);
      /* 048 */     this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0);
      /* 049 */     this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1);
      /* 050 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 051 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 052 */     agg_result = new UnsafeRow(1);
      /* 053 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 054 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 055 */
      /* 056 */   }
      /* 057 */
      /* 058 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 059 */     // initialize aggregation buffer
      /* 060 */     agg_bufIsNull = false;
      /* 061 */     agg_bufValue = 0L;
      /* 062 */
      /* 063 */     // initialize Range
      /* 064 */     if (!range_initRange) {
      /* 065 */       range_initRange = true;
      /* 066 */       initRange(partitionIndex);
      /* 067 */     }
      /* 068 */
      /* 069 */     while (true) {
      /* 070 */       long range_range = range_batchEnd - range_number;
      /* 071 */       if (range_range != 0L) {
      /* 072 */         int range_localEnd = (int)(range_range / 1L);
      /* 073 */         for (int range_localIdx = 0; range_localIdx < range_localEnd; range_localIdx++) {
      /* 074 */           long range_value = ((long)range_localIdx * 1L) + range_number;
      /* 075 */
      /* 076 */           // do aggregate
      /* 077 */           // common sub-expressions
      /* 078 */
      /* 079 */           // evaluate aggregate function
      /* 080 */           boolean agg_isNull1 = false;
      /* 081 */
      /* 082 */           long agg_value1 = -1L;
      /* 083 */           agg_value1 = agg_bufValue + 1L;
      /* 084 */           // update aggregation buffer
      /* 085 */           agg_bufIsNull = false;
      /* 086 */           agg_bufValue = agg_value1;
      /* 087 */
      /* 088 */           // shouldStop check is eliminated
      /* 089 */         }
      /* 090 */         range_number = range_batchEnd;
      /* 091 */       }
      /* 092 */
      /* 093 */       if (range_taskContext.isInterrupted()) {
      /* 094 */         throw new TaskKilledException();
      /* 095 */       }
      /* 096 */
      /* 097 */       long range_nextBatchTodo;
      /* 098 */       if (range_numElementsTodo > 1000L) {
      /* 099 */         range_nextBatchTodo = 1000L;
      /* 100 */         range_numElementsTodo -= 1000L;
      /* 101 */       } else {
      /* 102 */         range_nextBatchTodo = range_numElementsTodo;
      /* 103 */         range_numElementsTodo = 0;
      /* 104 */         if (range_nextBatchTodo == 0) break;
      /* 105 */       }
      /* 106 */       range_numOutputRows.add(range_nextBatchTodo);
      /* 107 */       range_inputMetrics.incRecordsRead(range_nextBatchTodo);
      /* 108 */
      /* 109 */       range_batchEnd += range_nextBatchTodo * 1L;
      /* 110 */     }
      /* 111 */
      /* 112 */   }
      /* 113 */
      /* 114 */   private void initRange(int idx) {
      /* 115 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
      /* 116 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L);
      /* 117 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(10000L);
      /* 118 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
      /* 119 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
      /* 120 */     long partitionEnd;
      /* 121 */
      /* 122 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
      /* 123 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 124 */       range_number = Long.MAX_VALUE;
      /* 125 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 126 */       range_number = Long.MIN_VALUE;
      /* 127 */     } else {
      /* 128 */       range_number = st.longValue();
      /* 129 */     }
      /* 130 */     range_batchEnd = range_number;
      /* 131 */
      /* 132 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
      /* 133 */     .multiply(step).add(start);
      /* 134 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      /* 135 */       partitionEnd = Long.MAX_VALUE;
      /* 136 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
      /* 137 */       partitionEnd = Long.MIN_VALUE;
      /* 138 */     } else {
      /* 139 */       partitionEnd = end.longValue();
      /* 140 */     }
      /* 141 */
      /* 142 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
      /* 143 */       java.math.BigInteger.valueOf(range_number));
      /* 144 */     range_numElementsTodo  = startToEnd.divide(step).longValue();
      /* 145 */     if (range_numElementsTodo < 0) {
      /* 146 */       range_numElementsTodo = 0;
      /* 147 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
      /* 148 */       range_numElementsTodo++;
      /* 149 */     }
      /* 150 */   }
      /* 151 */
      /* 152 */   protected void processNext() throws java.io.IOException {
      /* 153 */     while (!agg_initAgg) {
      /* 154 */       agg_initAgg = true;
      /* 155 */       long agg_beforeAgg = System.nanoTime();
      /* 156 */       agg_doAggregateWithoutKey();
      /* 157 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 158 */
      /* 159 */       // output the result
      /* 160 */
      /* 161 */       agg_numOutputRows.add(1);
      /* 162 */       agg_rowWriter.zeroOutNullBytes();
      /* 163 */
      /* 164 */       if (agg_bufIsNull) {
      /* 165 */         agg_rowWriter.setNullAt(0);
      /* 166 */       } else {
      /* 167 */         agg_rowWriter.write(0, agg_bufValue);
      /* 168 */       }
      /* 169 */       append(agg_result);
      /* 170 */     }
      /* 171 */   }
      /* 172 */ }
      ```
      
      A part of suppressing `shouldStop()` was originally developed by inouehrs
      
      ## How was this patch tested?
      
      Add new tests into `DataFrameRangeSuite`
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #17122 from kiszk/SPARK-19786.
      fcb68e0f
• [SPARK-19891][SS] Await Batch Lock notified on stream execution exit · 501b7111
      Tyson Condie authored
      ## What changes were proposed in this pull request?
      
We need to notify the await batch lock when the stream exits early, e.g., when an exception has been thrown.
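A hedged, self-contained sketch of the pattern (names follow the description above but are not quoted from the patch): always signal threads waiting on the batch lock, even when the stream loop exits with an exception.
```scala
import java.util.concurrent.locks.ReentrantLock

object StreamExitSketch {
  private val awaitBatchLock = new ReentrantLock()
  private val awaitBatchLockCondition = awaitBatchLock.newCondition()

  def runStream(body: => Unit): Unit = {
    try {
      body // the streaming micro-batch loop; may throw
    } finally {
      awaitBatchLock.lock()
      try {
        // wake up callers blocked in awaitOffset()/processAllAvailable()-style waits
        awaitBatchLockCondition.signalAll()
      } finally {
        awaitBatchLock.unlock()
      }
    }
  }
}
```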
      
      ## How was this patch tested?
      
      Current tests that throw exceptions at runtime will finish faster as a result of this update.
      
      zsxwing
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: Tyson Condie <tcondie@gmail.com>
      
      Closes #17231 from tcondie/kafka-writer.
      501b7111
• [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating boxing/unboxing · 5949e6c4
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
This PR improves the performance of Dataset.map() for primitive types by removing boxing/unboxing operations. This is based on [the discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) with cloud-fan.
      
Currently, Catalyst generates a method call to the `apply()` method of an anonymous function written in Scala. The argument and return types are `java.lang.Object`. As a result, each method call for a primitive value involves unboxing and boxing when calling this `apply()` method, and boxing and unboxing when returning from it.
      
This PR directly calls a specialized version of the `apply()` method without boxing and unboxing. For example, if the argument and return types are `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.
      
The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files), showing a 4.7x speedup for the Dataset case. Here is the Dataset part of this program.
      
      Without this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      RDD                                           1923 / 1952         52.0          19.2       1.0X
      DataFrame                                      526 /  548        190.2           5.3       3.7X
      Dataset                                       3094 / 3154         32.3          30.9       0.6X
      ```
      
      With this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      RDD                                           1883 / 1892         53.1          18.8       1.0X
      DataFrame                                      502 /  642        199.1           5.0       3.7X
      Dataset                                        657 /  784        152.2           6.6       2.9X
      ```
      
```scala
        def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
          import spark.implicits._
          val rdd = spark.sparkContext.range(0, numRows)
          val ds = spark.range(0, numRows)
          val func = (l: Long) => l + 1
          val benchmark = new Benchmark("back-to-back map", numRows)
      ...
          benchmark.addCase("Dataset") { iter =>
            var res = ds.as[Long]
            var i = 0
            while (i < numChains) {
              res = res.map(func)
              i += 1
            }
            res.queryExecution.toRdd.foreach(_ => Unit)
          }
          benchmark
        }
      ```
      
      A motivating example
```scala
      Seq(1, 2, 3).toDS.map(i => i * 7).show
      ```
      
      Generated code without this PR
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private scala.collection.Iterator inputadapter_input;
      /* 009 */   private UnsafeRow deserializetoobject_result;
      /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
      /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
      /* 012 */   private int mapelements_argValue;
      /* 013 */   private UnsafeRow mapelements_result;
      /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
      /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
      /* 016 */   private UnsafeRow serializefromobject_result;
      /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
      /* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
      /* 019 */
      /* 020 */   public GeneratedIterator(Object[] references) {
      /* 021 */     this.references = references;
      /* 022 */   }
      /* 023 */
      /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 025 */     partitionIndex = index;
      /* 026 */     this.inputs = inputs;
      /* 027 */     inputadapter_input = inputs[0];
      /* 028 */     deserializetoobject_result = new UnsafeRow(1);
      /* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
      /* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
      /* 031 */
      /* 032 */     mapelements_result = new UnsafeRow(1);
      /* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
      /* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
      /* 035 */     serializefromobject_result = new UnsafeRow(1);
      /* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
      /* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
      /* 038 */
      /* 039 */   }
      /* 040 */
      /* 041 */   protected void processNext() throws java.io.IOException {
      /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
      /* 045 */
      /* 046 */       boolean mapelements_isNull = true;
      /* 047 */       int mapelements_value = -1;
      /* 048 */       if (!false) {
      /* 049 */         mapelements_argValue = inputadapter_value;
      /* 050 */
      /* 051 */         mapelements_isNull = false;
      /* 052 */         if (!mapelements_isNull) {
      /* 053 */           Object mapelements_funcResult = null;
      /* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
      /* 055 */           if (mapelements_funcResult == null) {
      /* 056 */             mapelements_isNull = true;
      /* 057 */           } else {
      /* 058 */             mapelements_value = (Integer) mapelements_funcResult;
      /* 059 */           }
      /* 060 */
      /* 061 */         }
      /* 062 */
      /* 063 */       }
      /* 064 */
      /* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
      /* 066 */
      /* 067 */       if (mapelements_isNull) {
      /* 068 */         serializefromobject_rowWriter.setNullAt(0);
      /* 069 */       } else {
      /* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
      /* 071 */       }
      /* 072 */       append(serializefromobject_result);
      /* 073 */       if (shouldStop()) return;
      /* 074 */     }
      /* 075 */   }
      /* 076 */ }
      ```
      
      Generated code with this PR (lines 48-56 are changed)
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private scala.collection.Iterator inputadapter_input;
      /* 009 */   private UnsafeRow deserializetoobject_result;
      /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
      /* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
      /* 012 */   private int mapelements_argValue;
      /* 013 */   private UnsafeRow mapelements_result;
      /* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
      /* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
      /* 016 */   private UnsafeRow serializefromobject_result;
      /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
      /* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
      /* 019 */
      /* 020 */   public GeneratedIterator(Object[] references) {
      /* 021 */     this.references = references;
      /* 022 */   }
      /* 023 */
      /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 025 */     partitionIndex = index;
      /* 026 */     this.inputs = inputs;
      /* 027 */     inputadapter_input = inputs[0];
      /* 028 */     deserializetoobject_result = new UnsafeRow(1);
      /* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
      /* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
      /* 031 */
      /* 032 */     mapelements_result = new UnsafeRow(1);
      /* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
      /* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
      /* 035 */     serializefromobject_result = new UnsafeRow(1);
      /* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
      /* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
      /* 038 */
      /* 039 */   }
      /* 040 */
      /* 041 */   protected void processNext() throws java.io.IOException {
      /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
      /* 045 */
      /* 046 */       boolean mapelements_isNull = true;
      /* 047 */       int mapelements_value = -1;
      /* 048 */       if (!false) {
      /* 049 */         mapelements_argValue = inputadapter_value;
      /* 050 */
      /* 051 */         mapelements_isNull = false;
      /* 052 */         if (!mapelements_isNull) {
      /* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
      /* 054 */         }
      /* 055 */
      /* 056 */       }
      /* 057 */
      /* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
      /* 059 */
      /* 060 */       if (mapelements_isNull) {
      /* 061 */         serializefromobject_rowWriter.setNullAt(0);
      /* 062 */       } else {
      /* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
      /* 064 */       }
      /* 065 */       append(serializefromobject_result);
      /* 066 */       if (shouldStop()) return;
      /* 067 */     }
      /* 068 */   }
      /* 069 */ }
      ```
      
      Java bytecode for methods for `i => i * 7`
      ```java
      $ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
      Compiled from "Test.scala"
      public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
        public static final long serialVersionUID;
      
        public final int apply(int);
          Code:
             0: aload_0
             1: iload_1
             2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
             5: ireturn
      
        public int apply$mcII$sp(int);
          Code:
             0: iload_1
             1: bipush        7
             3: imul
             4: ireturn
      
        public final java.lang.Object apply(java.lang.Object);
          Code:
             0: aload_0
             1: aload_1
             2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
             5: invokevirtual #31                 // Method apply:(I)I
             8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
            11: areturn
      
        public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
          Code:
             0: aload_0
             1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
             4: return
      }
      ```
      ## How was this patch tested?
      
      Added new test suites to `DatasetPrimitiveSuite`.
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #17172 from kiszk/SPARK-19008.
      5949e6c4
  3. Mar 09, 2017
• [SPARK-19886] Fix reportDataLoss if statement in SS KafkaSource · 82138e09
      Burak Yavuz authored
      ## What changes were proposed in this pull request?
      
      Fix the `throw new IllegalStateException` if statement part.
      
## How was this patch tested?
      
      Regression test
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #17228 from brkyvz/kafka-cause-fix.
      82138e09
• [SPARK-19611][SQL] Introduce configurable table schema inference · f79371ad
      Budde authored
      ## Summary of changes
      
      Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.
      
      - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
      - Add schemaPreservesCase field to CatalogTable (set to false when schema can't
        successfully be read from Hive table props)
      - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
        false, depending on spark.sql.hive.caseSensitiveInferenceMode
      - Add alterTableSchema() method to the ExternalCatalog interface
      - Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to
  HiveMetastoreCatalog.mergeWithMetastoreSchema()
      - Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite
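A hedged usage sketch of the new option added to SQLConf (the mode name `INFER_AND_SAVE` is an assumption here, not quoted from this commit):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  // controls whether a case-sensitive schema is inferred from the table's data files
  .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
  .getOrCreate()
```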
      
      [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)
      
      ## How was this patch tested?
      
      The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.
      
      Author: Budde <budde@amazon.com>
      
      Closes #16944 from budde/SPARK-19611.
      f79371ad
• [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc... · cabe1df8
      Jeff Zhang authored
      [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc
      
Besides the issue in the Spark API, this also fixes two minor issues in PySpark:
      - support read from multiple input paths for orc
      - support read from multiple input paths for text
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #10307 from zjffdu/SPARK-12334.
      cabe1df8
• [SPARK-19861][SS] watermark should not be a negative time. · 30b18e69
      uncleGen authored
      ## What changes were proposed in this pull request?
      
A `watermark` delay should not be negative. Such a value is invalid, so check it before the query actually runs.
      
      ## How was this patch tested?
      
      add new unit test.
      
      Author: uncleGen <hustyugm@gmail.com>
      Author: dylon <hustyugm@gmail.com>
      
      Closes #17202 from uncleGen/SPARK-19861.
      30b18e69
• [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource · 40da4d18
      Liwei Lin authored
      ## What changes were proposed in this pull request?
      
Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this would cause false negatives in the case where the path has changed in a cosmetic way (e.g. changing `s3n` to `s3a`).
      
      This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log).
      
      ## Usage
      
      ```scala
      spark
        .readStream
        .option("fileNameOnly", true)
        .text("s3n://bucket/dir1/dir2")
        .writeStream
        ...
      ```
      ## How was this patch tested?
      
      Added a test case
      
      Author: Liwei Lin <lwlin7@gmail.com>
      
      Closes #17120 from lw-lin/filename-only.
      40da4d18
• [SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager. · 3232e54f
      jinxing authored
      ## What changes were proposed in this pull request?
      
TaskSetManager currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask`, so a developer cannot control a task's finish time in unit tests. In `handleSuccessfulTask`, the task's duration is computed as `System.currentTimeMillis` minus the launch time (which can be set via `clock`), so the result is not correct.
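A hedged, self-contained sketch of the general pattern (simplified types, not Spark's actual `Clock` trait): record times through an injectable clock so tests can pin them.
```scala
trait Clock { def getTimeMillis(): Long }

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var now: Long) extends Clock {
  override def getTimeMillis(): Long = now
  def advance(ms: Long): Unit = now += ms
}

// Duration is measured with the injected clock, so a unit test using ManualClock
// can make the recorded finish time deterministic.
class TaskTimer(clock: Clock) {
  def durationSince(launchTimeMs: Long): Long = clock.getTimeMillis() - launchTimeMs
}
```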
      
      ## How was this patch tested?
      Existing tests.
      
      Author: jinxing <jinxing6042@126.com>
      
      Closes #17133 from jinxing64/SPARK-19793.
      3232e54f
• [SPARK-19757][CORE] DriverEndpoint#makeOffers race against... · b60b9fc1
      Jimmy Xiang authored
      [SPARK-19757][CORE] DriverEndpoint#makeOffers race against CoarseGrainedSchedulerBackend#killExecutors
      
      ## What changes were proposed in this pull request?
While some executors are being killed due to idleness, if some new tasks come in, the driver could assign them to executors that are being killed. These tasks will fail later when the executors are lost. This patch makes sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized.
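A hedged, self-contained sketch of the synchronization idea (names illustrative, not the exact code in #17091): the offer-making path and the kill path share one lock, so tasks are never offered to an executor that is concurrently being marked for removal.
```scala
class SchedulerBackendSketch {
  private val lock = new Object
  private var executorsPendingToRemove = Set.empty[String]
  private var activeExecutors = Set.empty[String]

  def registerExecutor(id: String): Unit = lock.synchronized {
    activeExecutors += id
  }

  def killExecutors(ids: Seq[String]): Unit = lock.synchronized {
    executorsPendingToRemove ++= ids
  }

  // Only executors not pending removal are offered resources.
  def makeOffers(): Seq[String] = lock.synchronized {
    (activeExecutors -- executorsPendingToRemove).toSeq
  }
}
```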
      
      ## How was this patch tested?
      manual tests
      
      Author: Jimmy Xiang <jxiang@apache.org>
      
      Closes #17091 from jxiang/spark-19757.
      b60b9fc1
• [SPARK-19561][SQL] add int case handling for TimestampType · 206030bd
      Jason White authored
      ## What changes were proposed in this pull request?
      
      Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.
      
      These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.
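A hedged, simplified sketch of the added conversion case (not the exact code in `EvaluatePython.scala`): a value that Py4J delivered as an `Int` for a `TimestampType` column is widened to the internal `Long`, the same way `Long` inputs already were.
```scala
import org.apache.spark.sql.types.{DataType, TimestampType}

def fromJavaSketch(value: Any, dataType: DataType): Any = (value, dataType) match {
  case (i: Int, TimestampType)  => i.toLong // newly handled: small values arrive as Int
  case (l: Long, TimestampType) => l
  case (other, _)               => other
}
```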
      
      Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.
      
      ## How was this patch tested?
      
      Added a new PySpark-side test that fails without the change.
      
      The contribution is my original work and I license the work to the project under the project’s open source license.
      
      Resubmission of https://github.com/apache/spark/pull/16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun
      
      cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.
      
      Author: Jason White <jason.white@shopify.com>
      
      Closes #17200 from JasonMWhite/SPARK-19561.
      206030bd
• [SPARK-19763][SQL] qualified external datasource table location stored in catalog · 274973d2
      windpiger authored
      ## What changes were proposed in this pull request?
      
If we create an external datasource table with a non-qualified location, we should qualify it before storing it in the catalog.
      
      ```
      CREATE TABLE t(a string)
      USING parquet
      LOCATION '/path/xx'
      
      CREATE TABLE t1(a string, b string)
      USING parquet
      PARTITIONED BY(b)
      LOCATION '/path/xx'
      ```
      
When we get the table from the catalog, the location should be qualified, e.g. 'file:/path/xxx'.
      ## How was this patch tested?
      unit test added
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17095 from windpiger/tablepathQualified.
      274973d2
• [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one. · eeb1d6db
      uncleGen authored
      ## What changes were proposed in this pull request?
      
      A follow up to SPARK-19859:
      
      - extract the calculation of `delayMs` and reuse it.
      - update EventTimeWatermarkExec
      - use the correct `delayMs` in EventTimeWatermark
      
      ## How was this patch tested?
      
      Jenkins.
      
      Author: uncleGen <hustyugm@gmail.com>
      
      Closes #17221 from uncleGen/SPARK-19859.
      eeb1d6db
• [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal · 029e40b4
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs.
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #17217 from zsxwing/SPARK-19874.
      029e40b4
• [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore · 09829be6
      Xiao Li authored
      ### What changes were proposed in this pull request?
So far, the test cases in DDLSuite only verify the behaviors of InMemoryCatalog. That means they do not cover the scenarios using HiveExternalCatalog. Thus, we need to improve the existing test suite to run these cases using the Hive metastore.

While porting these test cases, a bug in `SET LOCATION` was found: `path` is not set when the location is changed.
      
      After this PR, a few changes are made, as summarized below,
      - `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using `InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`.
      - `InMemoryCatalogedDDLSuite` contains all the existing test cases in `DDLSuite`.
      - `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test cases are excluded:
      
      1. The following test cases only make sense for `InMemoryCatalog`:
      ```
        test("desc table for parquet data source table using in-memory catalog")
        test("create a managed Hive source table") {
        test("create an external Hive source table")
        test("Create Hive Table As Select")
      ```
      
2. The following test cases cannot be ported because we are unable to alter the table provider when using the Hive metastore. In future PRs we need to improve the test cases so that altering the table provider is not needed:
      ```
        test("alter table: set location (datasource table)")
        test("alter table: set properties (datasource table)")
        test("alter table: unset properties (datasource table)")
        test("alter table: set serde (datasource table)")
        test("alter table: set serde partition (datasource table)")
        test("alter table: change column (datasource table)")
        test("alter table: add partition (datasource table)")
        test("alter table: drop partition (datasource table)")
        test("alter table: rename partition (datasource table)")
        test("drop table - data source table")
      ```
      
      **TODO**: in future PRs, we need to remove `HiveDDLSuite` and move its test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or `HiveCatalogedDDLSuite`.
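
      A rough sketch of the resulting suite layout, as referenced above; the imports and mixin traits shown are assumptions for illustration and may not match the exact ones used in the PR.

      ```scala
      import org.apache.spark.sql.QueryTest
      import org.apache.spark.sql.hive.test.TestHiveSingleton
      import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext}

      // Shared DDL test cases live in the abstract class and run against whichever
      // catalog the concrete subclass provides.
      abstract class DDLSuite extends QueryTest with SQLTestUtils {
        // common test("...") cases
      }

      // Runs the shared cases (plus the InMemoryCatalog-only ones) with InMemoryCatalog.
      class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext

      // Runs the shared subset with HiveExternalCatalog via a Hive metastore.
      class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton
      ```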
      
      ### How was this patch tested?
      N/A
      
      Author: Xiao Li <gatorsmile@gmail.com>
      Author: gatorsmile <gatorsmile@gmail.com>
      
      Closes #16592 from gatorsmile/refactorDDLSuite.
      09829be6
  4. Mar 08, 2017
    • Dilip Biswal's avatar
      [MINOR][SQL] The analyzer rules are fired twice for cases when... · d809ceed
      Dilip Biswal authored
      [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.
      
      ## What changes were proposed in this pull request?
      In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However, we can also throw AnalysisException from a few analyzer rules, such as ResolveSubquery.
      
      I found that we fire the analyzer rules twice for queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix; we don't strictly have to fix it. I just got confused seeing the rules fired twice when I was not expecting it.
      
      ## How was this patch tested?
      
      Tested manually.
      
      Author: Dilip Biswal <dbiswal@us.ibm.com>
      
      Closes #17214 from dilipbiswal/analyis_twice.
      d809ceed
    • Burak Yavuz's avatar
      [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in... · a3648b5d
      Burak Yavuz authored
      [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource
      
      ## What changes were proposed in this pull request?
      
      **The Problem**
      There is a file stream source option called `maxFileAge` which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed file are ignored. This value defaults to 7 days.
      This causes a problem when both:
      - `latestFirst` = true
      - `maxFilesPerTrigger` > total files to be processed
      Here is what happens in all combinations:
      1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed.
      2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind.
      3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to (latest file - maxFileAge), so files older than this threshold will never be considered for processing.
      The bug is with case 3.
      
      **The Solution**
      
      Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.
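
      For reference, a minimal sketch of a query that hits case 3; the paths and sink below are made up for illustration, while `latestFirst` and `maxFilesPerTrigger` are the actual source options involved.

      ```scala
      // A file stream combining latestFirst with maxFilesPerTrigger. With this fix,
      // maxFileAge is ignored for such a query, so older files are not silently skipped.
      val lines = spark.readStream
        .format("text")
        .option("latestFirst", "true")       // process the newest files first
        .option("maxFilesPerTrigger", "10")  // cap files per micro-batch
        .load("/data/incoming")              // hypothetical input directory

      val query = lines.writeStream
        .format("console")
        .start()
      ```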
      
      ## How was this patch tested?
      
      Regression test in `FileStreamSourceSuite`
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #17153 from brkyvz/maxFileAge.
      a3648b5d
    • hyukjinkwon's avatar
      [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV · 45512902
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to add an API that loads `DataFrame` from `Dataset[String]` storing csv.
      
      It allows pre-processing before loading into CSV, which means allowing a lot of workarounds for many narrow cases, for example, as below:
      
      - Case 1 - pre-processing
      
        ```scala
        val df = spark.read.text("...")
        // Pre-processing with this.
        spark.read.csv(df.as[String])
        ```
      
      - Case 2 - use other input formats
      
        ```scala
        val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
          classOf[com.hadoop.mapreduce.LzoTextInputFormat],
          classOf[org.apache.hadoop.io.LongWritable],
          classOf[org.apache.hadoop.io.Text])
        val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength))
      
        spark.read.csv(stringRdd.toDS)
        ```
      
      ## How was this patch tested?
      
      Added tests in `CSVSuite` and build with Scala 2.10.
      
      ```
      ./dev/change-scala-version.sh 2.10
      ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
      ```
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #16854 from HyukjinKwon/SPARK-15463.
      45512902
    • Kunal Khamar's avatar
      [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session... · 6570cfd7
      Kunal Khamar authored
      [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState
      
      Forking a newSession() from SparkSession currently makes a new SparkSession that does not retain SessionState (i.e. temporary tables, SQL config, registered functions, etc.). This change adds a method cloneSession() which creates a new SparkSession with a copy of the parent's SessionState.
      
      Subsequent changes to the base session are not propagated to the cloned session; the clone is independent after creation.
      If the base is changed after the clone has been created, say the user registers a new UDF, then the new UDF will not be available inside the clone. The same goes for configs and temp tables.
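
      A hedged sketch of the described semantics; the exact visibility and signature of the new method should be checked against the merged code.

      ```scala
      // cloneSession() is the method this change adds; the clone gets a copy of the
      // parent's SessionState at the time of cloning.
      val clone = spark.cloneSession()

      // Registering a UDF on the base session afterwards...
      spark.udf.register("plusOne", (x: Int) => x + 1)

      spark.sql("SELECT plusOne(1)").show()   // works on the base session
      // clone.sql("SELECT plusOne(1)")       // would fail: not propagated to the clone
      ```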
      
      Unit tests
      
      Author: Kunal Khamar <kkhamar@outlook.com>
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #16826 from kunalkhamar/fork-sparksession.
      6570cfd7
    • Shixiong Zhu's avatar
      [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases · 1bf90123
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      Add an output mode parameter to `flatMapGroupsWithState` and just define `mapGroupsWithState` as `flatMapGroupsWithState(Update)` (an illustrative sketch follows the support table below).
      
      `UnsupportedOperationChecker` is modified to disallow unsupported cases.
      
      - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
      - For streaming (map/flatMap)GroupsWithState, see the following table:
      
      | Operators  | Supported Query Output Mode |
      | ------------- | ------------- |
      | flatMapGroupsWithState(Update) without aggregation  | Update |
      | flatMapGroupsWithState(Update) with aggregation  | None |
      | flatMapGroupsWithState(Append) without aggregation  | Append |
      | flatMapGroupsWithState(Append) before aggregation  | Append, Update, Complete |
      | flatMapGroupsWithState(Append) after aggregation  | None |
      | Multiple flatMapGroupsWithState(Append)s  | Append |
      | Multiple mapGroupsWithStates  | None |
      | Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
      | Other cases of multiple flatMapGroupsWithState | None |
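
      As referenced above, an illustrative sketch of passing the output mode; the state class name and the exact parameter order changed around this time, so treat the signature below as an assumption rather than the final API.

      ```scala
      import org.apache.spark.sql.streaming.{KeyedState, OutputMode}  // KeyedState was the state class name at this point

      case class Event(user: String, value: Long)

      // events is assumed to be a streaming Dataset[Event].
      val counts = events
        .groupByKey(_.user)
        .flatMapGroupsWithState(OutputMode.Update()) {
          // Keep a running count per user in the per-group state.
          (user: String, rows: Iterator[Event], state: KeyedState[Long]) =>
            val newCount = state.getOption.getOrElse(0L) + rows.size
            state.update(newCount)
            Iterator(user -> newCount)
        }
      ```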
      
      ## How was this patch tested?
      
      The added unit tests. Here are the tests related to (map/flatMap)GroupsWithState:
      ```
      [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond)
      [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
      [info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds)
      [info] - batch plan - flatMapGroupsWithState - multiple flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in update mode: supported (2 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in append mode: not supported (7 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation without aggregation in complete mode: not supported (5 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Append mode: not supported (11 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Update mode: not supported (5 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation with aggregation in Complete mode: not supported (5 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in append mode: supported (1 millisecond)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation without aggregation in update mode: not supported (6 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Append mode: supported (1 millisecond)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Update mode: supported (0 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation before aggregation in Complete mode: supported (1 millisecond)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Append mode: not supported (6 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on streaming relation after aggregation in Update mode: not supported (4 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation in complete mode: not supported (2 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Append output mode: supported (1 millisecond)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) on batch relation inside streaming relation in Update output mode: supported (1 millisecond)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Append output mode: supported (0 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) on batch relation inside streaming relation in Update output mode: supported (0 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are in append mode: supported (2 milliseconds)
      [info] - streaming plan - flatMapGroupsWithState -  multiple flatMapGroupsWithStates on s streaming relation but some are not in append mode: not supported (7 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in append mode: not supported (3 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation without aggregation in complete mode: not supported (3 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Append mode: not supported (6 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Update mode: not supported (3 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on streaming relation with aggregation in Complete mode: not supported (4 milliseconds)
      [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates on streaming relation and all are in append mode: not supported (4 milliseconds)
      [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates and flatMapGroupsWithStates on streaming relation: not supported (4 milliseconds)
      ```
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #17197 from zsxwing/mapgroups-check.
      1bf90123
    • Wojtek Szymanski's avatar
      [SPARK-19727][SQL] Fix for round function that modifies original column · e9e2c612
      Wojtek Szymanski authored
      ## What changes were proposed in this pull request?
      
      Fix for the SQL round function that modifies the original column when the underlying data frame is created from a local product.
      
          import org.apache.spark.sql.functions._
      
          case class NumericRow(value: BigDecimal)
      
          val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))
      
          df.show()
          +--------------------+
          |               value|
          +--------------------+
          |1.234567890000000000|
          +--------------------+
      
          df.withColumn("value_rounded", round('value)).show()
      
          // before
          +--------------------+-------------+
          |               value|value_rounded|
          +--------------------+-------------+
          |1.000000000000000000|            1|
          +--------------------+-------------+
      
          // after
          +--------------------+-------------+
          |               value|value_rounded|
          +--------------------+-------------+
          |1.234567890000000000|            1|
          +--------------------+-------------+
      
      ## How was this patch tested?
      
      New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite`
      
      Author: Wojtek Szymanski <wk.szymanski@gmail.com>
      
      Closes #17075 from wojtek-szymanski/SPARK-19727.
      e9e2c612
    • windpiger's avatar
      [SPARK-19864][SQL][TEST] provide a makeQualifiedPath functions to optimize some code · f3387d97
      windpiger authored
      ## What changes were proposed in this pull request?
      
      Currently there are lots of places that make a path qualified; it is better to provide a function to do this, so the code will be simpler.
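
      A minimal sketch of what such a helper could look like; the holder object, signature and return type are assumptions for illustration, not necessarily what the PR adds.

      ```scala
      import java.net.URI
      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.Path

      object PathUtils {
        // Qualify a raw path against the file system it resolves to,
        // e.g. "/path/xxx" becomes "file:/path/xxx" on a local file system.
        def makeQualifiedPath(path: String, hadoopConf: Configuration): URI = {
          val hadoopPath = new Path(path)
          val fs = hadoopPath.getFileSystem(hadoopConf)
          fs.makeQualified(hadoopPath).toUri
        }
      }
      ```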
      
      ## How was this patch tested?
      N/A
      
      Author: windpiger <songjun@outlook.com>
      
      Closes #17204 from windpiger/addQualifiledPathUtil.
      f3387d97
    • Tejas Patil's avatar
      [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper` · e420fd45
      Tejas Patil authored
      ## What changes were proposed in this pull request?
      
      This is as per suggestion by rxin at : https://github.com/apache/spark/pull/17184#discussion_r104841735
      
      ## How was this patch tested?
      
      NA as this is a documentation change
      
      Author: Tejas Patil <tejasp@fb.com>
      
      Closes #17205 from tejasapatil/SPARK-19843_followup.
      e420fd45
    • Xiao Li's avatar
      [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition · 9a6ac722
      Xiao Li authored
      ### What changes were proposed in this pull request?
      
      As observed by felixcheung in https://github.com/apache/spark/pull/16739, when users use the shuffle-enabled `repartition` API, they expect the number of partitions they get to be exactly the number they provided, even if they call the shuffle-disabled `coalesce` later.
      
      Currently, `CollapseRepartition` rule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result.
      
      ```Scala
          val df = spark.range(0, 10000, 1, 5)
          val df2 = df.repartition(10)
          assert(df2.coalesce(13).rdd.getNumPartitions == 5)
          assert(df2.coalesce(7).rdd.getNumPartitions == 5)
          assert(df2.coalesce(3).rdd.getNumPartitions == 3)
      ```
      
      This PR is to fix the issue. We preserve shuffle-enabled Repartition.
      
      ### How was this patch tested?
      Added a test case
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #16933 from gatorsmile/CollapseRepartition.
      9a6ac722
    • jiangxingbo's avatar
      [SPARK-19865][SQL] remove the view identifier in SubqueryAlias · 5f7d835d
      jiangxingbo authored
      ## What changes were proposed in this pull request?
      
      Since we have a `View` node now, we can remove the view identifier in `SubqueryAlias`, which was used to indicate a view node before.
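
      Roughly, the node shrinks as sketched below; the class names are prefixed with Old/New purely so both shapes compile side by side, and the field details are from memory rather than the exact diff.

      ```scala
      import org.apache.spark.sql.catalyst.TableIdentifier
      import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

      // Before (roughly): SubqueryAlias carried an optional view identifier.
      case class OldSubqueryAlias(alias: String, child: LogicalPlan, view: Option[TableIdentifier])

      // After (roughly): with a dedicated View node, the identifier is dropped.
      case class NewSubqueryAlias(alias: String, child: LogicalPlan)
      ```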
      
      ## How was this patch tested?
      
      Update the related test cases.
      
      Author: jiangxingbo <jiangxb1987@gmail.com>
      
      Closes #17210 from jiangxb1987/SubqueryAlias.
      5f7d835d
    • wangzhenhua's avatar
      [SPARK-17080][SQL] join reorder · e4427487
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      Reorder the joins using a dynamic programming algorithm (Selinger paper):
      First we put all items (basic joined nodes) into level 1, then we build all two-way joins at level 2 from plans at level 1 (single items), then build all 3-way joins from plans at previous levels (two-way joins and single items), then 4-way joins ... etc, until we build all n-way joins and pick the best plan among them.
      
      When building m-way joins, we only keep the best plan (with the lowest cost) for the same set of m items. E.g., for 3-way joins, we keep only the best plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, the plans maintained for each level when reordering four items A, B, C, D are as follows:
      ```
      level 1: p({A}), p({B}), p({C}), p({D})
      level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
      level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
      level 4: p({A, B, C, D})
      ```
      where p({A, B, C, D}) is the final output plan.
      
      For cost evaluation, since physical costs for operators are not available currently, we use cardinalities and sizes to compute costs.
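
      A condensed, standalone sketch of the level-by-level search described above; the plan and cost types are simplified stand-ins (the real rule works on logical plans, join conditions and catalog statistics), and for brevity it joins any two disjoint item sets rather than only connected ones.

      ```scala
      case class Plan(items: Set[String], cost: BigDecimal)

      // Stand-in cost model: children's costs plus a rough "size" estimate.
      def joinAndCost(left: Plan, right: Plan): Plan =
        Plan(left.items ++ right.items,
          left.cost + right.cost + BigDecimal(left.items.size * right.items.size))

      def reorder(items: Seq[String]): Plan = {
        val n = items.size
        // levels(k) maps each k-item set to the best (cheapest) plan found for it.
        var levels = Vector(Map.empty[Set[String], Plan]) // index 0 unused
        levels :+= items.map(i => Set(i) -> Plan(Set(i), BigDecimal(1))).toMap

        for (level <- 2 to n) {
          val best = scala.collection.mutable.Map.empty[Set[String], Plan]
          // Build level-k plans from pairs of plans at previous levels,
          // keeping only the cheapest plan per distinct item set.
          for (i <- 1 until level;
               left <- levels(i).values;
               right <- levels(level - i).values if (left.items & right.items).isEmpty) {
            val candidate = joinAndCost(left, right)
            if (best.get(candidate.items).forall(_.cost > candidate.cost)) {
              best(candidate.items) = candidate
            }
          }
          levels :+= best.toMap
        }
        levels(n)(items.toSet) // e.g. the best plan for {A, B, C, D}
      }
      ```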
      
      ## How was this patch tested?
      add test cases
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      Author: Zhenhua Wang <wzh_zju@163.com>
      
      Closes #17138 from wzhfy/joinReorder.
      e4427487
    • Anthony Truchet's avatar
      [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case of exception · 9ea201cf
      Anthony Truchet authored
      ## What changes were proposed in this pull request?
      
      Ensure broadcasted variables are destroyed even in case of an exception.
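
      The pattern being enforced looks roughly like the sketch below; the broadcast contents and helper are illustrative, while the actual change lives inside Word2Vec's training code.

      ```scala
      // Clean up the broadcast even if the work using it throws.
      val bcExpTable = sc.broadcast(expTable)   // illustrative broadcast
      try {
        trainWithTable(bcExpTable)              // hypothetical helper doing the real work
      } finally {
        // Destroyed whether or not training succeeded.
        bcExpTable.destroy()
      }
      ```
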
      ## How was this patch tested?
      
      Word2VecSuite was run locally
      
      Author: Anthony Truchet <a.truchet@criteo.com>
      
      Closes #14299 from AnthonyTruchet/SPARK-16440.
      9ea201cf
    • Yuming Wang's avatar
      [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted... · 3f9f9180
      Yuming Wang authored
      [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions
      
      ## What changes were proposed in this pull request?
      Make `SET mapreduce.job.reduces` automatically convert to `spark.sql.shuffle.partitions`; this is similar to `SET mapred.reduce.tasks`.
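
      For illustration, the expected behavior is roughly:

      ```scala
      // Setting the Hadoop-style property is mapped onto Spark's shuffle-partition setting.
      spark.sql("SET mapreduce.job.reduces=10")

      // Subsequent shuffles should use 10 partitions.
      println(spark.conf.get("spark.sql.shuffle.partitions"))   // expected: 10
      ```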
      
      ## How was this patch tested?
      
      unit tests
      
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #17020 from wangyum/SPARK-19693.
      3f9f9180
    • Yanbo Liang's avatar
      [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports tweedie distribution. · 81303f7c
      Yanbo Liang authored
      ## What changes were proposed in this pull request?
      PySpark ```GeneralizedLinearRegression``` supports tweedie distribution.
      
      ## How was this patch tested?
      Add unit tests.
      
      Author: Yanbo Liang <ybliang8@gmail.com>
      
      Closes #17146 from yanboliang/spark-19806.
      81303f7c
    • Yanbo Liang's avatar
      [ML][MINOR] Separate estimator and model params for read/write test. · 1fa58868
      Yanbo Liang authored
      ## What changes were proposed in this pull request?
      Since we allow ```Estimator``` and ```Model``` to not always share the same params (see ```ALSParams``` and ```ALSModelParams```), we should pass in test params for the estimator and the model separately in the function ```testEstimatorAndModelReadWrite```.
      
      ## How was this patch tested?
      Existing tests.
      
      Author: Yanbo Liang <ybliang8@gmail.com>
      
      Closes #17151 from yanboliang/test-rw.
      1fa58868