Skip to content
Snippets Groups Projects
  1. Nov 02, 2015
    • Xiangrui Meng's avatar
      [SPARK-11358][MLLIB] deprecate runs in k-means · 33ae7a35
      Xiangrui Meng authored
      This PR deprecates `runs` in k-means. `runs` introduces extra complexity and overhead in MLlib's k-means implementation. I haven't seen much usage with `runs` not equal to `1`. We don't have a unit test for it either. We can deprecate this method in 1.6, and void it in 1.7. It helps us simplify the implementation.
      
      cc: srowen
      
      Author: Xiangrui Meng <meng@databricks.com>
      
      Closes #9322 from mengxr/SPARK-11358.
      33ae7a35
    • Sean Owen's avatar
      [SPARK-11456][TESTS] Remove deprecated junit.framework in Java tests · b3aedca6
      Sean Owen authored
      Replace use of `junit.framework` with `org.junit`, and touch up tests in question
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #9411 from srowen/SPARK-11456.
      b3aedca6
    • Jason White's avatar
      [SPARK-11437] [PYSPARK] Don't .take when converting RDD to DataFrame with provided schema · f92f334c
      Jason White authored
      When creating a DataFrame from an RDD in PySpark, `createDataFrame` calls `.take(10)` to verify the first 10 rows of the RDD match the provided schema. Similar to https://issues.apache.org/jira/browse/SPARK-8070, but that issue affected cases where a schema was not provided.
      
      Verifying the first 10 rows is of limited utility and causes the DAG to be executed non-lazily. If necessary, I believe this verification should be done lazily on all rows. However, since the caller is providing a schema to follow, I think it's acceptable to simply fail if the schema is incorrect.
      
      marmbrus We chatted about this at SparkSummitEU. davies you made a similar change for the infer-schema path in https://github.com/apache/spark/pull/6606
      
      Author: Jason White <jason.white@shopify.com>
      
      Closes #9392 from JasonMWhite/createDataFrame_without_take.
      f92f334c
    • Marcelo Vanzin's avatar
      [SPARK-10997][CORE] Add "client mode" to netty rpc env. · 71d1c907
      Marcelo Vanzin authored
      "Client mode" means the RPC env will not listen for incoming connections.
      This allows certain processes in the Spark stack (such as Executors or
      tha YARN client-mode AM) to act as pure clients when using the netty-based
      RPC backend, reducing the number of sockets needed by the app and also the
      number of open ports.
      
      Client connections are also preferred when endpoints that actually have
      a listening socket are involved; so, for example, if a Worker connects
      to a Master and the Master needs to send a message to a Worker endpoint,
      that client connection will be used, even though the Worker is also
      listening for incoming connections.
      
      With this change, the workaround for SPARK-10987 isn't necessary anymore, and
      is removed. The AM connects to the driver in "client mode", and that connection
      is used for all driver <-> AM communication, and so the AM is properly notified
      when the connection goes down.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #9210 from vanzin/SPARK-10997.
      71d1c907
    • jerryshao's avatar
      [SPARK-9817][YARN] Improve the locality calculation of containers by taking... · a930e624
      jerryshao authored
      [SPARK-9817][YARN] Improve the locality calculation of containers by taking pending container requests into consideraion
      
      This is a follow-up PR to further improve the locality calculation by considering the pending container's request. Since the locality preferences of tasks may be shifted from time to time, current localities of pending container requests may not fully match the new preferences, this PR improve it by removing outdated, unmatched container requests and replace with new requests.
      
      sryza please help to review, thanks a lot.
      
      Author: jerryshao <sshao@hortonworks.com>
      
      Closes #8100 from jerryshao/SPARK-9817.
      a930e624
    • Daoyuan Wang's avatar
      [SPARK-11311][SQL] spark cannot describe temporary functions · 74ba9522
      Daoyuan Wang authored
      When describe temporary function, spark would return 'Unable to find function', this is not right.
      
      Author: Daoyuan Wang <daoyuan.wang@intel.com>
      
      Closes #9277 from adrian-wang/functionreg.
      74ba9522
    • huangzhaowei's avatar
      [SPARK-10786][SQL] Take the whole statement to generate the CommandProcessor · 767522dc
      huangzhaowei authored
      In the now implementation of `SparkSQLCLIDriver.scala`:
      `val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), hconf)`
      `CommandProcessorFactory` only take the first token of the statement, and this will be hard to diff the statement `delete jar xxx` and `delete from xxx`.
      So maybe it's better to take the whole statement into the `CommandProcessorFactory`.
      
      And in [HiveCommand](https://github.com/SaintBacchus/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java#L76), it already special handing these two statement.
      ```java
      if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
        //special handling for SQL "delete from <table> where..."
        return null;
      }
      ```
      
      Author: huangzhaowei <carlmartinmax@gmail.com>
      
      Closes #8895 from SaintBacchus/SPARK-10786.
      767522dc
    • Yongjia Wang's avatar
      [SPARK-11413][BUILD] Bump joda-time version to 2.9 for java 8 and s3 · ea4a3e7d
      Yongjia Wang authored
      It's a known issue that joda-time before 2.8.1 is incompatible with java 1.8u60 or later, which causes s3 request to fail. This affects Spark when using s3 as data source.
      https://github.com/aws/aws-sdk-java/issues/444
      
      Author: Yongjia Wang <yongjiaw@gmail.com>
      
      Closes #9379 from yongjiaw/SPARK-11413.
      ea4a3e7d
    • Liang-Chi Hsieh's avatar
      [SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage · e209fa27
      Liang-Chi Hsieh authored
      JIRA: https://issues.apache.org/jira/browse/SPARK-11271
      
      As reported in the JIRA ticket, when there are too many tasks, the memory usage of MapStatus will cause problem. Use BitSet instead of RoaringBitMap should be more efficient in memory usage.
      
      Author: Liang-Chi Hsieh <viirya@appier.com>
      
      Closes #9243 from viirya/mapstatus-bitset.
      e209fa27
    • Yu ISHIKAWA's avatar
      [SPARK-9722] [ML] Pass random seed to spark.ml DecisionTree* · e963070c
      Yu ISHIKAWA authored
      Author: Yu ISHIKAWA <yuu.ishikawa@gmail.com>
      
      Closes #9402 from yu-iskw/SPARK-9722.
      e963070c
  2. Nov 01, 2015
    • Liang-Chi Hsieh's avatar
      [SPARK-9298][SQL] Add pearson correlation aggregation function · 3e770a64
      Liang-Chi Hsieh authored
      JIRA: https://issues.apache.org/jira/browse/SPARK-9298
      
      This patch adds pearson correlation aggregation function based on `AggregateExpression2`.
      
      Author: Liang-Chi Hsieh <viirya@appier.com>
      
      Closes #8587 from viirya/corr_aggregation.
      3e770a64
    • Marcelo Vanzin's avatar
      [SPARK-11073][CORE][YARN] Remove akka dependency in secret key generation. · f8d93ede
      Marcelo Vanzin authored
      Use standard JDK APIs for that (with a little help from Guava). Most of the
      changes here are in test code, since there were no tests specific to that
      part of the code.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #9257 from vanzin/SPARK-11073.
      f8d93ede
    • Marcelo Vanzin's avatar
      [SPARK-11020][CORE] Wait for HDFS to leave safe mode before initializing HS. · cf04fdfe
      Marcelo Vanzin authored
      Large HDFS clusters may take a while to leave safe mode when starting; this change
      makes the HS wait for that before doing checks about its configuraton. This means
      the HS won't stop right away if HDFS is in safe mode and the configuration is not
      correct, but that should be a very uncommon situation.
      
      Author: Marcelo Vanzin <vanzin@cloudera.com>
      
      Closes #9043 from vanzin/SPARK-11020.
      cf04fdfe
    • Nong Li's avatar
      [SPARK-11410][SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY and SORT BY. · 046e32ed
      Nong Li authored
      DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for
      optioning sorting within each resulting partition. There is no required relationship between the
      exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other).
      
      This patch adds to APIs to DataFrames which can be used together to provide this functionality:
        1. distributeBy() which partitions the data frame into a specified number of partitions using the
           partitioning exprs.
        2. localSort() which sorts each partition using the provided sorting exprs.
      
      To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...)
      
      Author: Nong Li <nongli@gmail.com>
      
      Closes #9364 from nongli/spark-11410.
      046e32ed
    • Christian Kadner's avatar
      [SPARK-11338] [WEBUI] Prepend app links on HistoryPage with uiRoot path · dc7e399f
      Christian Kadner authored
      [SPARK-11338: HistoryPage not multi-tenancy enabled ...](https://issues.apache.org/jira/browse/SPARK-11338)
      - `HistoryPage.scala` ...prepending all page links with the web proxy (`uiRoot`) path
      - `HistoryServerSuite.scala` ...adding a test case to verify all site-relative links are prefixed when the environment variable `APPLICATION_WEB_PROXY_BASE` (or System property `spark.ui.proxyBase`) is set
      
      Author: Christian Kadner <ckadner@us.ibm.com>
      
      Closes #9291 from ckadner/SPARK-11338 and squashes the following commits:
      
      01d2f35 [Christian Kadner] [SPARK-11338][WebUI] nit fixes
      d054bd7 [Christian Kadner] [SPARK-11338][WebUI] prependBaseUri in method makePageLink
      8bcb3dc [Christian Kadner] [SPARK-11338][WebUI] Prepend application links on HistoryPage with uiRoot path
      dc7e399f
    • Sean Owen's avatar
      [SPARK-11305][DOCS] Remove Third-Party Hadoop Distributions Doc Page · 643c49c7
      Sean Owen authored
      Remove Hadoop third party distro page, and move Hadoop cluster config info to configuration page
      
      CC pwendell
      
      Author: Sean Owen <sowen@cloudera.com>
      
      Closes #9298 from srowen/SPARK-11305.
      643c49c7
  3. Oct 31, 2015
    • Cheng Lian's avatar
      [SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow · aa494a9c
      Cheng Lian authored
      This PR fixes two issues:
      
      1.  `PhysicalRDD.outputsUnsafeRows` is always `false`
      
          Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.
      
      1.  Internal/external row conversion for `HadoopFsRelation` is kinda messy
      
          Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary.  Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.
      
      This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s).  All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s.  In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.
      
      A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows?  At least all well known ones do so.  However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations.  If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]).
      
      This PR supersedes #9125.
      
      Follow-ups:
      
      1.  Makes JSON and ORC data sources output `UnsafeRow` directly
      
      1.  Makes `HiveTableScan` output `UnsafeRow` directly
      
          This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.
      
      [1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
      [2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
      [3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669
      
      Author: Cheng Lian <lian@databricks.com>
      
      Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
      aa494a9c
    • Steve Loughran's avatar
      [SPARK-11265][YARN] YarnClient can't get tokens to talk to Hive 1.2.1 in a secure cluster · 40d3c679
      Steve Loughran authored
      This is a fix for SPARK-11265; the introspection code to get Hive delegation tokens failing on Spark 1.5.1+, due to changes in the Hive codebase
      
      Author: Steve Loughran <stevel@hortonworks.com>
      
      Closes #9232 from steveloughran/stevel/patches/SPARK-11265-hive-tokens.
      40d3c679
    • Dilip Biswal's avatar
      [SPARK-11024][SQL] Optimize NULL in <inlist-expressions> by folding it to Literal(null) · fc27dfbf
      Dilip Biswal authored
      Add a rule in optimizer to convert NULL [NOT] IN (expr1,...,expr2) to
      Literal(null).
      
      This is a follow up defect to SPARK-8654
      
      cloud-fan Can you please take a look ?
      
      Author: Dilip Biswal <dbiswal@us.ibm.com>
      
      Closes #9348 from dilipbiswal/spark_11024.
      fc27dfbf
    • Josh Rosen's avatar
      [SPARK-11424] Guard against double-close() of RecordReaders · ac4118db
      Josh Rosen authored
      **TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.
      
      ## Background
      
      [MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions but other times it will might silently result in invalid / garbled input.
      
      That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.
      
      So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.
      
      For instance, it looks like https://github.com/apache/spark/pull/7424, added in 1.5, introduces at least one extremely~rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:
      
      * [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
      * [While handling the close call and trying to close the reader, reader.close() throws an exception]( https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190)
      * We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.
      
      In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
      I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)
      
      Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record then it looks like we could hit the bug here in 1.5.
      
      ## The fix
      
      This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator. closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
      
      I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.
      
      Author: Josh Rosen <joshrosen@databricks.com>
      
      Closes #9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:
      
      5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
      ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
      087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.
      ac4118db
    • Jeff Zhang's avatar
      [SPARK-11226][SQL] Empty line in json file should be skipped · 97b3c8fb
      Jeff Zhang authored
      Currently the empty line in json file will be parsed into Row with all null field values. But in json, "{}" represents a json object, empty line is supposed to be skipped.
      
      Make a trivial change for this.
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #9211 from zjffdu/SPARK-11226.
      97b3c8fb
  4. Oct 30, 2015
    • Yin Huai's avatar
      [SPARK-11434][SPARK-11103][SQL] Fix test ": Filter applied on merged Parquet... · 3c471885
      Yin Huai authored
      [SPARK-11434][SPARK-11103][SQL] Fix test ": Filter applied on merged Parquet schema with new column fails"
      
      https://issues.apache.org/jira/browse/SPARK-11434
      
      Author: Yin Huai <yhuai@databricks.com>
      
      Closes #9387 from yhuai/SPARK-11434.
      3c471885
    • Nakul Jindal's avatar
      [SPARK-11385] [ML] foreachActive made public in MLLib's vector API · 69b9e4b3
      Nakul Jindal authored
      Made foreachActive public in MLLib's vector API
      
      Author: Nakul Jindal <njindal@us.ibm.com>
      
      Closes #9362 from nakul02/SPARK-11385_foreach_for_mllib_linalg_vector.
      69b9e4b3
    • Yin Huai's avatar
      e8ec2a7b
    • Davies Liu's avatar
      [SPARK-11423] remove MapPartitionsWithPreparationRDD · 45029bfd
      Davies Liu authored
      Since we do not need to preserve a page before calling compute(), MapPartitionsWithPreparationRDD is not needed anymore.
      
      This PR basically revert #8543, #8511, #8038, #8011
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #9381 from davies/remove_prepare2.
      45029bfd
    • felixcheung's avatar
      [SPARK-11340][SPARKR] Support setting driver properties when starting Spark... · bb5a2af0
      felixcheung authored
      [SPARK-11340][SPARKR] Support setting driver properties when starting Spark from R programmatically or from RStudio
      
      Mapping spark.driver.memory from sparkEnvir to spark-submit commandline arguments.
      
      shivaram suggested that we possibly add other spark.driver.* properties - do we want to add all of those? I thought those could be set in SparkConf?
      sun-rui
      
      Author: felixcheung <felixcheung_m@hotmail.com>
      
      Closes #9290 from felixcheung/rdrivermem.
      bb5a2af0
    • Jeff Zhang's avatar
      [SPARK-11342][TESTS] Allow to set hadoop profile when running dev/ru… · 729f983e
      Jeff Zhang authored
      …n_tests
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #9295 from zjffdu/SPARK-11342.
      729f983e
    • Sun Rui's avatar
      [SPARK-11210][SPARKR] Add window functions into SparkR [step 2]. · 40c77fb2
      Sun Rui authored
      Author: Sun Rui <rui.sun@intel.com>
      
      Closes #9196 from sun-rui/SPARK-11210.
      40c77fb2
    • Sun Rui's avatar
      [SPARK-11414][SPARKR] Forgot to update usage of 'spark.sparkr.r.command' in... · fab710a9
      Sun Rui authored
      [SPARK-11414][SPARKR] Forgot to update usage of 'spark.sparkr.r.command' in RRDD in the PR for SPARK-10971.
      
      Author: Sun Rui <rui.sun@intel.com>
      
      Closes #9368 from sun-rui/SPARK-11414.
      fab710a9
    • Iulian Dragos's avatar
      [SPARK-10986][MESOS] Set the context class loader in the Mesos executor backend. · 0451b001
      Iulian Dragos authored
      See [SPARK-10986](https://issues.apache.org/jira/browse/SPARK-10986) for details.
      
      This fixes the `ClassNotFoundException` for Spark classes in the serializer.
      
      I am not sure this is the right way to handle the class loader, but I couldn't find any documentation on how the context class loader is used and who relies on it. It seems at least the serializer uses it to instantiate classes during deserialization.
      
      I am open to suggestions (I tried this fix on a real Mesos cluster and it *does* fix the issue).
      
      tnachen andrewor14
      
      Author: Iulian Dragos <jaguarul@gmail.com>
      
      Closes #9282 from dragos/issue/mesos-classloader.
      0451b001
    • Wenchen Fan's avatar
      [SPARK-11393] [SQL] CoGroupedIterator should respect the fact that... · 14d08b99
      Wenchen Fan authored
      [SPARK-11393] [SQL] CoGroupedIterator should respect the fact that GroupedIterator.hasNext is not idempotent
      
      When we cogroup 2 `GroupedIterator`s in `CoGroupedIterator`, if the right side is smaller, we will consume right data and keep the left data unchanged. Then we call `hasNext` which will call `left.hasNext`. This will make `GroupedIterator` generate an extra group as the previous one has not been comsumed yet.
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #9346 from cloud-fan/cogroup and squashes the following commits:
      
      9be67c8 [Wenchen Fan] SPARK-11393
      14d08b99
    • hyukjinkwon's avatar
      [SPARK-11103][SQL] Filter applied on Merged Parquet shema with new column fail · 59db9e9c
      hyukjinkwon authored
      When enabling mergedSchema and predicate filter, this fails since Parquet does not accept filters pushed down when the columns of the filters do not exist in the schema.
      This is related with Parquet issue (https://issues.apache.org/jira/browse/PARQUET-389).
      
      For now, it just simply disables predicate push down when using merged schema in this PR.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #9327 from HyukjinKwon/SPARK-11103.
      59db9e9c
    • Lewuathe's avatar
      [SPARK-11207] [ML] Add test cases for solver selection of LinearRegres… · 86d65265
      Lewuathe authored
      …sion as followup. This is the follow up work of SPARK-10668.
      
      * Fix miner style issues.
      * Add test case for checking whether solver is selected properly.
      
      Author: Lewuathe <lewuathe@me.com>
      Author: lewuathe <lewuathe@me.com>
      
      Closes #9180 from Lewuathe/SPARK-11207.
      86d65265
    • Davies Liu's avatar
      [SPARK-11417] [SQL] no @Override in codegen · eb59b94c
      Davies Liu authored
      Older version of Janino (>2.7) does not support Override, we should not use that in codegen.
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #9372 from davies/no_override.
      eb59b94c
    • Davies Liu's avatar
      [SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management · 56419cf1
      Davies Liu authored
      This PR introduce a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is needed anymore, so removed.
      
      Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).
      
      The PrepareRDD may be not needed anymore, could be removed in follow up PR.
      
      The following script will fail with OOM before this PR, finished in 150 seconds with 2G heap (also works in 1.5 branch, with similar duration).
      
      ```python
      sqlContext.setConf("spark.sql.shuffle.partitions", "1")
      df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
      df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
      j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
      j.explain()
      print j.count()
      ```
      
      For thread-safety, here what I'm got:
      
      1) Without calling spill(), the operators should only be used by single thread, no safety problems.
      
      2) spill() could be triggered in two cases, triggered by itself, or by other operators. we can check trigger == this in spill(), so it's still in the same thread, so safety problems.
      
      3) if it's triggered by other operators (right now cache will not trigger spill()), we only spill the data into disk when it's in scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, we only need to synchronize the iterator and change it.
      
      4) During scanning, the iterator will only use one record in one page, we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page, and dump all others into disk. In UnsafeExternalSorter, we keep the page that is used by current record (having the same baseObject), free it when loading the next record. In ShuffleExternalSorter, the spill() will not trigger during scanning.
      
      5) In order to avoid deadlock, we didn't call acquireMemory during spill (so we reused the pointer array in InMemorySorter).
      
      Author: Davies Liu <davies@databricks.com>
      
      Closes #9241 from davies/force_spill.
      56419cf1
  5. Oct 29, 2015
Loading