- Feb 01, 2017
-
-
Devaraj K authored
## What changes were proposed in this pull request? Copying of the killed status was missing while getting the newTaskInfo object by dropping the unnecessary details to reduce the memory usage. This patch adds the copying of the killed status to newTaskInfo object, this will correct the display of the status from wrong status to KILLED status in Web UI. ## How was this patch tested? Current behaviour of displaying tasks in stage UI page, | Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors | | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | |143 |10 |0 |SUCCESS |NODE_LOCAL |6 / x.xx.x.x stdout stderr|2017/01/25 07:49:27 |0 ms | |0.0 B / 0 | |0.0 B / 0 |TaskKilled (killed intentionally)| |156 |11 |0 |SUCCESS |NODE_LOCAL |5 / x.xx.x.x stdout stderr|2017/01/25 07:49:27 |0 ms | |0.0 B / 0 | |0.0 B / 0 |TaskKilled (killed intentionally)| Web UI display after applying the patch, | Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors | | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | |143 |10 |0 |KILLED |NODE_LOCAL |6 / x.xx.x.x stdout stderr|2017/01/25 07:49:27 |0 ms | |0.0 B / 0 | | 0.0 B / 0 | TaskKilled (killed intentionally)| |156 |11 |0 |KILLED |NODE_LOCAL |5 / x.xx.x.x stdout stderr|2017/01/25 07:49:27 |0 ms | |0.0 B / 0 | |0.0 B / 0 | TaskKilled (killed intentionally)| Author: Devaraj K <devaraj@apache.org> Closes #16725 from devaraj-kavali/SPARK-19377.
-
hyukjinkwon authored
## What changes were proposed in this pull request? This PR deduplicates arguments, `url` and `table` in `JdbcUtils` with `JDBCOptions`. It avoids to use duplicated arguments, for example, as below: from ```scala val jdbcOptions = new JDBCOptions(url, table, map) JdbcUtils.saveTable(ds, url, table, jdbcOptions) ``` to ```scala val jdbcOptions = new JDBCOptions(url, table, map) JdbcUtils.saveTable(ds, jdbcOptions) ``` ## How was this patch tested? Running unit test in `JdbcSuite`/`JDBCWriteSuite` Building with Scala 2.10 as below: ``` ./dev/change-scala-version.sh 2.10 ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16753 from HyukjinKwon/SPARK-19296.
-
Zheng RuiFeng authored
## What changes were proposed in this pull request? Fix brokens links in ml-pipeline and ml-tuning `<div data-lang="scala">` -> `<div data-lang="scala" markdown="1">` ## How was this patch tested? manual tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #16754 from zhengruifeng/doc_api_fix.
-
hyukjinkwon authored
[SPARK-19402][DOCS] Support LaTex inline formula correctly and fix warnings in Scala/Java APIs generation ## What changes were proposed in this pull request? This PR proposes three things as below: - Support LaTex inline-formula, `\( ... \)` in Scala API documentation It seems currently, ``` \( ... \) ``` are rendered as they are, for example, <img width="345" alt="2017-01-30 10 01 13" src="https://cloud.githubusercontent.com/assets/6477701/22423960/ab37d54a-e737-11e6-9196-4f6229c0189c.png"> It seems mistakenly more backslashes were added. - Fix warnings Scaladoc/Javadoc generation This PR fixes t two types of warnings as below: ``` [warn] .../spark/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala:335: Could not find any member to link for "UnsupportedOperationException". [warn] /** [warn] ^ ``` ``` [warn] .../spark/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala:24: Variable var undefined in comment for class VariableSubstitution in class VariableSubstitution [warn] * `${var}`, `${system:var}` and `${env:var}`. [warn] ^ ``` - Fix Javadoc8 break ``` [error] .../spark/mllib/target/java/org/apache/spark/ml/PredictionModel.java:7: error: reference not found [error] * E.g., {link VectorUDT} for vector features. [error] ^ [error] .../spark/mllib/target/java/org/apache/spark/ml/PredictorParams.java:12: error: reference not found [error] * E.g., {link VectorUDT} for vector features. [error] ^ [error] .../spark/mllib/target/java/org/apache/spark/ml/Predictor.java:10: error: reference not found [error] * E.g., {link VectorUDT} for vector features. [error] ^ [error] .../spark/sql/hive/target/java/org/apache/spark/sql/hive/HiveAnalysis.java:5: error: reference not found [error] * Note that, this rule must be run after {link PreprocessTableInsertion}. [error] ^ ``` ## How was this patch tested? Manually via `sbt unidoc` and `jeykil build`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #16741 from HyukjinKwon/warn-and-break.
-
- Jan 31, 2017
-
-
wm624@hotmail.com authored
## What changes were proposed in this pull request When Kmeans using initMode = "random" and some random seed, it is possible the actual cluster size doesn't equal to the configured `k`. In this case, summary(model) returns error due to the number of cols of coefficient matrix doesn't equal to k. Example: > col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) > col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) > col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) > cols <- as.data.frame(cbind(col1, col2, col3)) > df <- createDataFrame(cols) > > model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, initMode = "random", seed = 22222, tol = 1E-5) > > summary(model2) Error in `colnames<-`(`*tmp*`, value = c("col1", "col2", "col3")) : length of 'dimnames' [2] not equal to array extent In addition: Warning message: In matrix(coefficients, ncol = k) : data length [9] is not a sub-multiple or multiple of the number of rows [2] Fix: Get the actual cluster size in the summary and use it to build the coefficient matrix. ## How was this patch tested? Add unit tests. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #16666 from wangmiao1981/kmeans.
-
zero323 authored
## What changes were proposed in this pull request? Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization. This allows users to define and import UDF without creating a context / session as a side effect. [SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163) ## How was this patch tested? Unit tests. Author: zero323 <zero323@users.noreply.github.com> Closes #16536 from zero323/SPARK-19163.
-
Burak Yavuz authored
[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger ## What changes were proposed in this pull request? In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics. ## How was this patch tested? Regression test in `StreamingQueryStatusAndProgressSuite` Author: Burak Yavuz <brkyvz@gmail.com> Closes #16716 from brkyvz/state-agg.
-
Bryan Cutler authored
[SPARK-17161][PYSPARK][ML] Add PySpark-ML JavaWrapper convenience function to create Py4J JavaArrays ## What changes were proposed in this pull request? Adding convenience function to Python `JavaWrapper` so that it is easy to create a Py4J JavaArray that is compatible with current class constructors that have a Scala `Array` as input so that it is not necessary to have a Java/Python friendly constructor. The function takes a Java class as input that is used by Py4J to create the Java array of the given class. As an example, `OneVsRest` has been updated to use this and the alternate constructor is removed. ## How was this patch tested? Added unit tests for the new convenience function and updated `OneVsRest` doctests which use this to persist the model. Author: Bryan Cutler <cutlerb@gmail.com> Closes #14725 from BryanCutler/pyspark-new_java_array-CountVectorizer-SPARK-17161.
-
actuaryzhang authored
## What changes were proposed in this pull request? The `coefficients` component in model summary should be 'matrix' but the underlying structure is indeed list. This affects several models except for 'AFTSurvivalRegressionModel' which has the correct implementation. The fix is to first `unlist` the coefficients returned from the `callJMethod` before converting to matrix. An example illustrates the issues: ``` data(iris) df <- createDataFrame(iris) model <- spark.glm(df, Sepal_Length ~ Sepal_Width, family = "gaussian") s <- summary(model) > str(s$coefficients) List of 8 $ : num 6.53 $ : num -0.223 $ : num 0.479 $ : num 0.155 $ : num 13.6 $ : num -1.44 $ : num 0 $ : num 0.152 - attr(*, "dim")= int [1:2] 2 4 - attr(*, "dimnames")=List of 2 ..$ : chr [1:2] "(Intercept)" "Sepal_Width" ..$ : chr [1:4] "Estimate" "Std. Error" "t value" "Pr(>|t|)" > s$coefficients[, 2] $`(Intercept)` [1] 0.4788963 $Sepal_Width [1] 0.1550809 ``` This shows that the underlying structure of coefficients is still `list`. felixcheung wangmiao1981 Author: actuaryzhang <actuaryzhang10@gmail.com> Closes #16730 from actuaryzhang/sparkRCoef.
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? According to the discussion on #16281 which tried to upgrade toward Apache Parquet 1.9.0, Apache Spark community prefer to upgrade to 1.8.2 instead of 1.9.0. Now, Apache Parquet 1.8.2 is released officially last week on 26 Jan. We can use 1.8.2 now. https://lists.apache.org/thread.html/af0c813f1419899289a336d96ec02b3bbeecaea23aa6ef69f435c142%3Cdev.parquet.apache.org%3E This PR only aims to bump Parquet version to 1.8.2. It didn't touch any other codes. ## How was this patch tested? Pass the existing tests and also manually by doing `./dev/test-dependencies.sh`. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #16751 from dongjoon-hyun/SPARK-19409.
-
- Jan 30, 2017
-
-
Felix Cheung authored
## What changes were proposed in this pull request? With extract `[[` or replace `[[<-`, the parameter `i` is a column index, that needs to be corrected in doc. Also a few minor updates: examples, links. ## How was this patch tested? manual Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16721 from felixcheung/rsubsetdoc.
-
gatorsmile authored
### What changes were proposed in this pull request? Currently, the function `to_json` allows users to provide options for generating JSON. However, it does not pass it to `JacksonGenerator`. Thus, it ignores the user-provided options. This PR is to fix it. Below is an example. ```Scala val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") df.select(to_json($"a", options)).show(false) ``` The current output is like ``` +--------------------------------------+ |structtojson(a) | +--------------------------------------+ |{"_1":"2015-08-26T18:00:00.000-07:00"}| +--------------------------------------+ ``` After the fix, the output is like ``` +-------------------------+ |structtojson(a) | +-------------------------+ |{"_1":"26/08/2015 18:00"}| +-------------------------+ ``` ### How was this patch tested? Added test cases for both `from_json` and `to_json` Author: gatorsmile <gatorsmile@gmail.com> Closes #16745 from gatorsmile/toJson.
-
gatorsmile authored
### What changes were proposed in this pull request? The case are not sensitive in JDBC options, after the PR https://github.com/apache/spark/pull/15884 is merged to Spark 2.1. ### How was this patch tested? N/A Author: gatorsmile <gatorsmile@gmail.com> Closes #16734 from gatorsmile/fixDocCaseInsensitive.
-
zero323 authored
## What changes were proposed in this pull request? This removes from the `__all__` list class names that are not defined (visible) in the `pyspark.sql.column`. ## How was this patch tested? Existing unit tests. Author: zero323 <zero323@users.noreply.github.com> Closes #16742 from zero323/SPARK-19403.
-
- Jan 29, 2017
-
-
Liwei Lin authored
[SPARK-19385][SQL] During canonicalization, `NOT(...(l, r))` should not expect such cases that l.hashcode > r.hashcode ## What changes were proposed in this pull request? During canonicalization, `NOT(...(l, r))` should not expect such cases that `l.hashcode > r.hashcode`. Take the rule `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` for example, it should never be matched since `GreaterThan(l, r)` itself would be re-written as `GreaterThan(r, l)` given `l.hashcode > r.hashcode` after canonicalization. This patch consolidates rules like `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` and `case NOT(GreaterThan(l, r))`. ## How was this patch tested? This patch expanded the `NOT` test case to cover both cases where: - `l.hashcode > r.hashcode` - `l.hashcode < r.hashcode` Author: Liwei Lin <lwlin7@gmail.com> Closes #16719 from lw-lin/canonicalize.
-
Dilip Biswal authored
## What changes were proposed in this pull request? This PR adds the first set of tests for EXISTS subquery. File name | Brief description ------------------------| ----------------- exists-basic.sql |Tests EXISTS and NOT EXISTS subqueries with both correlated and local predicates. exists-within-and-or.sql|Tests EXISTS and NOT EXISTS subqueries embedded in AND or OR expression. DB2 results are attached here as reference : [exists-basic-db2.txt](https://github.com/apache/spark/files/733031/exists-basic-db2.txt) [exists-and-or-db2.txt](https://github.com/apache/spark/files/733030/exists-and-or-db2.txt) ## How was this patch tested? This patch is adding tests. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #16710 from dilipbiswal/exist-basic.
-
- Jan 28, 2017
-
-
Wenchen Fan authored
## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/16552 , `CreateHiveTableAsSelectCommand` becomes very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by only creating table in the table-not-exist branch. This PR also adds hive provider checking in DataStream reader/writer, which is missed in #16552 ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #16693 from cloud-fan/minor.
-
gatorsmile authored
[SPARK-19359][SQL] Revert Clear useless path after rename a partition with upper-case by HiveExternalCatalog ### What changes were proposed in this pull request? This PR is to revert the changes made in https://github.com/apache/spark/pull/16700. It could cause the data loss after partition rename, because we have a bug in the file renaming. Not all the OSs have the same behaviors. For example, on mac OS, if we renaming a path from `.../tbl/a=5/b=6` to `.../tbl/A=5/B=6`. The result is `.../tbl/a=5/B=6`. The expected result is `.../tbl/A=5/B=6`. Thus, renaming on mac OS is not recursive. However, the systems used in Jenkin does not have such an issue. Although this PR is not the root cause, it exposes an existing issue on the code `tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)` --- Hive metastore is not case preserving and keep partition columns with lower case names. If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case. while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it. ### How was this patch tested? N/A Author: gatorsmile <gatorsmile@gmail.com> Closes #16728 from gatorsmile/revert-pr-16700.
-
Zheng RuiFeng authored
## What changes were proposed in this pull request? unpersist the input dataset if `handlePersistence` = true ## How was this patch tested? existing tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #16718 from zhengruifeng/isoReg_unpersisit.
-
- Jan 27, 2017
-
-
windpiger authored
[SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog ## What changes were proposed in this pull request? Hive metastore is not case preserving and keep partition columns with lower case names. If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case. while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it. ## How was this patch tested? unit test added Author: windpiger <songjun@outlook.com> Closes #16700 from windpiger/clearUselessPathAfterRenamPartition.
-
wm624@hotmail.com authored
## What changes were proposed in this pull request? Add Python API for the newly added LinearSVC algorithm. ## How was this patch tested? Add new doc string test. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #16694 from wangmiao1981/ser.
-
Shixiong Zhu authored
## What changes were proposed in this pull request? Right now Netty PRC serializes `RequestMessage` using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB. This PR optimizes it by serializing `RequestMessage` manually (eliminate unnecessary information from most messages, e.g., class names of `RequestMessage`, `NettyRpcEndpointRef`, ...), and reduces the above message size to 100+ bytes. ## How was this patch tested? Jenkins I did a simple test to measure the improvement: Before ``` $ bin/spark-shell --master local-cluster[1,4,1024] ... scala> for (i <- 1 to 10) { | val start = System.nanoTime | val s = sc.parallelize(1 to 1000000, 10 * 1000).count() | val end = System.nanoTime | println(s"$i\t" + ((end - start)/1000/1000)) | } 1 6830 2 4353 3 3322 4 3107 5 3235 6 3139 7 3156 8 3166 9 3091 10 3029 ``` After: ``` $ bin/spark-shell --master local-cluster[1,4,1024] ... scala> for (i <- 1 to 10) { | val start = System.nanoTime | val s = sc.parallelize(1 to 1000000, 10 * 1000).count() | val end = System.nanoTime | println(s"$i\t" + ((end - start)/1000/1000)) | } 1 6431 2 3643 3 2913 4 2679 5 2760 6 2710 7 2747 8 2793 9 2679 10 2651 ``` I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16706 from zsxwing/rpc-opt.
-
Felix Cheung authored
## What changes were proposed in this pull request? This affects mostly running job from the driver in client mode when results are expected to be through stdout (which should be somewhat rare, but possible) Before: ``` > a <- as.DataFrame(cars) > b <- group_by(a, "dist") > c <- count(b) > sparkR.callJMethod(c$countjc, "explain", TRUE) NULL ``` After: ``` > a <- as.DataFrame(cars) > b <- group_by(a, "dist") > c <- count(b) > sparkR.callJMethod(c$countjc, "explain", TRUE) count#11L NULL ``` Now, `column.explain()` doesn't seem very useful (we can get more extensive output with `DataFrame.explain()`) but there are other more complex examples with calls of `println` in Scala/JVM side, that are getting dropped. ## How was this patch tested? manual Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16670 from felixcheung/rjvmstdout.
-
Felix Cheung authored
## What changes were proposed in this pull request? add header ## How was this patch tested? Manual run to check vignettes html is created properly Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16709 from felixcheung/rfilelicense.
-
hyukjinkwon authored
## What changes were proposed in this pull request? This PR fixes both, javadoc8 break ``` [error] .../spark/sql/hive/target/java/org/apache/spark/sql/hive/FindHiveSerdeTable.java:3: error: reference not found [error] * Replaces {link SimpleCatalogRelation} with {link MetastoreRelation} if its table provider is hive. ``` and the example in `StructType` as a self-contained example as below: ```scala import org.apache.spark.sql._ import org.apache.spark.sql.types._ val struct = StructType( StructField("a", IntegerType, true) :: StructField("b", LongType, false) :: StructField("c", BooleanType, false) :: Nil) // Extract a single StructField. val singleField = struct("b") // singleField: StructField = StructField(b,LongType,false) // If this struct does not have a field called "d", it throws an exception. struct("d") // java.lang.IllegalArgumentException: Field "d" does not exist. // ... // Extract multiple StructFields. Field names are provided in a set. // A StructType object will be returned. val twoFields = struct(Set("b", "c")) // twoFields: StructType = // StructType(StructField(b,LongType,false), StructField(c,BooleanType,false)) // Any names without matching fields will throw an exception. // For the case shown below, an exception is thrown due to "d". struct(Set("b", "c", "d")) // java.lang.IllegalArgumentException: Field "d" does not exist. // ... ``` ```scala import org.apache.spark.sql._ import org.apache.spark.sql.types._ val innerStruct = StructType( StructField("f1", IntegerType, true) :: StructField("f2", LongType, false) :: StructField("f3", BooleanType, false) :: Nil) val struct = StructType( StructField("a", innerStruct, true) :: Nil) // Create a Row with the schema defined by struct val row = Row(Row(1, 2, true)) ``` Also, now when the column is missing, it throws an exception rather than ignoring. ## How was this patch tested? Manually via `sbt unidoc`. - Scaladoc <img width="665" alt="2017-01-26 12 54 13" src="https://cloud.githubusercontent.com/assets/6477701/22297905/1245620e-e362-11e6-9e22-43bb8d9871af.png"> - Javadoc <img width="722" alt="2017-01-26 12 54 27" src="https://cloud.githubusercontent.com/assets/6477701/22297899/0fd87e0c-e362-11e6-9033-7590bda1aea6.png"> <img width="702" alt="2017-01-26 12 54 32" src="https://cloud.githubusercontent.com/assets/6477701/22297900/0fe14154-e362-11e6-9882-768381c53163.png"> Author: hyukjinkwon <gurwls223@gmail.com> Closes #16703 from HyukjinKwon/SPARK-12970.
-
actuaryzhang authored
## What changes were proposed in this pull request? I propose to add the full Tweedie family into the GeneralizedLinearRegression model. The Tweedie family is characterized by a power variance function. Currently supported distributions such as Gaussian, Poisson and Gamma families are a special case of the Tweedie https://en.wikipedia.org/wiki/Tweedie_distribution. yanboliang srowen sethah Author: actuaryzhang <actuaryzhang10@gmail.com> Author: Wayne Zhang <actuaryzhang10@gmail.com> Closes #16344 from actuaryzhang/tweedie.
-
- Jan 26, 2017
-
-
Felix Cheung authored
## What changes were proposed in this pull request? With doc to say this would convert DF into RDD ## How was this patch tested? unit tests, manual tests Author: Felix Cheung <felixcheung_m@hotmail.com> Closes #16668 from felixcheung/rgetnumpartitions.
-
wm624@hotmail.com authored
## What changes were proposed in this pull request? Add R wrapper for bisecting Kmeans. As JIRA is down, I will update title to link with corresponding JIRA later. ## How was this patch tested? Add new unit tests. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #16566 from wangmiao1981/bk.
-
WeichenXu authored
[SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication ## What changes were proposed in this pull request? ### The problem in current block matrix mulitiplication As in JIRA https://issues.apache.org/jira/browse/SPARK-18218 described, block matrix multiplication in spark may cause some problem, suppose we have `M*N` dimensions matrix A multiply `N*P` dimensions matrix B, when N is much larger than M and P, then the following problem may occur: - when the middle dimension N is too large, it will cause reducer OOM. - even if OOM do not occur, it will still cause parallism too low. - when N is much large than M and P, and matrix A and B have many partitions, it may cause too many partition on M and P dimension, it will cause much larger shuffled data size. (I will expain this in detail in the following.) ### Key point of my improvement In this PR, I introduce `midDimSplitNum` parameter, and improve the algorithm, to resolve this problem. In order to understand the improvement in this PR, first let me give a simple case to explain how the current mulitiplication works and what cause the problems above: suppose we have block matrix A, contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), blocks arranged in 2 rows, 100 cols: ``` A00 A01 A02 ... A0,99 A10 A11 A12 ... A1,99 ``` and we have block matrix B, also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), blocks arranged in 100 rows, 2 cols: ``` B00 B01 B10 B11 B20 B21 ... B99,0 B99,1 ``` Suppose all blocks in the two matrices are dense for now. Now we call A.multiply(B), suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (can't be more partitions because the result matrix only contains `2 * 2` blocks), the current algorithm will contains two shuffle steps: **step-1** Step-1 will generate 4 reducer, I tag them as reducer-00, reducer-01, reducer-10, reducer-11, and shuffle data as following: ``` A00 A01 A02 ... A0,99 B00 B10 B20 ... B99,0 shuffled into reducer-00 A00 A01 A02 ... A0,99 B01 B11 B21 ... B99,1 shuffled into reducer-01 A10 A11 A12 ... A1,99 B00 B10 B20 ... B99,0 shuffled into reducer-10 A10 A11 A12 ... A1,99 B01 B11 B21 ... B99,1 shuffled into reducer-11 ``` and the shuffling above is a `cogroup` transform, note that each reducer contains **only one group**. **step-2** Step-2 will do an `aggregateByKey` transform on the result of step-1, will also generate 4 reducers, and generate the final result RDD, contains 4 partitions, each partition contains one block. The main problems are in step-1. Now we have only 4 reducers, but matrix A and B have 400 blocks in total, obviously the reducer number is too small. and, we can see that, each reducer contains only one group(the group concept in `coGroup` transform), each group contains 200 blocks. This is terrible because we know that `coGroup` transformer will load each group into memory when computing. It is un-extensable in the algorithm level. Suppose matrix A has 10000 cols blocks or more instead of 100? Than each reducer will load 20000 blocks into memory. It will easily cause reducer OOM. This PR try to resolve the problem described above. When matrix A with dimension M * N multiply matrix B with dimension N * P, the middle dimension N is the keypoint. If N is large, the current mulitiplication implementation works badly. In this PR, I introduce a `numMidDimSplits` parameter, represent how many splits it will cut on the middle dimension N. Still using the example described above, now we set `numMidDimSplits = 10`, now we can generate 40 reducers in **step-1**: the reducer-ij above now will be splited into 10 reducers: reducer-ij0, reducer-ij1, ... reducer-ij9, each reducer will receive 20 blocks. now the shuffle works as following: **reducer-000 to reducer-009** ``` A0,0 A0,10 A0,20 ... A0,90 B0,0 B10,0 B20,0 ... B90,0 shuffled into reducer-000 A0,1 A0,11 A0,21 ... A0,91 B1,0 B11,0 B21,0 ... B91,0 shuffled into reducer-001 A0,2 A0,12 A0,22 ... A0,92 B2,0 B12,0 B22,0 ... B92,0 shuffled into reducer-002 ... A0,9 A0,19 A0,29 ... A0,99 B9,0 B19,0 B29,0 ... B99,0 shuffled into reducer-009 ``` **reducer-010 to reducer-019** ``` A0,0 A0,10 A0,20 ... A0,90 B0,1 B10,1 B20,1 ... B90,1 shuffled into reducer-010 A0,1 A0,11 A0,21 ... A0,91 B1,1 B11,1 B21,1 ... B91,1 shuffled into reducer-011 A0,2 A0,12 A0,22 ... A0,92 B2,1 B12,1 B22,1 ... B92,1 shuffled into reducer-012 ... A0,9 A0,19 A0,29 ... A0,99 B9,1 B19,1 B29,1 ... B99,1 shuffled into reducer-019 ``` **reducer-100 to reducer-109** and **reducer-110 to reducer-119** is similar to the above, I omit to write them out. ### API for this optimized algorithm I add a new API as following: ``` def multiply( other: BlockMatrix, numMidDimSplits: Int // middle dimension split number, expained above ): BlockMatrix ``` ### Shuffled data size analysis (compared under the same parallelism) The optimization has some subtle influence on the total shuffled data size. Appropriate `numMidDimSplits` will significantly reduce the shuffled data size, but too large `numMidDimSplits` may increase the shuffled data in reverse. For now I don't want to introduce formula to make thing too complex, I only use a simple case to represent it here: Suppose we have two same size square matrices X and Y, both have `16 numRowBlocks * 16 numColBlocks`. X and Y are both dense matrix. Now let me analysis the shuffling data size in the following case: **case 1: X and Y both partitioned in 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1** ShufflingDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks parallelism = 16 * 16 * 1 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm. **case 2: X and Y both partitioned in 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4** ShufflingDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks parallelism = 8 * 8 * 4 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm. **The two cases above all have parallism = 256**, case 1 `numMidDimSplits = 1` is equivalent with current implementation in mllib, but case 2 shuffling data is 60.6% of case 1, **it shows that under the same parallelism, proper `numMidDimSplits` will significantly reduce the shuffling data size**. ## How was this patch tested? Test suites added. Running result:  Author: WeichenXu <WeichenXu123@outlook.com> Closes #15730 from WeichenXu123/optim_block_matrix.
-
Takeshi YAMAMURO authored
## What changes were proposed in this pull request? This pr added a variable for a UDF name in `ScalaUDF`. Then, if the variable filled, `DataFrame#explain` prints the name. ## How was this patch tested? Added a test in `UDFSuite`. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16707 from maropu/SPARK-19338.
-
Takuya UESHIN authored
## What changes were proposed in this pull request? As of Spark 2.1, Spark SQL assumes the machine timezone for datetime manipulation, which is bad if users are not in the same timezones as the machines, or if different users have different timezones. We should introduce a session local timezone setting that is used for execution. An explicit non-goal is locale handling. ### Semantics Setting the session local timezone means that the timezone-aware expressions listed below should use the timezone to evaluate values, and also it should be used to convert (cast) between string and timestamp or between timestamp and date. - `CurrentDate` - `CurrentBatchTimestamp` - `Hour` - `Minute` - `Second` - `DateFormatClass` - `ToUnixTimestamp` - `UnixTimestamp` - `FromUnixTime` and below are implicitly timezone-aware through cast from timestamp to date: - `DayOfYear` - `Year` - `Quarter` - `Month` - `DayOfMonth` - `WeekOfYear` - `LastDay` - `NextDay` - `TruncDate` For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values evaluated by some of timezone-aware expressions are: ```scala scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts") df: org.apache.spark.sql.DataFrame = [ts: timestamp] scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false) +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ |ts |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)| +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ |2016-01-01 00:00:00|2016 |1 |1 |0 |0 |0 | +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ ``` whereas setting the session local timezone to `"PST"`, they are: ```scala scala> spark.conf.set("spark.sql.session.timeZone", "PST") scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false) +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ |ts |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)| +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ |2015-12-31 16:00:00|2015 |12 |31 |16 |0 |0 | +-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+ ``` Notice that even if you set the session local timezone, it affects only in `DataFrame` operations, neither in `Dataset` operations, `RDD` operations nor in `ScalaUDF`s. You need to properly handle timezone by yourself. ### Design of the fix I introduced an analyzer to pass session local timezone to timezone-aware expressions and modified DateTimeUtils to take the timezone argument. ## How was this patch tested? Existing tests and added tests for timezone aware expressions. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #16308 from ueshin/issues/SPARK-18350.
-
Takeshi YAMAMURO authored
## What changes were proposed in this pull request? This pr is to update a help message for `--files` in spark-submit because it seems users get confused about how to get full paths of the files that one adds via the option. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16698 from maropu/SparkFilesDoc.
-
Marcelo Vanzin authored
The redirect handler was installed only for the root of the server; any other context ended up being served directly through the HTTP port. Since every sub page (e.g. application UIs in the history server) is a separate servlet context, this meant that everything but the root was accessible via HTTP still. The change adds separate names to each connector, and binds contexts to specific connectors so that content is only served through the HTTPS connector when it's enabled. In that case, the only thing that binds to the HTTP connector is the redirect handler. Tested with new unit tests and by checking a live history server. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16582 from vanzin/SPARK-19220.
-
- Jan 25, 2017
-
-
Dilip Biswal authored
## What changes were proposed in this pull request? In CachedTableSuite, we are not setting up the test data at the beginning. Some tests fail while trying to run individually. When running the entire suite they run fine. Here are some of the tests that fail - - test("SELECT star from cached table") - test("Self-join cached") As part of this simplified a couple of tests by calling a support method to count the number of InMemoryRelations. ## How was this patch tested? Ran the failing tests individually. Author: Dilip Biswal <dbiswal@us.ibm.com> Closes #16688 from dilipbiswal/cachetablesuite_simple.
-
Takeshi YAMAMURO authored
## What changes were proposed in this pull request? This pr is to fix an issue occurred when resharding Kinesis streams; the resharding makes the KCL throw an exception because Spark does not checkpoint `SHARD_END` when finishing reading closed shards in `KinesisRecordProcessor#shutdown`. This bug finally leads to stopping subscribing new split (or merged) shards. ## How was this patch tested? Added a test in `KinesisStreamSuite` to check if it works well when splitting/merging shards. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #16213 from maropu/SPARK-18020.
-
uncleGen authored
## What changes were proposed in this pull request? A green dot in the DAG visualization apparently means that the referenced RDD is cached. This is not documented anywhere except in this [blog post](https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html). It would be good if the Web UI itself documented this somehow (perhaps in the tooltip?) so that the user can naturally learn what it means while using the Web UI. before pr:   after pr:   ## How was this patch tested? Author: uncleGen <hustyugm@gmail.com> Closes #16702 from uncleGen/SPARK-18495.
-
Tathagata Das authored
## What changes were proposed in this pull request? EdgeRDD/VertexRDD overrides checkpoint() and isCheckpointed() to forward these to the internal partitionRDD. So when checkpoint() is called on them, its the partitionRDD that actually gets checkpointed. However since isCheckpointed() also overridden to call partitionRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though this RDD is actually not checkpointed. This would have been fine except the RDD's internal logic for computing the RDD depends on isCheckpointed(). So for VertexRDD/EdgeRDD, since isCheckpointed is true, when computing Spark tries to read checkpoint data of VertexRDD/EdgeRDD even though they are not actually checkpointed. Through a crazy sequence of call forwarding, it reads checkpoint data of partitionsRDD and tries to cast it to types in Vertex/EdgeRDD. This leads to ClassCastException. The minimal fix that does not change any public behavior is to modify RDD internal to not use public override-able API for internal logic. ## How was this patch tested? New unit tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15396 from tdas/SPARK-14804.
-
Holden Karau authored
## What changes were proposed in this pull request? Fix instalation of mllib and ml sub components, and more eagerly cleanup cache files during test script & make-distribution. ## How was this patch tested? Updated sanity test script to import mllib and ml sub-components. Author: Holden Karau <holden@us.ibm.com> Closes #16465 from holdenk/SPARK-19064-fix-pip-install-sub-components.
-
Marcelo Vanzin authored
The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a python script via spark-submit. Tested with new unit test and by running a python script in a real cluster. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16682 from vanzin/SPARK-19307.
-
gmoehler authored
## What changes were proposed in this pull request? acceptType() in UDT will no only accept the same type but also all base types ## How was this patch tested? Manual test using a set of generated UDTs fixing acceptType() in my user defined types Please review http://spark.apache.org/contributing.html before opening a pull request. Author: gmoehler <moehler@de.ibm.com> Closes #16660 from gmoehler/master.
-