diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a1dd1af4bf2664690033a4a8d1071734c680e2c2..3fc756b9ef40c2495e2eff1741b5267cbc9c661e 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -169,6 +169,7 @@ exportMethods("arrange",
               "transform",
               "union",
               "unionAll",
+              "unionByName",
               "unique",
               "unpersist",
               "where",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 80526cdd4fd45e5d38b3e6c346cad74bbf8160eb..1b46c1e800c9626b3ebb13fa60e028fa92239351 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2683,7 +2683,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
 #' @rdname union
 #' @name union
 #' @aliases union,SparkDataFrame,SparkDataFrame-method
-#' @seealso \link{rbind}
+#' @seealso \link{rbind} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
@@ -2714,6 +2714,40 @@ setMethod("unionAll",
             union(x, y)
           })
 
+#' Return a new SparkDataFrame containing the union of rows, matched by column names
+#'
+#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
+#' and another SparkDataFrame. This is different from the \code{union} function, and from
+#' both \code{UNION ALL} and \code{UNION DISTINCT} in SQL, as column positions are not
+#' taken into account. Input SparkDataFrames can have different data types in the schema.
+#'
+#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' This function resolves columns by name (not by position).
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the union.
+#' @family SparkDataFrame functions
+#' @rdname unionByName
+#' @name unionByName
+#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
+#' @seealso \link{rbind} \link{union}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
+#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
+#' head(unionByName(df1, df2))
+#' }
+#' @note unionByName since 2.3.0
+setMethod("unionByName",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+            dataFrame(unioned)
+          })
+
 #' Union two or more SparkDataFrames
 #'
 #' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
@@ -2730,7 +2764,7 @@ setMethod("unionAll",
 #' @aliases rbind,SparkDataFrame-method
 #' @rdname rbind
 #' @name rbind
-#' @seealso \link{union}
+#' @seealso \link{union} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f0cc2dc3f195a78d20bc67b2661fec6fe80fb6e1..603ff4e4a2e3b81699d27f264bbfa8b289e75105 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -769,6 +769,10 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
 #' @export
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
 
+#' @rdname unionByName
+#' @export
+setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+
 #' @rdname unpersist
 #' @export
 setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index d477fc6a4256cd41dd371aec803297ef10ba235f..7abc8720473c1ec8c2b97cd0ac3f978dd2371ccb 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2255,7 +2255,7 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })
 
-test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
+test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
   df <- read.json(jsonPath)
 
   lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -2271,6 +2271,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)
 
+  df1 <- select(df2, "age", "name")
+  unioned1 <- arrange(unionByName(df1, df), df1$age)
+  expect_is(unioned1, "SparkDataFrame")
+  expect_equal(count(unioned1), 6)
+  # Here, we test that 'Michael' in df is matched to the 'name' column, not by position.
+  expect_equal(first(unioned1)$name, "Michael")
+
   unioned2 <- arrange(rbind(unioned, df, df2), df$age)
   expect_is(unioned2, "SparkDataFrame")
   expect_equal(count(unioned2), 12)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c19e599814e540dfa329df7e0c3d5dd19df639a4..1cea130d918ad618fabfdb54a4f7ca3ea9af49e3 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1290,7 +1290,7 @@ class DataFrame(object):
         """ Return a new :class:`DataFrame` containing union of rows in this and another frame.
 
         This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by a distinct.
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
 
         Also as standard in SQL, this function resolves columns by position (not by name).
         """
@@ -1301,14 +1301,36 @@
         """ Return a new :class:`DataFrame` containing union of rows in this and another frame.
 
         This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
-        (that does deduplication of elements), use this function followed by a distinct.
+        (that does deduplication of elements), use this function followed by :func:`distinct`.
 
         Also as standard in SQL, this function resolves columns by position (not by name).
 
-        .. note:: Deprecated in 2.0, use union instead.
+        .. note:: Deprecated in 2.0, use :func:`union` instead.
         """
         return self.union(other)
 
+    @since(2.3)
+    def unionByName(self, other):
+        """ Returns a new :class:`DataFrame` containing union of rows in this and another frame.
+
+        This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+        union (that does deduplication of elements), use this function followed by :func:`distinct`.
+
+        The difference between this function and :func:`union` is that this function
+        resolves columns by name (not by position):
+
+        >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
+        >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
+        >>> df1.unionByName(df2).show()
+        +----+----+----+
+        |col0|col1|col2|
+        +----+----+----+
+        |   1|   2|   3|
+        |   6|   4|   5|
+        +----+----+----+
+        """
+        return DataFrame(self._jdf.unionByName(other._jdf), self.sql_ctx)
+
     @since(1.3)
     def intersect(self, other):
         """ Return a new :class:`DataFrame` containing rows only in
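
For reference, a minimal sketch of how the new R API is expected to behave once this patch is applied. It mirrors the roxygen `@examples` block above and assumes a running Spark session; the variable names are illustrative only:

```r
library(SparkR)
sparkR.session()

# Two SparkDataFrames with the same columns in different orders.
df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")

# union() resolves columns by position, so df2's "am" values would land in
# df1's "carb" column. unionByName() matches columns by name instead, so the
# result keeps df1's column order with df2's values in the right columns.
unioned <- unionByName(df1, df2)
head(unioned)

# As the doc notes, unionByName() keeps duplicate rows, like union();
# chain distinct() for a SQL-style UNION DISTINCT.
head(distinct(unioned))
```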