  1. Mar 30, 2017
    • Jacek Laskowski's avatar
      [DOCS] Docs-only improvements · 0197262a
      Jacek Laskowski authored
      …adoc
      
      ## What changes were proposed in this pull request?
      
      Use recommended values for row boundaries in Window's scaladoc, i.e. `Window.unboundedPreceding`, `Window.unboundedFollowing`, and `Window.currentRow` (that were introduced in 2.1.0).
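
      For reference, a minimal sketch of a window spec using these recommended values (the data and column names below are just for illustration):

      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.sum

      val spark = SparkSession.builder().master("local[*]").appName("window-boundaries").getOrCreate()
      import spark.implicits._

      val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("dept", "salary")

      // Running total per partition, bounded by the recommended constants
      // instead of raw Long values.
      val w = Window
        .partitionBy("dept")
        .orderBy("salary")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

      df.withColumn("running_total", sum("salary").over(w)).show()
      ```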
      
      ## How was this patch tested?
      
      Local build
      
      Author: Jacek Laskowski <jacek@japila.pl>
      
      Closes #17417 from jaceklaskowski/window-expression-scaladoc.
      0197262a
    • Shubham Chopra's avatar
      [SPARK-15354][CORE] Topology aware block replication strategies · b454d440
      Shubham Chopra authored
      ## What changes were proposed in this pull request?
      
      This adds implementations of resilient block replication strategies for different resource managers, mirroring the 3-replica strategy used by HDFS: the first replica is placed on an executor, the second within the same rack as that executor, and the third on a different rack.
      The implementation provides two pluggable classes: one runs in the driver and supplies topology information for every host at cluster start, and the other prioritizes a list of peer BlockManagerIds.

      The prioritization itself can be thought of as an optimization problem: find a minimal set of peers that satisfies certain objectives and replicate to those peers first. The objectives can express richer constraints over and above the HDFS-like 3-replica strategy.
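
      As a hedged sketch, such strategies would be wired in through configuration; the property and class names below are assumptions based on this change and should be verified against the released code:

      ```scala
      import org.apache.spark.SparkConf

      // Assumed property and class names -- verify against the Spark version in use.
      val conf = new SparkConf()
        // driver-side plugin: supplies topology info for every host at cluster start
        .set("spark.storage.replication.topologyMapper",
          "org.apache.spark.storage.FileBasedTopologyMapper")
        // executor-side plugin: prioritizes the list of peer BlockManagerIds
        .set("spark.storage.replication.policy",
          "org.apache.spark.storage.BasicBlockReplicationPolicy")
      ```
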
      ## How was this patch tested?
      
      This patch was tested with unit tests for storage, along with new unit tests to verify prioritization behaviour.
      
      Author: Shubham Chopra <schopra31@bloomberg.net>
      
      Closes #13932 from shubhamchopra/PrioritizerStrategy.
      b454d440
    • Yuming Wang's avatar
      [SPARK-20107][DOC] Add... · edc87d76
      Yuming Wang authored
      [SPARK-20107][DOC] Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to configuration.md
      
      ## What changes were proposed in this pull request?
      
      Add `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` option to `configuration.md`.
      Setting `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2` can speed up [HadoopMapReduceCommitProtocol.commitJob](https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L121) when there are many output files.
      
      Cloudera's Hadoop 2.6.0-cdh5.4.0 and later (see: https://github.com/cloudera/hadoop-common/commit/1c1236182304d4075276c00c4592358f428bc433 and https://github.com/cloudera/hadoop-common/commit/16b2de27321db7ce2395c08baccfdec5562017f0) and Apache Hadoop 2.7.0 and later support this improvement.
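
      For example, the option can be set when building a session (a minimal sketch; the app name is illustrative):

      ```scala
      import org.apache.spark.sql.SparkSession

      // The `spark.hadoop.` prefix forwards the setting to the underlying Hadoop configuration.
      val spark = SparkSession.builder()
        .appName("fast-commit-example")
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .getOrCreate()
      ```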
      
      For more details, see:
      
      1. [MAPREDUCE-4815](https://issues.apache.org/jira/browse/MAPREDUCE-4815): Speed up FileOutputCommitter#commitJob for many output files.
      2. [MAPREDUCE-6406](https://issues.apache.org/jira/browse/MAPREDUCE-6406): Update the default version for the property mapreduce.fileoutputcommitter.algorithm.version to 2.
      
      ## How was this patch tested?
      
      Manual test and existing tests.
      
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #17442 from wangyum/SPARK-20107.
      edc87d76
  2. Mar 29, 2017
    • wm624@hotmail.com's avatar
      [MINOR][SPARKR] Add run command comment in examples · 471de5db
      wm624@hotmail.com authored
      ## What changes were proposed in this pull request?
      
      Two examples in the `r` folder are missing the run commands.

      In this PR, I add the missing comments, consistent with the other examples.
      
      ## How was this patch tested?
      
      Manual test.
      
      Author: wm624@hotmail.com <wm624@hotmail.com>
      
      Closes #17474 from wangmiao1981/stat.
      471de5db
    • Eric Liang's avatar
      [SPARK-20148][SQL] Extend the file commit API to allow subscribing to task commit messages · 79636054
      Eric Liang authored
      ## What changes were proposed in this pull request?
      
      The internal FileCommitProtocol interface returns all task commit messages in bulk to the implementation when a job finishes. However, it is sometimes useful to access those messages before the job completes, so that the driver gets incremental progress updates before the job finishes.
      
      This adds an `onTaskCommit` listener to the internal api.
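
      A hedged sketch of how an implementation might use the new hook (the subclass and its behaviour are hypothetical, not part of this change):

      ```scala
      import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
      import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

      // Hypothetical protocol that surfaces incremental progress as tasks commit.
      class ProgressReportingCommitProtocol(jobId: String, path: String)
        extends HadoopMapReduceCommitProtocol(jobId, path) {

        override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
          // A real implementation could feed a driver-side progress tracker here.
          println(s"task commit received: $taskCommit")
        }
      }
      ```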
      
      ## How was this patch tested?
      
      Unit tests.
      
      cc rxin
      
      Author: Eric Liang <ekl@databricks.com>
      
      Closes #17475 from ericl/file-commit-api-ext.
      79636054
    • Reynold Xin's avatar
      [SPARK-20136][SQL] Add num files and metadata operation timing to scan operator metrics · 60977889
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      This patch adds explicit metadata operation timing and number of files in data source metrics. Those would be useful to include for performance profiling.
      
      Screenshot of a UI with this change (num files and metadata time are new metrics):
      
      <img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png">
      
      ## How was this patch tested?
      N/A
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #17465 from rxin/SPARK-20136.
      60977889
    • bomeng's avatar
      [SPARK-20146][SQL] fix comment missing issue for thrift server · 22f07fef
      bomeng authored
      ## What changes were proposed in this pull request?
      
      The column comment was missing while constructing the Hive TableSchema. This fix will preserve the original comment.
      
      ## How was this patch tested?
      
      I have added a new test case covering columns with and without comments.
      
      Author: bomeng <bmeng@us.ibm.com>
      
      Closes #17470 from bomeng/SPARK-20146.
      22f07fef
    • Takuya UESHIN's avatar
      [SPARK-19088][SQL] Fix 2.10 build. · dd2e7d52
      Takuya UESHIN authored
      ## What changes were proposed in this pull request?
      
      Commit 6c70a38c broke the build for Scala 2.10 because it uses reflection APIs that are not available in Scala 2.10. This PR fixes them.
      
      ## How was this patch tested?
      
      Existing tests.
      
      Author: Takuya UESHIN <ueshin@databricks.com>
      
      Closes #17473 from ueshin/issues/SPARK-19088.
      dd2e7d52
    • Yuming Wang's avatar
      [SPARK-20120][SQL] spark-sql support silent mode · fe1d6b05
      Yuming Wang authored
      ## What changes were proposed in this pull request?
      
      This adds a silent mode similar to Hive's, showing only the query result. See: [Hive LanguageManual+Cli](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli) and [the implementation of Hive silent mode](https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L948-L950).

      This PR sets the logger level to `WARN` to get a similar result.
      
      ## How was this patch tested?
      
      manual tests
      
      ![manual test spark sql silent mode](https://cloud.githubusercontent.com/assets/5399861/24390165/989b7780-13b9-11e7-8496-6e68f55757e3.gif)
      
      Author: Yuming Wang <wgyumg@gmail.com>
      
      Closes #17449 from wangyum/SPARK-20120.
      fe1d6b05
    • Xiao Li's avatar
      [SPARK-17075][SQL][FOLLOWUP] Add Estimation of Constant Literal · 5c8ef376
      Xiao Li authored
      ### What changes were proposed in this pull request?
      `FalseLiteral` and `TrueLiteral` should have been eliminated by optimizer rule `BooleanSimplification`, but null literals might be added by optimizer rule `NullPropagation`. For safety, our filter estimation should handle all the eligible literal cases.
      
      Our optimizer rule BooleanSimplification is unable to remove null literals in many cases, for example `a < 0 or null`. Thus, we need to handle null literals in filter estimation.

      `Not` can be pushed down below `And` and `Or`. Then we may see two consecutive `Not`s, which need to be collapsed into one. Because of the limited expression support in filter estimation, we only need to handle the `Not(null)` case to avoid incorrect errors from boolean operations on null. For details, see the matrix below.
      
      ```
      not NULL = NULL
      NULL or false = NULL
      NULL or true = true
      NULL or NULL = NULL
      NULL and false = false
      NULL and true = NULL
      NULL and NULL = NULL
      ```
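
      The same matrix, written out as an illustrative three-valued-logic sketch (not code from this change), with `None` modelling SQL NULL:

      ```scala
      type SqlBool = Option[Boolean]   // None models SQL NULL

      def not3(a: SqlBool): SqlBool = a.map(!_)

      def or3(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
        case (Some(true), _) | (_, Some(true)) => Some(true)
        case (Some(false), Some(false))        => Some(false)
        case _                                 => None
      }

      def and3(a: SqlBool, b: SqlBool): SqlBool = (a, b) match {
        case (Some(false), _) | (_, Some(false)) => Some(false)
        case (Some(true), Some(true))            => Some(true)
        case _                                   => None
      }

      assert(not3(None).isEmpty)                      // not NULL = NULL
      assert(or3(None, Some(true)).contains(true))    // NULL or true = true
      assert(and3(None, Some(false)).contains(false)) // NULL and false = false
      ```
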
      ### How was this patch tested?
      Added the test cases.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17446 from gatorsmile/constantFilterEstimation.
      5c8ef376
    • Takeshi Yamamuro's avatar
      [SPARK-20009][SQL] Support DDL strings for defining schema in functions.from_json · c4008480
      Takeshi Yamamuro authored
      ## What changes were proposed in this pull request?
      This PR adds `StructType.fromDDL` to convert a DDL-format string into a `StructType` for defining schemas in `functions.from_json`.
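
      A minimal sketch of the intended usage (column names and JSON input are illustrative):

      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.from_json
      import org.apache.spark.sql.types.StructType

      val spark = SparkSession.builder().master("local[*]").appName("from_json-ddl").getOrCreate()
      import spark.implicits._

      // DDL-format schema string converted via the new StructType.fromDDL helper.
      val schema = StructType.fromDDL("a INT, b STRING")

      Seq("""{"a": 1, "b": "x"}""").toDF("json")
        .select(from_json($"json", schema).as("parsed"))
        .show(false)
      ```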
      
      ## How was this patch tested?
      Added tests in `JsonFunctionsSuite`.
      
      Author: Takeshi Yamamuro <yamamuro@apache.org>
      
      Closes #17406 from maropu/SPARK-20009.
      c4008480
    • Kunal Khamar's avatar
      [SPARK-20048][SQL] Cloning SessionState does not clone query execution listeners · 142f6d14
      Kunal Khamar authored
      ## What changes were proposed in this pull request?
      
      Bugfix from [SPARK-19540](https://github.com/apache/spark/pull/16826).
      Cloning SessionState does not clone query execution listeners, so the cloned session is unable to listen to events on queries.
      
      ## How was this patch tested?
      
      - Unit test
      
      Author: Kunal Khamar <kkhamar@outlook.com>
      
      Closes #17379 from kunalkhamar/clone-bugfix.
      142f6d14
    • Holden Karau's avatar
      [SPARK-19955][PYSPARK] Jenkins Python Conda based test. · d6ddfdf6
      Holden Karau authored
      ## What changes were proposed in this pull request?
      
      Allow Jenkins Python tests to use the installed conda to test Python 2.7 support & test pip installability.
      
      ## How was this patch tested?
      
      Updated shell scripts, ran tests locally with installed conda, ran tests in Jenkins.
      
      Author: Holden Karau <holden@us.ibm.com>
      
      Closes #17355 from holdenk/SPARK-19955-support-python-tests-with-conda.
      d6ddfdf6
    • jerryshao's avatar
      [SPARK-20059][YARN] Use the correct classloader for HBaseCredentialProvider · c622a87c
      jerryshao authored
      ## What changes were proposed in this pull request?
      
      Currently we use the system classloader to find HBase jars; if they are specified via `--jars`, this fails with a ClassNotFound issue. So this changes to use the child classloader instead.

      This also puts the added jars and the main jar into the classpath of the submitted application in YARN cluster mode; otherwise HBase jars specified with `--jars` will never be honored in cluster mode, and fetching tokens on the client side will always fail.
      
      ## How was this patch tested?
      
      Unit test and local verification.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #17388 from jerryshao/SPARK-20059.
      c622a87c
    • Marcelo Vanzin's avatar
      [SPARK-19556][CORE] Do not encrypt block manager data in memory. · b56ad2b1
      Marcelo Vanzin authored
      This change modifies the way block data is encrypted to make the more
      common cases faster, while penalizing an edge case. As a side effect
      of the change, all data that goes through the block manager is now
      encrypted only when needed, including the previous path (broadcast
      variables) where that did not happen.
      
      The way the change works is by not encrypting data that is stored in
      memory; so if a serialized block is in memory, it will only be encrypted
      once it is evicted to disk.
      
      The penalty comes when transferring that encrypted data from disk. If the
      data ends up in memory again, it is as efficient as before; but if the
      evicted block needs to be transferred directly to a remote executor, then
      there's now a performance penalty, since the code now uses a custom
      FileRegion implementation to decrypt the data before transferring.
      
      This also means that block data transferred between executors now is
      not encrypted (and thus relies on the network library encryption support
      for secrecy). Shuffle blocks are still transferred in encrypted form,
      since they're handled in a slightly different way by the code. This also
      keeps compatibility with existing external shuffle services, which transfer
      encrypted shuffle blocks, and avoids having to make the external service
      aware of encryption at all.
      
      The serialization and deserialization APIs in the SerializerManager now
      do not do encryption automatically; callers need to explicitly wrap their
      streams with an appropriate crypto stream before using those.
      
      As a result of these changes, some of the workarounds added in SPARK-19520
      are removed here.
      
      Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
      way to run a test twice, with encryption on and off; broadcast, block manager
      and caching tests were modified to use this new trait so that the existing
      tests exercise both encrypted and non-encrypted paths. I also ran some
      applications with encryption turned on to verify that they still work,
      including streaming tests that failed without the fix for SPARK-19520.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #17295 from vanzin/SPARK-19556.
      b56ad2b1
    • Reynold Xin's avatar
      [SPARK-20134][SQL] SQLMetrics.postDriverMetricUpdates to simplify driver side metric updates · 9712bd39
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      It is not super intuitive how to update SQLMetric on the driver side. This patch introduces a new SQLMetrics.postDriverMetricUpdates function to do that, and adds documentation to make it more obvious.
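
      A hedged sketch of the intended usage from driver-side code (this is an internal API, and the helper below is hypothetical):

      ```scala
      import org.apache.spark.SparkContext
      import org.apache.spark.sql.execution.SQLExecution
      import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

      // Hypothetical helper: record a driver-side timing and publish it to the UI.
      def reportDriverTiming(sc: SparkContext, metric: SQLMetric, timeNs: Long): Unit = {
        metric.add(timeNs / 1000000)  // store milliseconds
        // The execution id is carried as a local property; it may be null outside an execution.
        val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
        SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(metric))
      }
      ```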
      
      ## How was this patch tested?
      Updated a test case to use this method.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #17464 from rxin/SPARK-20134.
      9712bd39
  3. Mar 28, 2017
    • Bago Amirbekian's avatar
      [SPARK-20040][ML][PYTHON] pyspark wrapper for ChiSquareTest · a5c87707
      Bago Amirbekian authored
      ## What changes were proposed in this pull request?
      
      A pyspark wrapper for spark.ml.stat.ChiSquareTest
      
      ## How was this patch tested?
      
      unit tests
      doctests
      
      Author: Bago Amirbekian <bago@databricks.com>
      
      Closes #17421 from MrBago/chiSquareTestWrapper.
      a5c87707
    • 颜发才(Yan Facai)'s avatar
      [SPARK-20043][ML] DecisionTreeModel: ImpurityCalculator builder fails for... · 7d432af8
      颜发才(Yan Facai) authored
      [SPARK-20043][ML] DecisionTreeModel: ImpurityCalculator builder fails for uppercase impurity type Gini
      
      Fix bug: DecisionTreeModel can't recognize impurity "Gini" when loading
      
      TODO:
      + [x] add unit test
      + [x] fix the bug
      
      Author: 颜发才(Yan Facai) <facai.yan@gmail.com>
      
      Closes #17407 from facaiy/BUG/decision_tree_loader_failer_with_Gini_impurity.
      7d432af8
    • liujianhui's avatar
      [SPARK-19868] conflict TasksetManager lead to spark stopped · 92e385e0
      liujianhui authored
      ## What changes were proposed in this pull request?
      
      We must set the taskset to zombie before the DAGScheduler handles the taskEnded event. It's possible the taskEnded event will cause the DAGScheduler to launch a new stage attempt (this happens when map output data was lost), and if this happens before the taskSet has been set to zombie, it will appear that we have conflicting task sets.
      
      Author: liujianhui <liujianhui@didichuxing>
      
      Closes #17208 from liujianhuiouc/spark-19868.
      92e385e0
    • Wenchen Fan's avatar
      [SPARK-20125][SQL] Dataset of type option of map does not work · d4fac410
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      When we build the deserializer expression for map type, we use `StaticInvoke` to call `ArrayBasedMapData.toScalaMap`, and declare the return type as `scala.collection.immutable.Map`. If the map is inside an Option, we wrap this `StaticInvoke` with `WrapOption`, which requires the input to be `scala.collection.Map`. Ideally this should be fine, as `scala.collection.immutable.Map` extends `scala.collection.Map`, but our `ObjectType` is too strict about this; this PR fixes it.
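
      An illustrative round trip of the pattern that used to fail (the case class name is hypothetical; the actual regression test may differ):

      ```scala
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().master("local[*]").appName("option-of-map").getOrCreate()
      import spark.implicits._

      case class MapHolder(m: Option[Map[String, Int]])

      // Previously failed because ObjectType rejected scala.collection.immutable.Map
      // where scala.collection.Map was expected.
      val ds = Seq(MapHolder(Some(Map("a" -> 1))), MapHolder(None)).toDS()
      ds.collect().foreach(println)
      ```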
      
      ## How was this patch tested?
      
      new regression test
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17454 from cloud-fan/map.
      d4fac410
    • jerryshao's avatar
      [SPARK-19995][YARN] Register tokens to current UGI to avoid re-issuing of... · 17eddb35
      jerryshao authored
      [SPARK-19995][YARN] Register tokens to current UGI to avoid re-issuing of tokens in yarn client mode
      
      ## What changes were proposed in this pull request?
      
      In the current Spark on YARN code, we obtain tokens from the provided services, but we do not add these tokens to the current user's credentials. This means all subsequent operations against these services still require a TGT rather than delegation tokens. This is unnecessary since we already have the tokens, and it also leads to failures in user-impersonation scenarios, because the TGT is granted to the real user, not the proxy user.

      So this changes to register all the tokens with the current UGI, so that subsequent operations against these services use the tokens rather than a TGT; this also handles the proxy-user issue mentioned above.
      
      ## How was this patch tested?
      
      Local verified in secure cluster.
      
      vanzin tgravescs mridulm  dongjoon-hyun please help to review, thanks a lot.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #17335 from jerryshao/SPARK-19995.
      17eddb35
    • Herman van Hovell's avatar
      [SPARK-20126][SQL] Remove HiveSessionState · f82461fc
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      Commit https://github.com/apache/spark/commit/ea361165e1ddce4d8aa0242ae3e878d7b39f1de2 moved most of the logic from the SessionState classes into an accompanying builder. This makes the existence of the `HiveSessionState` redundant. This PR removes the `HiveSessionState`.
      
      ## How was this patch tested?
      Existing tests.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17457 from hvanhovell/SPARK-20126.
      f82461fc
    • wangzhenhua's avatar
      [SPARK-20124][SQL] Join reorder should keep the same order of final project attributes · 4fcc214d
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      The join reorder algorithm should keep exactly the same order of output attributes in the top project.
      For example, if the user wants to select a, b, c, then after reordering we should output a, b, c in the same order as specified by the user, instead of b, a, c or some other order.
      
      ## How was this patch tested?
      
      A new test case is added in `JoinReorderSuite`.
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      
      Closes #17453 from wzhfy/keepOrderInProject.
      4fcc214d
    • wangzhenhua's avatar
      [SPARK-20094][SQL] Preventing push down of IN subquery to Join operator · 91559d27
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      TPC-DS q45 fails because:
      `ReorderJoin` collects all predicates and tries to put them into the join condition when creating an ordered join. If a predicate with an IN subquery (`ListQuery`) ends up in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` fails to convert the subquery to an `ExistenceJoin`, resulting in an error.

      We should prevent pushdown of IN subqueries to the Join operator.
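
      An illustrative (much simplified, not the actual q45) shape of such a query:

      ```scala
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().master("local[*]").appName("in-subquery").getOrCreate()
      import spark.implicits._

      Seq((1, 10)).toDF("id", "a").createOrReplaceTempView("t1")
      Seq((1, 20)).toDF("id", "b").createOrReplaceTempView("t2")
      Seq(10).toDF("a").createOrReplaceTempView("t3")

      // The IN (ListQuery) predicate must stay in the Filter and must not be pushed
      // into the join condition by ReorderJoin.
      spark.sql(
        """SELECT t1.a
          |FROM t1 JOIN t2 ON t1.id = t2.id
          |WHERE t1.a IN (SELECT a FROM t3)""".stripMargin).show()
      ```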
      
      ## How was this patch tested?
      
      Add a new test case in `FilterPushdownSuite`.
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      
      Closes #17428 from wzhfy/noSubqueryInJoinCond.
      91559d27
    • Xiao Li's avatar
      [SPARK-20119][TEST-MAVEN] Fix the test case fail in DataSourceScanExecRedactionSuite · a9abff28
      Xiao Li authored
      ### What changes were proposed in this pull request?
      Changed the pattern to match the first n characters in the location field so that the string truncation does not affect it.
      
      ### How was this patch tested?
      N/A
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17448 from gatorsmile/fixTestCAse.
      a9abff28
  4. Mar 27, 2017
    • Michal Senkyr's avatar
      [SPARK-19088][SQL] Optimize sequence type deserialization codegen · 6c70a38c
      Michal Senkyr authored
      ## What changes were proposed in this pull request?
      
      Optimization of arbitrary Scala sequence deserialization introduced by #16240.
      
      The previous implementation constructed an array which was then converted by `to`. This required two passes in most cases.
      
      This implementation attempts to remedy that by using `Builder`s provided by the `newBuilder` method on every Scala collection's companion object to build the resulting collection directly.
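
      The builder pattern the generated code now relies on, shown directly in Scala (collection and element types are just for illustration):

      ```scala
      // Build the target collection in a single pass, without an intermediate array.
      val builder = List.newBuilder[Int]
      builder.sizeHint(3)
      builder += 1
      builder += 2
      builder += 3
      val result: List[Int] = builder.result()
      ```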
      
      Example codegen for simple `List` (obtained using `Seq(List(1)).toDS().map(identity).queryExecution.debug.codegen`):
      
      Before:
      
      ```
      /* 001 */ public Object generate(Object[] references) {
      /* 002 */   return new GeneratedIterator(references);
      /* 003 */ }
      /* 004 */
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private scala.collection.Iterator inputadapter_input;
      /* 009 */   private boolean deserializetoobject_resultIsNull;
      /* 010 */   private java.lang.Object[] deserializetoobject_argValue;
      /* 011 */   private boolean MapObjects_loopIsNull1;
      /* 012 */   private int MapObjects_loopValue0;
      /* 013 */   private boolean deserializetoobject_resultIsNull1;
      /* 014 */   private scala.collection.generic.CanBuildFrom deserializetoobject_argValue1;
      /* 015 */   private UnsafeRow deserializetoobject_result;
      /* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
      /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
      /* 018 */   private scala.collection.immutable.List mapelements_argValue;
      /* 019 */   private UnsafeRow mapelements_result;
      /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
      /* 022 */   private scala.collection.immutable.List serializefromobject_argValue;
      /* 023 */   private UnsafeRow serializefromobject_result;
      /* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
      /* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
      /* 026 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
      /* 027 */
      /* 028 */   public GeneratedIterator(Object[] references) {
      /* 029 */     this.references = references;
      /* 030 */   }
      /* 031 */
      /* 032 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 033 */     partitionIndex = index;
      /* 034 */     this.inputs = inputs;
      /* 035 */     inputadapter_input = inputs[0];
      /* 036 */
      /* 037 */     deserializetoobject_result = new UnsafeRow(1);
      /* 038 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
      /* 039 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
      /* 040 */
      /* 041 */     mapelements_result = new UnsafeRow(1);
      /* 042 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
      /* 043 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
      /* 044 */
      /* 045 */     serializefromobject_result = new UnsafeRow(1);
      /* 046 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
      /* 047 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
      /* 048 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
      /* 049 */
      /* 050 */   }
      /* 051 */
      /* 052 */   protected void processNext() throws java.io.IOException {
      /* 053 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 054 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 055 */       ArrayData inputadapter_value = inputadapter_row.getArray(0);
      /* 056 */
      /* 057 */       deserializetoobject_resultIsNull = false;
      /* 058 */
      /* 059 */       if (!deserializetoobject_resultIsNull) {
      /* 060 */         ArrayData deserializetoobject_value3 = null;
      /* 061 */
      /* 062 */         if (!false) {
      /* 063 */           Integer[] deserializetoobject_convertedArray = null;
      /* 064 */           int deserializetoobject_dataLength = inputadapter_value.numElements();
      /* 065 */           deserializetoobject_convertedArray = new Integer[deserializetoobject_dataLength];
      /* 066 */
      /* 067 */           int deserializetoobject_loopIndex = 0;
      /* 068 */           while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
      /* 069 */             MapObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
      /* 070 */             MapObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
      /* 071 */
      /* 072 */             if (MapObjects_loopIsNull1) {
      /* 073 */               throw new RuntimeException(((java.lang.String) references[0]));
      /* 074 */             }
      /* 075 */             if (false) {
      /* 076 */               deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
      /* 077 */             } else {
      /* 078 */               deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue0;
      /* 079 */             }
      /* 080 */
      /* 081 */             deserializetoobject_loopIndex += 1;
      /* 082 */           }
      /* 083 */
      /* 084 */           deserializetoobject_value3 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray);
      /* 085 */         }
      /* 086 */         boolean deserializetoobject_isNull2 = true;
      /* 087 */         java.lang.Object[] deserializetoobject_value2 = null;
      /* 088 */         if (!false) {
      /* 089 */           deserializetoobject_isNull2 = false;
      /* 090 */           if (!deserializetoobject_isNull2) {
      /* 091 */             Object deserializetoobject_funcResult = null;
      /* 092 */             deserializetoobject_funcResult = deserializetoobject_value3.array();
      /* 093 */             if (deserializetoobject_funcResult == null) {
      /* 094 */               deserializetoobject_isNull2 = true;
      /* 095 */             } else {
      /* 096 */               deserializetoobject_value2 = (java.lang.Object[]) deserializetoobject_funcResult;
      /* 097 */             }
      /* 098 */
      /* 099 */           }
      /* 100 */           deserializetoobject_isNull2 = deserializetoobject_value2 == null;
      /* 101 */         }
      /* 102 */         deserializetoobject_resultIsNull = deserializetoobject_isNull2;
      /* 103 */         deserializetoobject_argValue = deserializetoobject_value2;
      /* 104 */       }
      /* 105 */
      /* 106 */       boolean deserializetoobject_isNull1 = deserializetoobject_resultIsNull;
      /* 107 */       final scala.collection.Seq deserializetoobject_value1 = deserializetoobject_resultIsNull ? null : scala.collection.mutable.WrappedArray.make(deserializetoobject_argValue);
      /* 108 */       deserializetoobject_isNull1 = deserializetoobject_value1 == null;
      /* 109 */       boolean deserializetoobject_isNull = true;
      /* 110 */       scala.collection.immutable.List deserializetoobject_value = null;
      /* 111 */       if (!deserializetoobject_isNull1) {
      /* 112 */         deserializetoobject_resultIsNull1 = false;
      /* 113 */
      /* 114 */         if (!deserializetoobject_resultIsNull1) {
      /* 115 */           boolean deserializetoobject_isNull6 = false;
      /* 116 */           final scala.collection.generic.CanBuildFrom deserializetoobject_value6 = false ? null : scala.collection.immutable.List.canBuildFrom();
      /* 117 */           deserializetoobject_isNull6 = deserializetoobject_value6 == null;
      /* 118 */           deserializetoobject_resultIsNull1 = deserializetoobject_isNull6;
      /* 119 */           deserializetoobject_argValue1 = deserializetoobject_value6;
      /* 120 */         }
      /* 121 */
      /* 122 */         deserializetoobject_isNull = deserializetoobject_resultIsNull1;
      /* 123 */         if (!deserializetoobject_isNull) {
      /* 124 */           Object deserializetoobject_funcResult1 = null;
      /* 125 */           deserializetoobject_funcResult1 = deserializetoobject_value1.to(deserializetoobject_argValue1);
      /* 126 */           if (deserializetoobject_funcResult1 == null) {
      /* 127 */             deserializetoobject_isNull = true;
      /* 128 */           } else {
      /* 129 */             deserializetoobject_value = (scala.collection.immutable.List) deserializetoobject_funcResult1;
      /* 130 */           }
      /* 131 */
      /* 132 */         }
      /* 133 */         deserializetoobject_isNull = deserializetoobject_value == null;
      /* 134 */       }
      /* 135 */
      /* 136 */       boolean mapelements_isNull = true;
      /* 137 */       scala.collection.immutable.List mapelements_value = null;
      /* 138 */       if (!false) {
      /* 139 */         mapelements_argValue = deserializetoobject_value;
      /* 140 */
      /* 141 */         mapelements_isNull = false;
      /* 142 */         if (!mapelements_isNull) {
      /* 143 */           Object mapelements_funcResult = null;
      /* 144 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
      /* 145 */           if (mapelements_funcResult == null) {
      /* 146 */             mapelements_isNull = true;
      /* 147 */           } else {
      /* 148 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
      /* 149 */           }
      /* 150 */
      /* 151 */         }
      /* 152 */         mapelements_isNull = mapelements_value == null;
      /* 153 */       }
      /* 154 */
      /* 155 */       if (mapelements_isNull) {
      /* 156 */         throw new RuntimeException(((java.lang.String) references[2]));
      /* 157 */       }
      /* 158 */       serializefromobject_argValue = mapelements_value;
      /* 159 */
      /* 160 */       final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
      /* 161 */       serializefromobject_holder.reset();
      /* 162 */
      /* 163 */       // Remember the current cursor so that we can calculate how many bytes are
      /* 164 */       // written later.
      /* 165 */       final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
      /* 166 */
      /* 167 */       if (serializefromobject_value instanceof UnsafeArrayData) {
      /* 168 */         final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
      /* 169 */         // grow the global buffer before writing data.
      /* 170 */         serializefromobject_holder.grow(serializefromobject_sizeInBytes);
      /* 171 */         ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
      /* 172 */         serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
      /* 173 */
      /* 174 */       } else {
      /* 175 */         final int serializefromobject_numElements = serializefromobject_value.numElements();
      /* 176 */         serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
      /* 177 */
      /* 178 */         for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
      /* 179 */           if (serializefromobject_value.isNullAt(serializefromobject_index)) {
      /* 180 */             serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
      /* 181 */           } else {
      /* 182 */             final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
      /* 183 */             serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
      /* 184 */           }
      /* 185 */         }
      /* 186 */       }
      /* 187 */
      /* 188 */       serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
      /* 189 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
      /* 190 */       append(serializefromobject_result);
      /* 191 */       if (shouldStop()) return;
      /* 192 */     }
      /* 193 */   }
      /* 194 */ }
      ```
      
      After:
      
      ```
      /* 001 */ public Object generate(Object[] references) {
      /* 002 */   return new GeneratedIterator(references);
      /* 003 */ }
      /* 004 */
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private scala.collection.Iterator inputadapter_input;
      /* 009 */   private boolean CollectObjects_loopIsNull1;
      /* 010 */   private int CollectObjects_loopValue0;
      /* 011 */   private UnsafeRow deserializetoobject_result;
      /* 012 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
      /* 013 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
      /* 014 */   private scala.collection.immutable.List mapelements_argValue;
      /* 015 */   private UnsafeRow mapelements_result;
      /* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
      /* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
      /* 018 */   private scala.collection.immutable.List serializefromobject_argValue;
      /* 019 */   private UnsafeRow serializefromobject_result;
      /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
      /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
      /* 023 */
      /* 024 */   public GeneratedIterator(Object[] references) {
      /* 025 */     this.references = references;
      /* 026 */   }
      /* 027 */
      /* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 029 */     partitionIndex = index;
      /* 030 */     this.inputs = inputs;
      /* 031 */     inputadapter_input = inputs[0];
      /* 032 */
      /* 033 */     deserializetoobject_result = new UnsafeRow(1);
      /* 034 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
      /* 035 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
      /* 036 */
      /* 037 */     mapelements_result = new UnsafeRow(1);
      /* 038 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
      /* 039 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
      /* 040 */
      /* 041 */     serializefromobject_result = new UnsafeRow(1);
      /* 042 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
      /* 043 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
      /* 044 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
      /* 045 */
      /* 046 */   }
      /* 047 */
      /* 048 */   protected void processNext() throws java.io.IOException {
      /* 049 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 050 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 051 */       ArrayData inputadapter_value = inputadapter_row.getArray(0);
      /* 052 */
      /* 053 */       scala.collection.immutable.List deserializetoobject_value = null;
      /* 054 */
      /* 055 */       if (!false) {
      /* 056 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
      /* 057 */         scala.collection.mutable.Builder CollectObjects_builderValue2 = scala.collection.immutable.List$.MODULE$.newBuilder();
      /* 058 */         CollectObjects_builderValue2.sizeHint(deserializetoobject_dataLength);
      /* 059 */
      /* 060 */         int deserializetoobject_loopIndex = 0;
      /* 061 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
      /* 062 */           CollectObjects_loopValue0 = (int) (inputadapter_value.getInt(deserializetoobject_loopIndex));
      /* 063 */           CollectObjects_loopIsNull1 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
      /* 064 */
      /* 065 */           if (CollectObjects_loopIsNull1) {
      /* 066 */             throw new RuntimeException(((java.lang.String) references[0]));
      /* 067 */           }
      /* 068 */           if (false) {
      /* 069 */             CollectObjects_builderValue2.$plus$eq(null);
      /* 070 */           } else {
      /* 071 */             CollectObjects_builderValue2.$plus$eq(CollectObjects_loopValue0);
      /* 072 */           }
      /* 073 */
      /* 074 */           deserializetoobject_loopIndex += 1;
      /* 075 */         }
      /* 076 */
      /* 077 */         deserializetoobject_value = (scala.collection.immutable.List) CollectObjects_builderValue2.result();
      /* 078 */       }
      /* 079 */
      /* 080 */       boolean mapelements_isNull = true;
      /* 081 */       scala.collection.immutable.List mapelements_value = null;
      /* 082 */       if (!false) {
      /* 083 */         mapelements_argValue = deserializetoobject_value;
      /* 084 */
      /* 085 */         mapelements_isNull = false;
      /* 086 */         if (!mapelements_isNull) {
      /* 087 */           Object mapelements_funcResult = null;
      /* 088 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
      /* 089 */           if (mapelements_funcResult == null) {
      /* 090 */             mapelements_isNull = true;
      /* 091 */           } else {
      /* 092 */             mapelements_value = (scala.collection.immutable.List) mapelements_funcResult;
      /* 093 */           }
      /* 094 */
      /* 095 */         }
      /* 096 */         mapelements_isNull = mapelements_value == null;
      /* 097 */       }
      /* 098 */
      /* 099 */       if (mapelements_isNull) {
      /* 100 */         throw new RuntimeException(((java.lang.String) references[2]));
      /* 101 */       }
      /* 102 */       serializefromobject_argValue = mapelements_value;
      /* 103 */
      /* 104 */       final ArrayData serializefromobject_value = false ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
      /* 105 */       serializefromobject_holder.reset();
      /* 106 */
      /* 107 */       // Remember the current cursor so that we can calculate how many bytes are
      /* 108 */       // written later.
      /* 109 */       final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
      /* 110 */
      /* 111 */       if (serializefromobject_value instanceof UnsafeArrayData) {
      /* 112 */         final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
      /* 113 */         // grow the global buffer before writing data.
      /* 114 */         serializefromobject_holder.grow(serializefromobject_sizeInBytes);
      /* 115 */         ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
      /* 116 */         serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
      /* 117 */
      /* 118 */       } else {
      /* 119 */         final int serializefromobject_numElements = serializefromobject_value.numElements();
      /* 120 */         serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
      /* 121 */
      /* 122 */         for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
      /* 123 */           if (serializefromobject_value.isNullAt(serializefromobject_index)) {
      /* 124 */             serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
      /* 125 */           } else {
      /* 126 */             final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
      /* 127 */             serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
      /* 128 */           }
      /* 129 */         }
      /* 130 */       }
      /* 131 */
      /* 132 */       serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
      /* 133 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
      /* 134 */       append(serializefromobject_result);
      /* 135 */       if (shouldStop()) return;
      /* 136 */     }
      /* 137 */   }
      /* 138 */ }
      ```
      
      Benchmark results before:
      
      ```
      OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH
      AMD A10-4600M APU with Radeon(tm) HD Graphics
      collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      Seq                                            269 /  370          0.0      269125.8       1.0X
      List                                           154 /  176          0.0      154453.5       1.7X
      mutable.Queue                                  210 /  233          0.0      209691.6       1.3X
      ```
      
      Benchmark results after:
      
      ```
      OpenJDK 64-Bit Server VM 1.8.0_112-b15 on Linux 4.8.13-1-ARCH
      AMD A10-4600M APU with Radeon(tm) HD Graphics
      collect:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      Seq                                            255 /  316          0.0      254697.3       1.0X
      List                                           152 /  177          0.0      152410.0       1.7X
      mutable.Queue                                  213 /  235          0.0      213470.0       1.2X
      ```
      
      ## How was this patch tested?
      
      ```bash
      ./build/mvn -DskipTests clean package && ./dev/run-tests
      ```
      
      Additionally in Spark Shell:
      
      ```scala
      case class QueueClass(q: scala.collection.immutable.Queue[Int])
      
      spark.createDataset(Seq(List(1,2,3))).map(x => QueueClass(scala.collection.immutable.Queue(x: _*))).map(_.q.dequeue).collect
      ```
      
      Author: Michal Senkyr <mike.senkyr@gmail.com>
      
      Closes #16541 from michalsenkyr/dataset-seq-builder.
      6c70a38c
    • Herman van Hovell's avatar
      [SPARK-20100][SQL] Refactor SessionState initialization · ea361165
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      The current SessionState initialization code path is quite complex. Part of the creation is done in the SessionState companion objects, part is done inside the SessionState class, and part is done by passing functions.

      This PR refactors this code path and consolidates SessionState initialization into a builder class. The SessionState itself will not do any initialization and just becomes a placeholder for the various Spark SQL internals. This also lays the groundwork for two future improvements:
      
      1. This provides us with a start for removing the `HiveSessionState`. Removing the `HiveSessionState` would also require us to move resource loading into a separate class, and to (re)move metadata hive.
      2. This makes it easier to customize the Spark Session. Currently you will need to create a custom version of the builder. I have added hooks to facilitate this. A future step will be to create a semi stable API on top of this.
      
      ## How was this patch tested?
      Existing tests.
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17433 from hvanhovell/SPARK-20100.
      ea361165
    • Tathagata Das's avatar
      [SPARK-19876][SS] Follow up: Refactored BatchCommitLog to simplify logic · 8a6f33f0
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      
      Existing logic seemingly writes null to the BatchCommitLog, even though it does additional checks to write '{}' (valid json) to the log. This PR simplifies the logic by disallowing use of `log.add(batchId, metadata)` and instead using `log.add(batchId)`. Since there is no metadata to specify, there is no confusion related to null.
      
      ## How was this patch tested?
      Existing tests pass.
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #17444 from tdas/SPARK-19876-1.
      8a6f33f0
    • Shubham Chopra's avatar
      [SPARK-19803][CORE][TEST] Proactive replication test failures · a250933c
      Shubham Chopra authored
      ## What changes were proposed in this pull request?
      Executors cache a list of their peers that is refreshed by default every minute. The cached stale references were randomly being used for replication. Since those executors were removed from the master, they did not occur in the block locations as reported by the master. This was fixed by
      1. Refreshing the peer cache in the block manager before trying to proactively replicate. This eliminates the chance of replicating to a failed executor.
      2. Explicitly stopping the block manager in the tests. This shuts down the RPC endpoint used by the block manager. This way, even if a block manager tries to replicate using a stale reference, the replication logic should take care of refreshing the list of peers after failure.
      
      ## How was this patch tested?
      Tested manually
      
      Author: Shubham Chopra <schopra31@bloomberg.net>
      Author: Kay Ousterhout <kayousterhout@gmail.com>
      Author: Shubham Chopra <shubhamchopra@users.noreply.github.com>
      
      Closes #17325 from shubhamchopra/SPARK-19803.
      a250933c
    • Yanbo Liang's avatar
      [MINOR][SPARKR] Move 'Data type mapping between R and Spark' to right place in SparkR doc. · 1d00761b
      Yanbo Liang authored
      The section `Data type mapping between R and Spark` is currently in the wrong place in the SparkR doc; we should move it to a separate section.
      
      ## What changes were proposed in this pull request?
      Before this PR:
      ![image](https://cloud.githubusercontent.com/assets/1962026/24340911/bc01a532-126a-11e7-9a08-0d60d13a547c.png)
      
      After this PR:
      ![image](https://cloud.githubusercontent.com/assets/1962026/24340938/d9d32a9a-126a-11e7-8891-d2f5b46e0c71.png)
      
      Author: Yanbo Liang <ybliang8@gmail.com>
      
      Closes #17440 from yanboliang/sparkr-doc.
      1d00761b
    • hyukjinkwon's avatar
      [SPARK-20105][TESTS][R] Add tests for checkType and type string in structField in R · 3fada2f5
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      It seems `checkType` and the type string in `structField` are not being tested closely. This string format currently seems SparkR-specific (see https://github.com/apache/spark/blob/d1f6c64c4b763c05d6d79ae5497f298dc3835f3e/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L93-L131) but resembles a SQL type definition.

      Therefore, it seems nicer if we test positive/negative cases on the R side.
      
      ## How was this patch tested?
      
      Unit tests in `test_sparkSQL.R`.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17439 from HyukjinKwon/r-typestring-tests.
      3fada2f5
    • Josh Rosen's avatar
      [SPARK-20102] Fix nightly packaging and RC packaging scripts w/ two minor build fixes · 314cf51d
      Josh Rosen authored
      ## What changes were proposed in this pull request?
      
      The master snapshot publisher builds are currently broken due to two minor build issues:
      
      1. For unknown reasons, the LFTP `mkdir -p` command began throwing errors when the remote directory already exists. This change of behavior might have been caused by configuration changes in the ASF's SFTP server, but I'm not entirely sure of that. To work around this problem, this patch updates the script to ignore errors from the `lftp mkdir -p` commands.
      2. The PySpark `setup.py` file references a non-existent `pyspark.ml.stat` module, causing Python packaging to fail by complaining about a missing directory. The fix is to simply drop that line from the setup script.
      
      ## How was this patch tested?
      
      The LFTP fix was tested by manually running the failing commands on AMPLab Jenkins against the ASF SFTP server. The PySpark fix was tested locally.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #17437 from JoshRosen/spark-20102.
      314cf51d
    • Hossein's avatar
      [SPARK-20088] Do not create new SparkContext in SparkR createSparkContext · 0588dc7c
      Hossein authored
      ## What changes were proposed in this pull request?
      Instead of creating a new `JavaSparkContext`, we use `SparkContext.getOrCreate`.
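
      A minimal illustration of the difference (the app name is illustrative):

      ```scala
      import org.apache.spark.{SparkConf, SparkContext}

      val conf = new SparkConf().setAppName("sparkr-backend-example").setMaster("local[*]")

      // Returns the already-running SparkContext if there is one, instead of failing
      // because only one SparkContext may be running; otherwise it creates a new one.
      val sc = SparkContext.getOrCreate(conf)
      ```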
      
      ## How was this patch tested?
      Existing tests
      
      Author: Hossein <hossein@databricks.com>
      
      Closes #17423 from falaki/SPARK-20088.
      0588dc7c
    • wangzhenhua's avatar
      [SPARK-20104][SQL] Don't estimate IsNull or IsNotNull predicates for non-leaf node · 89049345
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      In current stage, we don't have advanced statistics such as sketches or histograms. As a result, some operator can't estimate `nullCount` accurately. E.g. left outer join estimation does not accurately update `nullCount` currently. So for `IsNull` and `IsNotNull` predicates, we only estimate them when the child is a leaf node, whose `nullCount` is accurate.
      
      ## How was this patch tested?
      
      A new test case is added in `FilterEstimationSuite`.
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      
      Closes #17438 from wzhfy/nullEstimation.
      89049345
  5. Mar 26, 2017
    • hyukjinkwon's avatar
      [MINOR][DOCS] Match several documentation changes in Scala to R/Python · 3fbf0a5f
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This PR proposes to match minor documentations changes in https://github.com/apache/spark/pull/17399 and https://github.com/apache/spark/pull/17380 to R/Python.
      
      ## How was this patch tested?
      
      Manual tests in Python , Python tests via `./python/run-tests.py --module=pyspark-sql` and lint-checks for Python/R.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17429 from HyukjinKwon/minor-match-doc.
      3fbf0a5f
    • zero323's avatar
      [SPARK-19281][PYTHON][ML] spark.ml Python API for FPGrowth · 0bc8847a
      zero323 authored
      ## What changes were proposed in this pull request?
      
      - Add `HasSupport` and `HasConfidence` `Params`.
      - Add new module `pyspark.ml.fpm`.
      - Add `FPGrowth` / `FPGrowthModel` wrappers.
      - Provide tests for new features.
      
      ## How was this patch tested?
      
      Unit tests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17218 from zero323/SPARK-19281.
      0bc8847a
    • Herman van Hovell's avatar
      [SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows · 617ab644
      Herman van Hovell authored
      ## What changes were proposed in this pull request?
      The `CollapseWindow` rule is currently too aggressive when collapsing adjacent windows. It also collapses windows in which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime.
      
      This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule.
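
      An illustrative pair of dependent adjacent windows that must not be collapsed (data and column names are made up):

      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.sum

      val spark = SparkSession.builder().master("local[*]").appName("collapse-window").getOrCreate()
      import spark.implicits._

      val w = Window.partitionBy("k").orderBy("v")
      val df = Seq(("a", 1), ("a", 2)).toDF("k", "v")

      // The second window consumes "s", which the first window produces,
      // so the two adjacent Window operators cannot be merged into one.
      df.withColumn("s", sum($"v").over(w))
        .withColumn("s2", sum($"s").over(w))
        .explain()
      ```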
      
      ## How was this patch tested?
      Added a new test case to `CollapseWindowSuite`
      
      Author: Herman van Hovell <hvanhovell@databricks.com>
      
      Closes #17432 from hvanhovell/SPARK-20086.
      617ab644
    • Juan Rodriguez Hortala's avatar
      logging improvements · 362ee932
      Juan Rodriguez Hortala authored
      ## What changes were proposed in this pull request?
      Adding additional information to existing logging messages:
        - YarnAllocator: log the executor ID together with the container id when a container for an executor is launched.
        - NettyRpcEnv: log the receiver address when there is a timeout waiting for an answer to a remote call.
        - ExecutorAllocationManager: fix a typo in the logging message for the list of executors to be removed.
      
      ## How was this patch tested?
      Built Spark and submitted the word count example to a YARN cluster using cluster mode.
      
      Author: Juan Rodriguez Hortala <hortala@amazon.com>
      
      Closes #17411 from juanrh/logging-improvements.
      362ee932
    • Kazuaki Ishizaki's avatar
      [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding... · 93bb0b91
      Kazuaki Ishizaki authored
      [SPARK-20046][SQL] Facilitate loop optimizations in a JIT compiler regarding sqlContext.read.parquet()
      
      ## What changes were proposed in this pull request?
      
      This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).
      
      This PR changes the generated code in the following two ways:
      1. Replace a while-loop over long instance variables with a for-loop over int local variables.
      2. Suppress generation of the `shouldStop()` method when it is unnecessary (e.g. when `append()` is not generated).

      These changes facilitate loop optimizations in a JIT compiler by feeding it simpler Java code. The performance of `sqlContext.read.parquet().count` is improved by 1.09x.
      
      Benchmark program (Scala; `sparkSession` and Spark's internal `Benchmark` utility are assumed to be in scope):
      ```scala
      import scala.concurrent.duration._  // for the 30.seconds warmup time

      val dir = "/dev/shm/parquet"
      val N = 1000 * 1000 * 40
      val iters = 20
      val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
      // Write N rows once, then repeatedly read and count them.
      sparkSession.range(N).write.mode("overwrite").parquet(dir)

      benchmark.addCase("count") { i: Int =>
        var n = 0
        var len = 0L
        while (n < iters) {
          len += sparkSession.read.parquet(dir).count
          n += 1
        }
      }
      benchmark.run
      ```
      
      Performance result without this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      w/o this PR                                   1152 / 1211        694.7           1.4       1.0X
      ```
      
      Performance result with this PR
      ```
      OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
      Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
      Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      with this PR                                  1053 / 1121        760.0           1.3       1.0X
      ```
      
      Here is a comparison between the generated code without and with this PR. Only the method `agg_doAggregateWithoutKey` is changed.
      
      Generated code without this PR
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private scala.collection.Iterator scan_input;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
      /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
      /* 014 */   private long scan_scanTime1;
      /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
      /* 016 */   private int scan_batchIdx;
      /* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 019 */   private UnsafeRow agg_result;
      /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 022 */
      /* 023 */   public GeneratedIterator(Object[] references) {
      /* 024 */     this.references = references;
      /* 025 */   }
      /* 026 */
      /* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 028 */     partitionIndex = index;
      /* 029 */     this.inputs = inputs;
      /* 030 */     agg_initAgg = false;
      /* 031 */
      /* 032 */     scan_input = inputs[0];
      /* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 035 */     scan_scanTime1 = 0;
      /* 036 */     scan_batch = null;
      /* 037 */     scan_batchIdx = 0;
      /* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 040 */     agg_result = new UnsafeRow(1);
      /* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 043 */
      /* 044 */   }
      /* 045 */
      /* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 047 */     // initialize aggregation buffer
      /* 048 */     agg_bufIsNull = false;
      /* 049 */     agg_bufValue = 0L;
      /* 050 */
      /* 051 */     if (scan_batch == null) {
      /* 052 */       scan_nextBatch();
      /* 053 */     }
      /* 054 */     while (scan_batch != null) {
      /* 055 */       int numRows = scan_batch.numRows();
      /* 056 */       while (scan_batchIdx < numRows) {
      /* 057 */         int scan_rowIdx = scan_batchIdx++;
      /* 058 */         // do aggregate
      /* 059 */         // common sub-expressions
      /* 060 */
      /* 061 */         // evaluate aggregate function
      /* 062 */         boolean agg_isNull1 = false;
      /* 063 */
      /* 064 */         long agg_value1 = -1L;
      /* 065 */         agg_value1 = agg_bufValue + 1L;
      /* 066 */         // update aggregation buffer
      /* 067 */         agg_bufIsNull = false;
      /* 068 */         agg_bufValue = agg_value1;
      /* 069 */         if (shouldStop()) return;
      /* 070 */       }
      /* 071 */       scan_batch = null;
      /* 072 */       scan_nextBatch();
      /* 073 */     }
      /* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
      /* 075 */     scan_scanTime1 = 0;
      /* 076 */
      /* 077 */   }
      /* 078 */
      /* 079 */   private void scan_nextBatch() throws java.io.IOException {
      /* 080 */     long getBatchStart = System.nanoTime();
      /* 081 */     if (scan_input.hasNext()) {
      /* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
      /* 083 */       scan_numOutputRows.add(scan_batch.numRows());
      /* 084 */       scan_batchIdx = 0;
      /* 085 */
      /* 086 */     }
      /* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
      /* 088 */   }
      /* 089 */
      /* 090 */   protected void processNext() throws java.io.IOException {
      /* 091 */     while (!agg_initAgg) {
      /* 092 */       agg_initAgg = true;
      /* 093 */       long agg_beforeAgg = System.nanoTime();
      /* 094 */       agg_doAggregateWithoutKey();
      /* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 096 */
      /* 097 */       // output the result
      /* 098 */
      /* 099 */       agg_numOutputRows.add(1);
      /* 100 */       agg_rowWriter.zeroOutNullBytes();
      /* 101 */
      /* 102 */       if (agg_bufIsNull) {
      /* 103 */         agg_rowWriter.setNullAt(0);
      /* 104 */       } else {
      /* 105 */         agg_rowWriter.write(0, agg_bufValue);
      /* 106 */       }
      /* 107 */       append(agg_result);
      /* 108 */     }
      /* 109 */   }
      /* 110 */ }
      ```
      
      Generated code with this PR
      ```java
      /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
      /* 006 */   private Object[] references;
      /* 007 */   private scala.collection.Iterator[] inputs;
      /* 008 */   private boolean agg_initAgg;
      /* 009 */   private boolean agg_bufIsNull;
      /* 010 */   private long agg_bufValue;
      /* 011 */   private scala.collection.Iterator scan_input;
      /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
      /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
      /* 014 */   private long scan_scanTime1;
      /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
      /* 016 */   private int scan_batchIdx;
      /* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
      /* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
      /* 019 */   private UnsafeRow agg_result;
      /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
      /* 022 */
      /* 023 */   public GeneratedIterator(Object[] references) {
      /* 024 */     this.references = references;
      /* 025 */   }
      /* 026 */
      /* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
      /* 028 */     partitionIndex = index;
      /* 029 */     this.inputs = inputs;
      /* 030 */     agg_initAgg = false;
      /* 031 */
      /* 032 */     scan_input = inputs[0];
      /* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
      /* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
      /* 035 */     scan_scanTime1 = 0;
      /* 036 */     scan_batch = null;
      /* 037 */     scan_batchIdx = 0;
      /* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
      /* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
      /* 040 */     agg_result = new UnsafeRow(1);
      /* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
      /* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      /* 043 */
      /* 044 */   }
      /* 045 */
      /* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
      /* 047 */     // initialize aggregation buffer
      /* 048 */     agg_bufIsNull = false;
      /* 049 */     agg_bufValue = 0L;
      /* 050 */
      /* 051 */     if (scan_batch == null) {
      /* 052 */       scan_nextBatch();
      /* 053 */     }
      /* 054 */     while (scan_batch != null) {
      /* 055 */       int numRows = scan_batch.numRows();
      /* 056 */       int scan_localEnd = numRows - scan_batchIdx;
      /* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
      /* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
      /* 059 */         // do aggregate
      /* 060 */         // common sub-expressions
      /* 061 */
      /* 062 */         // evaluate aggregate function
      /* 063 */         boolean agg_isNull1 = false;
      /* 064 */
      /* 065 */         long agg_value1 = -1L;
      /* 066 */         agg_value1 = agg_bufValue + 1L;
      /* 067 */         // update aggregation buffer
      /* 068 */         agg_bufIsNull = false;
      /* 069 */         agg_bufValue = agg_value1;
      /* 070 */         // shouldStop check is eliminated
      /* 071 */       }
      /* 072 */       scan_batchIdx = numRows;
      /* 073 */       scan_batch = null;
      /* 074 */       scan_nextBatch();
      /* 075 */     }
      /* 079 */   }
      /* 080 */
      /* 081 */   private void scan_nextBatch() throws java.io.IOException {
      /* 082 */     long getBatchStart = System.nanoTime();
      /* 083 */     if (scan_input.hasNext()) {
      /* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
      /* 085 */       scan_numOutputRows.add(scan_batch.numRows());
      /* 086 */       scan_batchIdx = 0;
      /* 087 */
      /* 088 */     }
      /* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
      /* 090 */   }
      /* 091 */
      /* 092 */   protected void processNext() throws java.io.IOException {
      /* 093 */     while (!agg_initAgg) {
      /* 094 */       agg_initAgg = true;
      /* 095 */       long agg_beforeAgg = System.nanoTime();
      /* 096 */       agg_doAggregateWithoutKey();
      /* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
      /* 098 */
      /* 099 */       // output the result
      /* 100 */
      /* 101 */       agg_numOutputRows.add(1);
      /* 102 */       agg_rowWriter.zeroOutNullBytes();
      /* 103 */
      /* 104 */       if (agg_bufIsNull) {
      /* 105 */         agg_rowWriter.setNullAt(0);
      /* 106 */       } else {
      /* 107 */         agg_rowWriter.write(0, agg_bufValue);
      /* 108 */       }
      /* 109 */       append(agg_result);
      /* 110 */     }
      /* 111 */   }
      /* 112 */ }
      ```
      
      ## How was this patch tested?
      
      Ran the existing test suites.
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #17378 from kiszk/SPARK-20046.
      93bb0b91
    • hyukjinkwon's avatar
      [SPARK-20092][R][PROJECT INFRA] Add the detection for Scala codes dedicated for R in AppVeyor tests · 2422c86f
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      We currently detect changes only in the `R/` directory and then trigger the AppVeyor tests.

      It seems we also need to run the tests when there are changes to the Scala code dedicated to R in `core/src/main/scala/org/apache/spark/api/r/`, `sql/core/src/main/scala/org/apache/spark/sql/api/r/` and `mllib/src/main/scala/org/apache/spark/ml/r/`.

      This will enable the tests, for example, for SPARK-20088.
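
      A hypothetical sketch of the path-prefix detection described above (plain Python, not the actual AppVeyor configuration or scripts; the function name is made up):

      ```python
      # Directories whose changes should trigger the R tests on AppVeyor.
      R_RELATED_PREFIXES = (
          "R/",
          "core/src/main/scala/org/apache/spark/api/r/",
          "sql/core/src/main/scala/org/apache/spark/sql/api/r/",
          "mllib/src/main/scala/org/apache/spark/ml/r/",
      )

      def should_run_r_tests(changed_files):
          """Return True when any changed path falls under an R-related prefix."""
          return any(path.startswith(R_RELATED_PREFIXES) for path in changed_files)

      print(should_run_r_tests(["core/src/main/scala/org/apache/spark/api/r/SerDe.scala"]))  # True
      print(should_run_r_tests(["README.md"]))                                               # False
      ```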
      
      ## How was this patch tested?
      
      Tests with manually created PRs.
      
      - Changes in `sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala` https://github.com/spark-test/spark/pull/13
      - Changes in `core/src/main/scala/org/apache/spark/api/r/SerDe.scala` https://github.com/spark-test/spark/pull/12
      - Changes in `README.md` https://github.com/spark-test/spark/pull/14
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17427 from HyukjinKwon/SPARK-20092.
      2422c86f