From c76457c8e422ce6fbf72a8fe5db94565783b12d0 Mon Sep 17 00:00:00 2001 From: felixcheung <felixcheung_m@hotmail.com> Date: Thu, 26 May 2016 11:20:20 -0700 Subject: [PATCH] [SPARK-10903][SPARKR] R - Simplify SQLContext method signatures and use a singleton Eliminate the need to pass sqlContext to method since it is a singleton - and we don't want to support multiple contexts in a R session. Changes are done in a back compat way with deprecation warning added. Method signature for S3 methods are added in a concise, clean approach such that in the next release the deprecated signature can be taken out easily/cleanly (just delete a few lines per method). Custom method dispatch is implemented to allow for multiple JVM reference types that are all 'jobj' in R and to avoid having to add 30 new exports. Author: felixcheung <felixcheung_m@hotmail.com> Closes #9192 from felixcheung/rsqlcontext. --- R/pkg/R/DataFrame.R | 20 +- R/pkg/R/SQLContext.R | 298 +++++++++++----- R/pkg/R/jobj.R | 5 + R/pkg/inst/tests/testthat/test_context.R | 2 +- R/pkg/inst/tests/testthat/test_mllib.R | 30 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 392 +++++++++++----------- 6 files changed, 450 insertions(+), 297 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index f719173607..d54ee54cd8 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2213,13 +2213,7 @@ setMethod("write.df", signature(df = "SparkDataFrame", path = "character"), function(df, path, source = NULL, mode = "error", ...){ if (is.null(source)) { - if (exists(".sparkRSQLsc", envir = .sparkREnv)) { - sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) - } else if (exists(".sparkRHivesc", envir = .sparkREnv)) { - sqlContext <- get(".sparkRHivesc", envir = .sparkREnv) - } else { - stop("sparkRHive or sparkRSQL context has to be specified") - } + sqlContext <- getSqlContext() source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } @@ -2281,15 +2275,9 @@ setMethod("saveAsTable", signature(df = "SparkDataFrame", tableName = "character"), function(df, tableName, source = NULL, mode="error", ...){ if (is.null(source)) { - if (exists(".sparkRSQLsc", envir = .sparkREnv)) { - sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) - } else if (exists(".sparkRHivesc", envir = .sparkREnv)) { - sqlContext <- get(".sparkRHivesc", envir = .sparkREnv) - } else { - stop("sparkRHive or sparkRSQL context has to be specified") - } - source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", - "org.apache.spark.sql.parquet") + sqlContext <- getSqlContext() + source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", + "org.apache.spark.sql.parquet") } jmode <- convertToJSaveMode(mode) options <- varargsToEnv(...) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 6b7a341bee..584bbbf0e4 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -37,6 +37,45 @@ getInternalType <- function(x) { stop(paste("Unsupported type for SparkDataFrame:", class(x)))) } +#' Temporary function to reroute old S3 Method call to new +#' This function is specifically implemented to remove SQLContext from the parameter list. +#' It determines the target to route the call by checking the parent of this callsite (say 'func'). +#' The target should be called 'func.default'. +#' We need to check the class of x to ensure it is SQLContext/HiveContext before dispatching. +#' @param newFuncSig name of the function the user should call instead in the deprecation message +#' @param x the first parameter of the original call +#' @param ... the rest of parameter to pass along +#' @return whatever the target returns +#' @noRd +dispatchFunc <- function(newFuncSig, x, ...) { + funcName <- as.character(sys.call(sys.parent())[[1]]) + f <- get(paste0(funcName, ".default")) + # Strip sqlContext from list of parameters and then pass the rest along. + contextNames <- c("org.apache.spark.sql.SQLContext", + "org.apache.spark.sql.hive.HiveContext", + "org.apache.spark.sql.hive.test.TestHiveContext") + if (missing(x) && length(list(...)) == 0) { + f() + } else if (class(x) == "jobj" && + any(grepl(paste(contextNames, collapse = "|"), getClassName.jobj(x)))) { + .Deprecated(newFuncSig, old = paste0(funcName, "(sqlContext...)")) + f(...) + } else { + f(x, ...) + } +} + +#' return the SQL Context +getSqlContext <- function() { + if (exists(".sparkRHivesc", envir = .sparkREnv)) { + get(".sparkRHivesc", envir = .sparkREnv) + } else if (exists(".sparkRSQLsc", envir = .sparkREnv)) { + get(".sparkRSQLsc", envir = .sparkREnv) + } else { + stop("SQL context not initialized") + } +} + #' infer the SQL type infer_type <- function(x) { if (is.null(x)) { @@ -74,7 +113,6 @@ infer_type <- function(x) { #' #' Converts R data.frame or list into SparkDataFrame. #' -#' @param sqlContext A SQLContext #' @param data An RDD or list or data.frame #' @param schema a list of column names or named list (StructType), optional #' @return a SparkDataFrame @@ -84,13 +122,16 @@ infer_type <- function(x) { #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df1 <- as.DataFrame(sqlContext, iris) -#' df2 <- as.DataFrame(sqlContext, list(3,4,5,6)) -#' df3 <- createDataFrame(sqlContext, iris) +#' df1 <- as.DataFrame(iris) +#' df2 <- as.DataFrame(list(3,4,5,6)) +#' df3 <- createDataFrame(iris) #' } +#' @name createDataFrame +#' @method createDataFrame default # TODO(davies): support sampling and infer type from NA -createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) { +createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { + sqlContext <- getSqlContext() if (is.data.frame(data)) { # get the names of columns, they will be put into RDD if (is.null(schema)) { @@ -164,11 +205,21 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0 dataFrame(sdf) } +createDataFrame <- function(x, ...) { + dispatchFunc("createDataFrame(data, schema = NULL, samplingRatio = 1.0)", x, ...) +} + #' @rdname createDataFrame #' @aliases createDataFrame #' @export -as.DataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) { - createDataFrame(sqlContext, data, schema, samplingRatio) +#' @method as.DataFrame default + +as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { + createDataFrame(data, schema, samplingRatio) +} + +as.DataFrame <- function(x, ...) { + dispatchFunc("as.DataFrame(data, schema = NULL, samplingRatio = 1.0)", x, ...) } #' toDF @@ -190,14 +241,7 @@ setGeneric("toDF", function(x, ...) { standardGeneric("toDF") }) setMethod("toDF", signature(x = "RDD"), function(x, ...) { - sqlContext <- if (exists(".sparkRHivesc", envir = .sparkREnv)) { - get(".sparkRHivesc", envir = .sparkREnv) - } else if (exists(".sparkRSQLsc", envir = .sparkREnv)) { - get(".sparkRSQLsc", envir = .sparkREnv) - } else { - stop("no SQL context available") - } - createDataFrame(sqlContext, x, ...) + createDataFrame(x, ...) }) #' Create a SparkDataFrame from a JSON file. @@ -205,21 +249,23 @@ setMethod("toDF", signature(x = "RDD"), #' Loads a JSON file (one object per line), returning the result as a SparkDataFrame #' It goes through the entire dataset once to determine the schema. #' -#' @param sqlContext SQLContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. #' @return SparkDataFrame #' @rdname read.json -#' @name read.json #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- read.json(sqlContext, path) -#' df <- jsonFile(sqlContext, path) +#' df <- read.json(path) +#' df <- jsonFile(path) #' } -read.json <- function(sqlContext, path) { +#' @name read.json +#' @method read.json default + +read.json.default <- function(path) { + sqlContext <- getSqlContext() # Allow the user to have a more flexible definiton of the text file path paths <- as.list(suppressWarnings(normalizePath(path))) read <- callJMethod(sqlContext, "read") @@ -227,14 +273,23 @@ read.json <- function(sqlContext, path) { dataFrame(sdf) } +read.json <- function(x, ...) { + dispatchFunc("read.json(path)", x, ...) +} + #' @rdname read.json #' @name jsonFile #' @export -jsonFile <- function(sqlContext, path) { +#' @method jsonFile default + +jsonFile.default <- function(path) { .Deprecated("read.json") - read.json(sqlContext, path) + read.json(path) } +jsonFile <- function(x, ...) { + dispatchFunc("jsonFile(path)", x, ...) +} #' JSON RDD #' @@ -254,6 +309,7 @@ jsonFile <- function(sqlContext, path) { #' df <- jsonRDD(sqlContext, rdd) #'} +# TODO: remove - this method is no longer exported # TODO: support schema jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { .Deprecated("read.json") @@ -272,13 +328,15 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { #' #' Loads a Parquet file, returning the result as a SparkDataFrame. #' -#' @param sqlContext SQLContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. #' @return SparkDataFrame #' @rdname read.parquet -#' @name read.parquet #' @export -read.parquet <- function(sqlContext, path) { +#' @name read.parquet +#' @method read.parquet default + +read.parquet.default <- function(path) { + sqlContext <- getSqlContext() # Allow the user to have a more flexible definiton of the text file path paths <- as.list(suppressWarnings(normalizePath(path))) read <- callJMethod(sqlContext, "read") @@ -286,13 +344,22 @@ read.parquet <- function(sqlContext, path) { dataFrame(sdf) } +read.parquet <- function(x, ...) { + dispatchFunc("read.parquet(...)", x, ...) +} + #' @rdname read.parquet #' @name parquetFile #' @export -# TODO: Implement saveasParquetFile and write examples for both -parquetFile <- function(sqlContext, ...) { +#' @method parquetFile default + +parquetFile.default <- function(...) { .Deprecated("read.parquet") - read.parquet(sqlContext, unlist(list(...))) + read.parquet(unlist(list(...))) +} + +parquetFile <- function(x, ...) { + dispatchFunc("parquetFile(...)", x, ...) } #' Create a SparkDataFrame from a text file. @@ -302,20 +369,22 @@ parquetFile <- function(sqlContext, ...) { #' ignored in the resulting DataFrame. #' Each line in the text file is a new row in the resulting SparkDataFrame. #' -#' @param sqlContext SQLContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. #' @return SparkDataFrame #' @rdname read.text -#' @name read.text #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.txt" -#' df <- read.text(sqlContext, path) +#' df <- read.text(path) #' } -read.text <- function(sqlContext, path) { +#' @name read.text +#' @method read.text default + +read.text.default <- function(path) { + sqlContext <- getSqlContext() # Allow the user to have a more flexible definiton of the text file path paths <- as.list(suppressWarnings(normalizePath(path))) read <- callJMethod(sqlContext, "read") @@ -323,27 +392,38 @@ read.text <- function(sqlContext, path) { dataFrame(sdf) } +read.text <- function(x, ...) { + dispatchFunc("read.text(path)", x, ...) +} + #' SQL Query #' #' Executes a SQL query using Spark, returning the result as a SparkDataFrame. #' -#' @param sqlContext SQLContext to use #' @param sqlQuery A character vector containing the SQL query #' @return SparkDataFrame +#' @rdname sql #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- read.json(sqlContext, path) +#' df <- read.json(path) #' registerTempTable(df, "table") -#' new_df <- sql(sqlContext, "SELECT * FROM table") +#' new_df <- sql("SELECT * FROM table") #' } +#' @name sql +#' @method sql default + +sql.default <- function(sqlQuery) { + sqlContext <- getSqlContext() + sdf <- callJMethod(sqlContext, "sql", sqlQuery) + dataFrame(sdf) +} -sql <- function(sqlContext, sqlQuery) { - sdf <- callJMethod(sqlContext, "sql", sqlQuery) - dataFrame(sdf) +sql <- function(x, ...) { + dispatchFunc("sql(sqlQuery)", x, ...) } #' Create a SparkDataFrame from a SparkSQL Table @@ -351,7 +431,6 @@ sql <- function(sqlContext, sqlQuery) { #' Returns the specified Table as a SparkDataFrame. The Table must have already been registered #' in the SQLContext. #' -#' @param sqlContext SQLContext to use #' @param tableName The SparkSQL Table to convert to a SparkDataFrame. #' @return SparkDataFrame #' @rdname tableToDF @@ -362,12 +441,14 @@ sql <- function(sqlContext, sqlQuery) { #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- read.json(sqlContext, path) +#' df <- read.json(path) #' registerTempTable(df, "table") -#' new_df <- tableToDF(sqlContext, "table") +#' new_df <- tableToDF("table") #' } +#' @note since 2.0.0 -tableToDF <- function(sqlContext, tableName) { +tableToDF <- function(tableName) { + sqlContext <- getSqlContext() sdf <- callJMethod(sqlContext, "table", tableName) dataFrame(sdf) } @@ -376,18 +457,21 @@ tableToDF <- function(sqlContext, tableName) { #' #' Returns a SparkDataFrame containing names of tables in the given database. #' -#' @param sqlContext SQLContext to use #' @param databaseName name of the database #' @return a SparkDataFrame +#' @rdname tables #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' tables(sqlContext, "hive") +#' tables("hive") #' } +#' @name tables +#' @method tables default -tables <- function(sqlContext, databaseName = NULL) { +tables.default <- function(databaseName = NULL) { + sqlContext <- getSqlContext() jdf <- if (is.null(databaseName)) { callJMethod(sqlContext, "tables") } else { @@ -396,23 +480,29 @@ tables <- function(sqlContext, databaseName = NULL) { dataFrame(jdf) } +tables <- function(x, ...) { + dispatchFunc("tables(databaseName = NULL)", x, ...) +} #' Table Names #' #' Returns the names of tables in the given database as an array. #' -#' @param sqlContext SQLContext to use #' @param databaseName name of the database #' @return a list of table names +#' @rdname tableNames #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' tableNames(sqlContext, "hive") +#' tableNames("hive") #' } +#' @name tableNames +#' @method tableNames default -tableNames <- function(sqlContext, databaseName = NULL) { +tableNames.default <- function(databaseName = NULL) { + sqlContext <- getSqlContext() if (is.null(databaseName)) { callJMethod(sqlContext, "tableNames") } else { @@ -420,88 +510,121 @@ tableNames <- function(sqlContext, databaseName = NULL) { } } +tableNames <- function(x, ...) { + dispatchFunc("tableNames(databaseName = NULL)", x, ...) +} #' Cache Table #' #' Caches the specified table in-memory. #' -#' @param sqlContext SQLContext to use #' @param tableName The name of the table being cached #' @return SparkDataFrame +#' @rdname cacheTable #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- read.json(sqlContext, path) +#' df <- read.json(path) #' registerTempTable(df, "table") -#' cacheTable(sqlContext, "table") +#' cacheTable("table") #' } +#' @name cacheTable +#' @method cacheTable default -cacheTable <- function(sqlContext, tableName) { +cacheTable.default <- function(tableName) { + sqlContext <- getSqlContext() callJMethod(sqlContext, "cacheTable", tableName) } +cacheTable <- function(x, ...) { + dispatchFunc("cacheTable(tableName)", x, ...) +} + #' Uncache Table #' #' Removes the specified table from the in-memory cache. #' -#' @param sqlContext SQLContext to use #' @param tableName The name of the table being uncached #' @return SparkDataFrame +#' @rdname uncacheTable #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- read.json(sqlContext, path) +#' df <- read.json(path) #' registerTempTable(df, "table") -#' uncacheTable(sqlContext, "table") +#' uncacheTable("table") #' } +#' @name uncacheTable +#' @method uncacheTable default -uncacheTable <- function(sqlContext, tableName) { +uncacheTable.default <- function(tableName) { + sqlContext <- getSqlContext() callJMethod(sqlContext, "uncacheTable", tableName) } +uncacheTable <- function(x, ...) { + dispatchFunc("uncacheTable(tableName)", x, ...) +} + #' Clear Cache #' #' Removes all cached tables from the in-memory cache. #' -#' @param sqlContext SQLContext to use +#' @rdname clearCache +#' @export #' @examples #' \dontrun{ -#' clearCache(sqlContext) +#' clearCache() #' } +#' @name clearCache +#' @method clearCache default -clearCache <- function(sqlContext) { +clearCache.default <- function() { + sqlContext <- getSqlContext() callJMethod(sqlContext, "clearCache") } +clearCache <- function() { + dispatchFunc("clearCache()") +} + #' Drop Temporary Table #' #' Drops the temporary table with the given table name in the catalog. #' If the table has been cached/persisted before, it's also unpersisted. #' -#' @param sqlContext SQLContext to use #' @param tableName The name of the SparkSQL table to be dropped. +#' @rdname dropTempTable +#' @export #' @examples #' \dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df <- read.df(sqlContext, path, "parquet") +#' df <- read.df(path, "parquet") #' registerTempTable(df, "table") -#' dropTempTable(sqlContext, "table") +#' dropTempTable("table") #' } +#' @name dropTempTable +#' @method dropTempTable default -dropTempTable <- function(sqlContext, tableName) { +dropTempTable.default <- function(tableName) { + sqlContext <- getSqlContext() if (class(tableName) != "character") { stop("tableName must be a string.") } callJMethod(sqlContext, "dropTempTable", tableName) } +dropTempTable <- function(x, ...) { + dispatchFunc("dropTempTable(tableName)", x, ...) +} + #' Load a SparkDataFrame #' #' Returns the dataset in a data source as a SparkDataFrame @@ -510,7 +633,6 @@ dropTempTable <- function(sqlContext, tableName) { #' If `source` is not specified, the default data source configured by #' "spark.sql.sources.default" will be used. #' -#' @param sqlContext SQLContext to use #' @param path The path of files to load #' @param source The name of external data source #' @param schema The data schema defined in structType @@ -522,20 +644,22 @@ dropTempTable <- function(sqlContext, tableName) { #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df1 <- read.df(sqlContext, "path/to/file.json", source = "json") +#' df1 <- read.df("path/to/file.json", source = "json") #' schema <- structType(structField("name", "string"), #' structField("info", "map<string,double>")) -#' df2 <- read.df(sqlContext, mapTypeJsonPath, "json", schema) -#' df3 <- loadDF(sqlContext, "data/test_table", "parquet", mergeSchema = "true") +#' df2 <- read.df(mapTypeJsonPath, "json", schema) +#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true") #' } +#' @name read.df +#' @method read.df default -read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { +read.df.default <- function(path = NULL, source = NULL, schema = NULL, ...) { + sqlContext <- getSqlContext() options <- varargsToEnv(...) if (!is.null(path)) { options[["path"]] <- path } if (is.null(source)) { - sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } @@ -549,10 +673,20 @@ read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) dataFrame(sdf) } +read.df <- function(x, ...) { + dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...) +} + #' @rdname read.df #' @name loadDF -loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { - read.df(sqlContext, path, source, schema, ...) +#' @method loadDF default + +loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) { + read.df(path, source, schema, ...) +} + +loadDF <- function(x, ...) { + dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...) } #' Create an external table @@ -564,20 +698,23 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { #' If `source` is not specified, the default data source configured by #' "spark.sql.sources.default" will be used. #' -#' @param sqlContext SQLContext to use #' @param tableName A name of the table #' @param path The path of files to load #' @param source the name of external data source #' @return SparkDataFrame +#' @rdname createExternalTable #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) -#' df <- sparkRSQL.createExternalTable(sqlContext, "myjson", path="path/to/json", source="json") +#' df <- sparkRSQL.createExternalTable("myjson", path="path/to/json", source="json") #' } +#' @name createExternalTable +#' @method createExternalTable default -createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) { +createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) { + sqlContext <- getSqlContext() options <- varargsToEnv(...) if (!is.null(path)) { options[["path"]] <- path @@ -586,6 +723,10 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL dataFrame(sdf) } +createExternalTable <- function(x, ...) { + dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...) +} + #' Create a SparkDataFrame representing the database table accessible via JDBC URL #' #' Additional JDBC database connection properties can be set (...) @@ -596,7 +737,6 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL #' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash #' your external database systems. #' -#' @param sqlContext SQLContext to use #' @param url JDBC database url of the form `jdbc:subprotocol:subname` #' @param tableName the name of the table in the external database #' @param partitionColumn the name of a column of integral type that will be used for partitioning @@ -616,12 +756,13 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL #' sc <- sparkR.init() #' sqlContext <- sparkRSQL.init(sc) #' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename" -#' df <- read.jdbc(sqlContext, jdbcUrl, "table", predicates = list("field<=123"), user = "username") -#' df2 <- read.jdbc(sqlContext, jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0, +#' df <- read.jdbc(jdbcUrl, "table", predicates = list("field<=123"), user = "username") +#' df2 <- read.jdbc(jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0, #' upperBound = 10000, user = "username", password = "password") #' } +#' @note since 2.0.0 -read.jdbc <- function(sqlContext, url, tableName, +read.jdbc <- function(url, tableName, partitionColumn = NULL, lowerBound = NULL, upperBound = NULL, numPartitions = 0L, predicates = list(), ...) { jprops <- varargsToJProperties(...) @@ -629,6 +770,7 @@ read.jdbc <- function(sqlContext, url, tableName, read <- callJMethod(sqlContext, "read") if (!is.null(partitionColumn)) { if (is.null(numPartitions) || numPartitions == 0) { + sqlContext <- getSqlContext() sc <- callJMethod(sqlContext, "sparkContext") numPartitions <- callJMethod(sc, "defaultParallelism") } else { diff --git a/R/pkg/R/jobj.R b/R/pkg/R/jobj.R index 0838a7bb35..898e80648f 100644 --- a/R/pkg/R/jobj.R +++ b/R/pkg/R/jobj.R @@ -77,6 +77,11 @@ print.jobj <- function(x, ...) { cat("Java ref type", name, "id", x$id, "\n", sep = " ") } +getClassName.jobj <- function(x) { + cls <- callJMethod(x, "getClass") + callJMethod(cls, "getName") +} + cleanup.jobj <- function(jobj) { if (isValidJobj(jobj)) { objId <- jobj$id diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 95258babbf..c1f1a8932d 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -58,7 +58,7 @@ test_that("repeatedly starting and stopping SparkR SQL", { for (i in 1:4) { sc <- sparkR.init() sqlContext <- sparkRSQL.init(sc) - df <- createDataFrame(sqlContext, data.frame(a = 1:20)) + df <- createDataFrame(data.frame(a = 1:20)) expect_equal(count(df), 20) sparkR.stop() } diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 5f8a27d4e0..59ef15c1e9 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -26,7 +26,7 @@ sc <- sparkR.init() sqlContext <- sparkRSQL.init(sc) test_that("formula of spark.glm", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) # directly calling the spark API # dot minus and intercept vs native glm model <- spark.glm(training, Sepal_Width ~ . - Species + 0) @@ -41,7 +41,7 @@ test_that("formula of spark.glm", { expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) # glm should work with long formula - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) training$LongLongLongLongLongName <- training$Sepal_Width training$VeryLongLongLongLonLongName <- training$Sepal_Length training$AnotherLongLongLongLongName <- training$Species @@ -53,7 +53,7 @@ test_that("formula of spark.glm", { }) test_that("spark.glm and predict", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) # gaussian family model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) prediction <- predict(model, training) @@ -80,7 +80,7 @@ test_that("spark.glm and predict", { test_that("spark.glm summary", { # gaussian family - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) @@ -99,7 +99,7 @@ test_that("spark.glm summary", { expect_equal(stats$aic, rStats$aic) # binomial family - df <- suppressWarnings(createDataFrame(sqlContext, iris)) + df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width, family = binomial(link = "logit"))) @@ -128,7 +128,7 @@ test_that("spark.glm summary", { }) test_that("spark.glm save/load", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) s <- summary(m) @@ -157,7 +157,7 @@ test_that("spark.glm save/load", { test_that("formula of glm", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) # dot minus and intercept vs native glm model <- glm(Sepal_Width ~ . - Species + 0, data = training) vals <- collect(select(predict(model, training), "prediction")) @@ -171,7 +171,7 @@ test_that("formula of glm", { expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) # glm should work with long formula - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) training$LongLongLongLongLongName <- training$Sepal_Width training$VeryLongLongLongLonLongName <- training$Sepal_Length training$AnotherLongLongLongLongName <- training$Species @@ -183,7 +183,7 @@ test_that("formula of glm", { }) test_that("glm and predict", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) # gaussian family model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) prediction <- predict(model, training) @@ -210,7 +210,7 @@ test_that("glm and predict", { test_that("glm summary", { # gaussian family - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training)) rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) @@ -229,7 +229,7 @@ test_that("glm summary", { expect_equal(stats$aic, rStats$aic) # binomial family - df <- suppressWarnings(createDataFrame(sqlContext, iris)) + df <- suppressWarnings(createDataFrame(iris)) training <- df[df$Species %in% c("versicolor", "virginica"), ] stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, family = binomial(link = "logit"))) @@ -258,7 +258,7 @@ test_that("glm summary", { }) test_that("glm save/load", { - training <- suppressWarnings(createDataFrame(sqlContext, iris)) + training <- suppressWarnings(createDataFrame(iris)) m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) s <- summary(m) @@ -287,7 +287,7 @@ test_that("glm save/load", { test_that("spark.kmeans", { newIris <- iris newIris$Species <- NULL - training <- suppressWarnings(createDataFrame(sqlContext, newIris)) + training <- suppressWarnings(createDataFrame(newIris)) take(training, 1) @@ -365,7 +365,7 @@ test_that("spark.naiveBayes", { t <- as.data.frame(Titanic) t1 <- t[t$Freq > 0, -5] - df <- suppressWarnings(createDataFrame(sqlContext, t1)) + df <- suppressWarnings(createDataFrame(t1)) m <- spark.naiveBayes(df, Survived ~ .) s <- summary(m) expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6) @@ -420,7 +420,7 @@ test_that("spark.survreg", { # data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0), list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1)) - df <- createDataFrame(sqlContext, data, c("time", "status", "x", "sex")) + df <- createDataFrame(data, c("time", "status", "x", "sex")) model <- spark.survreg(df, Surv(time, status) ~ x + sex) stats <- summary(model) coefs <- as.vector(stats$coefficients[, 1]) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index b2d769f2ac..94fa363d7e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -32,6 +32,22 @@ markUtf8 <- function(s) { s } +setHiveContext <- function(sc) { + ssc <- callJMethod(sc, "sc") + hiveCtx <- tryCatch({ + newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc) + }, + error = function(err) { + skip("Hive is not build with SparkSQL, skipped") + }) + assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv) + hiveCtx +} + +unsetHiveContext <- function() { + remove(".sparkRHivesc", envir = .sparkREnv) +} + # Tests for SparkSQL functions in SparkR sc <- sparkR.init() @@ -99,8 +115,8 @@ test_that("structType and structField", { test_that("create DataFrame from RDD", { rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) - df <- createDataFrame(sqlContext, rdd, list("a", "b")) - dfAsDF <- as.DataFrame(sqlContext, rdd, list("a", "b")) + df <- createDataFrame(rdd, list("a", "b")) + dfAsDF <- as.DataFrame(rdd, list("a", "b")) expect_is(df, "SparkDataFrame") expect_is(dfAsDF, "SparkDataFrame") expect_equal(count(df), 10) @@ -116,8 +132,8 @@ test_that("create DataFrame from RDD", { expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) expect_equal(dtypes(dfAsDF), list(c("a", "int"), c("b", "string"))) - df <- createDataFrame(sqlContext, rdd) - dfAsDF <- as.DataFrame(sqlContext, rdd) + df <- createDataFrame(rdd) + dfAsDF <- as.DataFrame(rdd) expect_is(df, "SparkDataFrame") expect_is(dfAsDF, "SparkDataFrame") expect_equal(columns(df), c("_1", "_2")) @@ -125,13 +141,13 @@ test_that("create DataFrame from RDD", { schema <- structType(structField(x = "a", type = "integer", nullable = TRUE), structField(x = "b", type = "string", nullable = TRUE)) - df <- createDataFrame(sqlContext, rdd, schema) + df <- createDataFrame(rdd, schema) expect_is(df, "SparkDataFrame") expect_equal(columns(df), c("a", "b")) expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) }) - df <- createDataFrame(sqlContext, rdd) + df <- createDataFrame(rdd) expect_is(df, "SparkDataFrame") expect_equal(count(df), 10) expect_equal(columns(df), c("a", "b")) @@ -139,9 +155,9 @@ test_that("create DataFrame from RDD", { schema <- structType(structField("name", "string"), structField("age", "integer"), structField("height", "float")) - df <- read.df(sqlContext, jsonPathNa, "json", schema) - df2 <- createDataFrame(sqlContext, toRDD(df), schema) - df2AsDF <- as.DataFrame(sqlContext, toRDD(df), schema) + df <- read.df(jsonPathNa, "json", schema) + df2 <- createDataFrame(toRDD(df), schema) + df2AsDF <- as.DataFrame(toRDD(df), schema) expect_equal(columns(df2), c("name", "age", "height")) expect_equal(columns(df2AsDF), c("name", "age", "height")) expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float"))) @@ -154,7 +170,7 @@ test_that("create DataFrame from RDD", { localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19L, 23L, 18L), height = c(176.5, 181.4, 173.7)) - df <- createDataFrame(sqlContext, localDF, schema) + df <- createDataFrame(localDF, schema) expect_is(df, "SparkDataFrame") expect_equal(count(df), 3) expect_equal(columns(df), c("name", "age", "height")) @@ -162,55 +178,50 @@ test_that("create DataFrame from RDD", { expect_equal(as.list(collect(where(df, df$name == "John"))), list(name = "John", age = 19L, height = 176.5)) - ssc <- callJMethod(sc, "sc") - hiveCtx <- tryCatch({ - newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc) - }, - error = function(err) { - skip("Hive is not build with SparkSQL, skipped") - }) - sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)") - df <- read.df(hiveCtx, jsonPathNa, "json", schema) + setHiveContext(sc) + sql("CREATE TABLE people (name string, age double, height float)") + df <- read.df(jsonPathNa, "json", schema) invisible(insertInto(df, "people")) - expect_equal(collect(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"))$age, + expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16)) - expect_equal(collect(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"))$height, + expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height, c(176.5)) + unsetHiveContext() }) test_that("convert NAs to null type in DataFrames", { rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L))) - df <- createDataFrame(sqlContext, rdd, list("a", "b")) + df <- createDataFrame(rdd, list("a", "b")) expect_true(is.na(collect(df)[2, "a"])) expect_equal(collect(df)[2, "b"], 4L) l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(df)[2, "x"], 1L) expect_true(is.na(collect(df)[2, "y"])) rdd <- parallelize(sc, list(list(1, 2), list(NA, 4))) - df <- createDataFrame(sqlContext, rdd, list("a", "b")) + df <- createDataFrame(rdd, list("a", "b")) expect_true(is.na(collect(df)[2, "a"])) expect_equal(collect(df)[2, "b"], 4) l <- data.frame(x = 1, y = c(1, NA_real_, 3)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(df)[2, "x"], 1) expect_true(is.na(collect(df)[2, "y"])) l <- list("a", "b", NA, "d") - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_true(is.na(collect(df)[3, "_1"])) expect_equal(collect(df)[4, "_1"], "d") l <- list("a", "b", NA_character_, "d") - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_true(is.na(collect(df)[3, "_1"])) expect_equal(collect(df)[4, "_1"], "d") l <- list(TRUE, FALSE, NA, TRUE) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_true(is.na(collect(df)[3, "_1"])) expect_equal(collect(df)[4, "_1"], TRUE) }) @@ -244,40 +255,40 @@ test_that("toDF", { test_that("create DataFrame from list or data.frame", { l <- list(list(1, 2), list(3, 4)) - df <- createDataFrame(sqlContext, l, c("a", "b")) + df <- createDataFrame(l, c("a", "b")) expect_equal(columns(df), c("a", "b")) l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(columns(df), c("a", "b")) a <- 1:3 b <- c("a", "b", "c") ldf <- data.frame(a, b) - df <- createDataFrame(sqlContext, ldf) + df <- createDataFrame(ldf) expect_equal(columns(df), c("a", "b")) expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) expect_equal(count(df), 3) ldf2 <- collect(df) expect_equal(ldf$a, ldf2$a) - irisdf <- suppressWarnings(createDataFrame(sqlContext, iris)) + irisdf <- suppressWarnings(createDataFrame(iris)) iris_collected <- collect(irisdf) expect_equivalent(iris_collected[, -5], iris[, -5]) expect_equal(iris_collected$Species, as.character(iris$Species)) - mtcarsdf <- createDataFrame(sqlContext, mtcars) + mtcarsdf <- createDataFrame(mtcars) expect_equivalent(collect(mtcarsdf), mtcars) bytes <- as.raw(c(1, 2, 3)) - df <- createDataFrame(sqlContext, list(list(bytes))) + df <- createDataFrame(list(list(bytes))) expect_equal(collect(df)[[1]][[1]], bytes) }) test_that("create DataFrame with different data types", { l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"), f = as.POSIXct("2015-03-15 12:13:14.056")) - df <- createDataFrame(sqlContext, list(l)) + df <- createDataFrame(list(l)) expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"), c("d", "string"), c("e", "date"), c("f", "timestamp"))) expect_equal(count(df), 1) @@ -291,7 +302,7 @@ test_that("create DataFrame with complex types", { s <- listToStruct(list(a = "aa", b = 3L)) l <- list(as.list(1:10), list("a", "b"), e, s) - df <- createDataFrame(sqlContext, list(l), c("a", "b", "c", "d")) + df <- createDataFrame(list(l), c("a", "b", "c", "d")) expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>"), c("c", "map<string,int>"), @@ -318,7 +329,7 @@ test_that("create DataFrame from a data.frame with complex types", { ldf$a_list <- list(list(1, 2), list(3, 4)) ldf$an_envir <- c(as.environment(list(a = 1, b = 2)), as.environment(list(c = 3))) - sdf <- createDataFrame(sqlContext, ldf) + sdf <- createDataFrame(ldf) collected <- collect(sdf) expect_identical(ldf[, 1, FALSE], collected[, 1, FALSE]) @@ -334,7 +345,7 @@ writeLines(mockLinesMapType, mapTypeJsonPath) test_that("Collect DataFrame with complex types", { # ArrayType - df <- read.json(sqlContext, complexTypeJsonPath) + df <- read.json(complexTypeJsonPath) ldf <- collect(df) expect_equal(nrow(ldf), 3) expect_equal(ncol(ldf), 3) @@ -346,7 +357,7 @@ test_that("Collect DataFrame with complex types", { # MapType schema <- structType(structField("name", "string"), structField("info", "map<string,double>")) - df <- read.df(sqlContext, mapTypeJsonPath, "json", schema) + df <- read.df(mapTypeJsonPath, "json", schema) expect_equal(dtypes(df), list(c("name", "string"), c("info", "map<string,double>"))) ldf <- collect(df) @@ -360,7 +371,7 @@ test_that("Collect DataFrame with complex types", { expect_equal(bob$height, 176.5) # StructType - df <- read.json(sqlContext, mapTypeJsonPath) + df <- read.json(mapTypeJsonPath) expect_equal(dtypes(df), list(c("info", "struct<age:bigint,height:double>"), c("name", "string"))) ldf <- collect(df) @@ -376,7 +387,7 @@ test_that("Collect DataFrame with complex types", { test_that("read/write json files", { # Test read.df - df <- read.df(sqlContext, jsonPath, "json") + df <- read.df(jsonPath, "json") expect_is(df, "SparkDataFrame") expect_equal(count(df), 3) @@ -384,17 +395,17 @@ test_that("read/write json files", { schema <- structType(structField("name", type = "string"), structField("age", type = "double")) - df1 <- read.df(sqlContext, jsonPath, "json", schema) + df1 <- read.df(jsonPath, "json", schema) expect_is(df1, "SparkDataFrame") expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) # Test loadDF - df2 <- loadDF(sqlContext, jsonPath, "json", schema) + df2 <- loadDF(jsonPath, "json", schema) expect_is(df2, "SparkDataFrame") expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) # Test read.json - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_is(df, "SparkDataFrame") expect_equal(count(df), 3) @@ -407,11 +418,11 @@ test_that("read/write json files", { write.json(df, jsonPath3) # Test read.json()/jsonFile() works with multiple input paths - jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3)) + jsonDF1 <- read.json(c(jsonPath2, jsonPath3)) expect_is(jsonDF1, "SparkDataFrame") expect_equal(count(jsonDF1), 6) # Suppress warnings because jsonFile is deprecated - jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3))) + jsonDF2 <- suppressWarnings(jsonFile(c(jsonPath2, jsonPath3))) expect_is(jsonDF2, "SparkDataFrame") expect_equal(count(jsonDF2), 6) @@ -433,82 +444,82 @@ test_that("jsonRDD() on a RDD with json string", { }) test_that("test cache, uncache and clearCache", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) registerTempTable(df, "table1") - cacheTable(sqlContext, "table1") - uncacheTable(sqlContext, "table1") - clearCache(sqlContext) - dropTempTable(sqlContext, "table1") + cacheTable("table1") + uncacheTable("table1") + clearCache() + dropTempTable("table1") }) test_that("test tableNames and tables", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) registerTempTable(df, "table1") - expect_equal(length(tableNames(sqlContext)), 1) - df <- tables(sqlContext) + expect_equal(length(tableNames()), 1) + df <- tables() expect_equal(count(df), 1) - dropTempTable(sqlContext, "table1") + dropTempTable("table1") }) test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) registerTempTable(df, "table1") - newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'") + newdf <- sql("SELECT * FROM table1 where name = 'Michael'") expect_is(newdf, "SparkDataFrame") expect_equal(count(newdf), 1) - dropTempTable(sqlContext, "table1") + dropTempTable("table1") }) test_that("insertInto() on a registered table", { - df <- read.df(sqlContext, jsonPath, "json") + df <- read.df(jsonPath, "json") write.df(df, parquetPath, "parquet", "overwrite") - dfParquet <- read.df(sqlContext, parquetPath, "parquet") + dfParquet <- read.df(parquetPath, "parquet") lines <- c("{\"name\":\"Bob\", \"age\":24}", "{\"name\":\"James\", \"age\":35}") jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp") parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") writeLines(lines, jsonPath2) - df2 <- read.df(sqlContext, jsonPath2, "json") + df2 <- read.df(jsonPath2, "json") write.df(df2, parquetPath2, "parquet", "overwrite") - dfParquet2 <- read.df(sqlContext, parquetPath2, "parquet") + dfParquet2 <- read.df(parquetPath2, "parquet") registerTempTable(dfParquet, "table1") insertInto(dfParquet2, "table1") - expect_equal(count(sql(sqlContext, "select * from table1")), 5) - expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Michael") - dropTempTable(sqlContext, "table1") + expect_equal(count(sql("select * from table1")), 5) + expect_equal(first(sql("select * from table1 order by age"))$name, "Michael") + dropTempTable("table1") registerTempTable(dfParquet, "table1") insertInto(dfParquet2, "table1", overwrite = TRUE) - expect_equal(count(sql(sqlContext, "select * from table1")), 2) - expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob") - dropTempTable(sqlContext, "table1") + expect_equal(count(sql("select * from table1")), 2) + expect_equal(first(sql("select * from table1 order by age"))$name, "Bob") + dropTempTable("table1") unlink(jsonPath2) unlink(parquetPath2) }) test_that("tableToDF() returns a new DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) registerTempTable(df, "table1") - tabledf <- tableToDF(sqlContext, "table1") + tabledf <- tableToDF("table1") expect_is(tabledf, "SparkDataFrame") expect_equal(count(tabledf), 3) - tabledf2 <- tableToDF(sqlContext, "table1") + tabledf2 <- tableToDF("table1") expect_equal(count(tabledf2), 3) - dropTempTable(sqlContext, "table1") + dropTempTable("table1") }) test_that("toRDD() returns an RRDD", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) testRDD <- toRDD(df) expect_is(testRDD, "RDD") expect_equal(count(testRDD), 3) }) test_that("union on two RDDs created from DataFrames returns an RRDD", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) RDD1 <- toRDD(df) RDD2 <- toRDD(df) unioned <- unionRDD(RDD1, RDD2) @@ -530,7 +541,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", { writeLines(textLines, textPath) textRDD <- textFile(sc, textPath) - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) dfRDD <- toRDD(df) unionByte <- unionRDD(rdd, dfRDD) @@ -548,7 +559,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", { test_that("objectFile() works with row serialization", { objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp") - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) dfRDD <- toRDD(df) saveAsObjectFile(coalesce(dfRDD, 1L), objectPath) objectIn <- objectFile(sc, objectPath) @@ -559,7 +570,7 @@ test_that("objectFile() works with row serialization", { }) test_that("lapply() on a DataFrame returns an RDD with the correct columns", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) testRDD <- lapply(df, function(row) { row$newCol <- row$age + 5 row @@ -571,7 +582,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", { }) test_that("collect() returns a data.frame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) rdf <- collect(df) expect_true(is.data.frame(rdf)) expect_equal(names(rdf)[1], "age") @@ -587,20 +598,20 @@ test_that("collect() returns a data.frame", { expect_equal(ncol(rdf), 2) # collect() correctly handles multiple columns with same name - df <- createDataFrame(sqlContext, list(list(1, 2)), schema = c("name", "name")) + df <- createDataFrame(list(list(1, 2)), schema = c("name", "name")) ldf <- collect(df) expect_equal(names(ldf), c("name", "name")) }) test_that("limit() returns DataFrame with the correct number of rows", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) dfLimited <- limit(df, 2) expect_is(dfLimited, "SparkDataFrame") expect_equal(count(dfLimited), 2) }) test_that("collect() and take() on a DataFrame return the same number of rows and columns", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_equal(nrow(collect(df)), nrow(take(df, 10))) expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) @@ -614,7 +625,7 @@ test_that("collect() support Unicode characters", { jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(lines, jsonPath) - df <- read.df(sqlContext, jsonPath, "json") + df <- read.df(jsonPath, "json") rdf <- collect(df) expect_true(is.data.frame(rdf)) expect_equal(rdf$name[1], markUtf8("안녕하세요")) @@ -622,12 +633,12 @@ test_that("collect() support Unicode characters", { expect_equal(rdf$name[3], markUtf8("ã“ã‚“ã«ã¡ã¯")) expect_equal(rdf$name[4], markUtf8("Xin chà o")) - df1 <- createDataFrame(sqlContext, rdf) + df1 <- createDataFrame(rdf) expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好")) }) test_that("multiple pipeline transformations result in an RDD with the correct values", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) first <- lapply(df, function(row) { row$age <- row$age + 5 row @@ -644,7 +655,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v }) test_that("cache(), persist(), and unpersist() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_false(df@env$isCached) cache(df) expect_true(df@env$isCached) @@ -663,7 +674,7 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", { }) test_that("schema(), dtypes(), columns(), names() return the correct values/format", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) testSchema <- schema(df) expect_equal(length(testSchema$fields()), 2) expect_equal(testSchema$fields()[[1]]$dataType.toString(), "LongType") @@ -684,7 +695,7 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form }) test_that("names() colnames() set the column names", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) names(df) <- c("col1", "col2") expect_equal(colnames(df)[2], "col2") @@ -699,7 +710,7 @@ test_that("names() colnames() set the column names", { expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.") # Note: if this test is broken, remove check for "." character on colnames<- method - irisDF <- suppressWarnings(createDataFrame(sqlContext, iris)) + irisDF <- suppressWarnings(createDataFrame(iris)) expect_equal(names(irisDF)[1], "Sepal_Length") # Test base::colnames base::names @@ -715,7 +726,7 @@ test_that("names() colnames() set the column names", { }) test_that("head() and first() return the correct data", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) testHead <- head(df) expect_equal(nrow(testHead), 3) expect_equal(ncol(testHead), 2) @@ -748,7 +759,7 @@ test_that("distinct(), unique() and dropDuplicates() on DataFrames", { jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(lines, jsonPathWithDup) - df <- read.json(sqlContext, jsonPathWithDup) + df <- read.json(jsonPathWithDup) uniques <- distinct(df) expect_is(uniques, "SparkDataFrame") expect_equal(count(uniques), 3) @@ -759,7 +770,6 @@ test_that("distinct(), unique() and dropDuplicates() on DataFrames", { # Test dropDuplicates() df <- createDataFrame( - sqlContext, list( list(2, 1, 2), list(1, 1, 1), list(1, 2, 1), list(2, 1, 2), @@ -795,7 +805,7 @@ test_that("distinct(), unique() and dropDuplicates() on DataFrames", { }) test_that("sample on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) sampled <- sample(df, FALSE, 1.0) expect_equal(nrow(collect(sampled)), count(df)) expect_is(sampled, "SparkDataFrame") @@ -817,7 +827,7 @@ test_that("sample on a DataFrame", { }) test_that("select operators", { - df <- select(read.json(sqlContext, jsonPath), "name", "age") + df <- select(read.json(jsonPath), "name", "age") expect_is(df$name, "Column") expect_is(df[[2]], "Column") expect_is(df[["age"]], "Column") @@ -846,7 +856,7 @@ test_that("select operators", { }) test_that("select with column", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) df1 <- select(df, "name") expect_equal(columns(df1), c("name")) expect_equal(count(df1), 3) @@ -869,7 +879,7 @@ test_that("select with column", { }) test_that("drop column", { - df <- select(read.json(sqlContext, jsonPath), "name", "age") + df <- select(read.json(jsonPath), "name", "age") df1 <- drop(df, "name") expect_equal(columns(df1), c("age")) @@ -891,7 +901,7 @@ test_that("drop column", { test_that("subsetting", { # read.json returns columns in random order - df <- select(read.json(sqlContext, jsonPath), "name", "age") + df <- select(read.json(jsonPath), "name", "age") filtered <- df[df$age > 20, ] expect_equal(count(filtered), 1) expect_equal(columns(filtered), c("name", "age")) @@ -928,7 +938,7 @@ test_that("subsetting", { }) test_that("selectExpr() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) selected <- selectExpr(df, "age * 2") expect_equal(names(selected), "(age * 2)") expect_equal(collect(selected), collect(select(df, df$age * 2L))) @@ -939,12 +949,12 @@ test_that("selectExpr() on a DataFrame", { }) test_that("expr() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_equal(collect(select(df, expr("abs(-123)")))[1, 1], 123) }) test_that("column calculation", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) d <- collect(select(df, alias(df$age + 1, "age2"))) expect_equal(names(d), c("age2")) df2 <- select(df, lower(df$name), abs(df$age)) @@ -953,40 +963,35 @@ test_that("column calculation", { }) test_that("test HiveContext", { - ssc <- callJMethod(sc, "sc") - hiveCtx <- tryCatch({ - newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc) - }, - error = function(err) { - skip("Hive is not build with SparkSQL, skipped") - }) - df <- createExternalTable(hiveCtx, "json", jsonPath, "json") + setHiveContext(sc) + df <- createExternalTable("json", jsonPath, "json") expect_is(df, "SparkDataFrame") expect_equal(count(df), 3) - df2 <- sql(hiveCtx, "select * from json") + df2 <- sql("select * from json") expect_is(df2, "SparkDataFrame") expect_equal(count(df2), 3) jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2)) - df3 <- sql(hiveCtx, "select * from json2") + df3 <- sql("select * from json2") expect_is(df3, "SparkDataFrame") expect_equal(count(df3), 3) unlink(jsonPath2) hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath)) - df4 <- sql(hiveCtx, "select * from hivetestbl") + df4 <- sql("select * from hivetestbl") expect_is(df4, "SparkDataFrame") expect_equal(count(df4), 3) unlink(hivetestDataPath) parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)) - df5 <- sql(hiveCtx, "select * from parquetest") + df5 <- sql("select * from parquetest") expect_is(df5, "SparkDataFrame") expect_equal(count(df5), 3) unlink(parquetDataPath) + unsetHiveContext() }) test_that("column operators", { @@ -1025,7 +1030,7 @@ test_that("column functions", { expect_equal(class(rank())[[1]], "Column") expect_equal(rank(1:3), as.numeric(c(1:3))) - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20))) expect_equal(collect(df2)[[2, 1]], TRUE) expect_equal(collect(df2)[[2, 2]], FALSE) @@ -1044,11 +1049,11 @@ test_that("column functions", { expect_true(abs(collect(select(df, stddev(df$age)))[1, 1] - 7.778175) < 1e-6) expect_equal(collect(select(df, var_pop(df$age)))[1, 1], 30.25) - df5 <- createDataFrame(sqlContext, list(list(a = "010101"))) + df5 <- createDataFrame(list(list(a = "010101"))) expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15") # Test array_contains() and sort_array() - df <- createDataFrame(sqlContext, list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L)))) + df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L)))) result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]] expect_equal(result, c(TRUE, FALSE)) @@ -1061,8 +1066,7 @@ test_that("column functions", { expect_equal(length(lag(ldeaths, 12)), 72) # Test struct() - df <- createDataFrame(sqlContext, - list(list(1L, 2L, 3L), list(4L, 5L, 6L)), + df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)), schema = c("a", "b", "c")) result <- collect(select(df, struct("a", "c"))) expected <- data.frame(row.names = 1:2) @@ -1078,15 +1082,14 @@ test_that("column functions", { # Test encode(), decode() bytes <- as.raw(c(0xe5, 0xa4, 0xa7, 0xe5, 0x8d, 0x83, 0xe4, 0xb8, 0x96, 0xe7, 0x95, 0x8c)) - df <- createDataFrame(sqlContext, - list(list(markUtf8("大åƒä¸–界"), "utf-8", bytes)), + df <- createDataFrame(list(list(markUtf8("大åƒä¸–界"), "utf-8", bytes)), schema = c("a", "b", "c")) result <- collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8"))) expect_equal(result[[1]][[1]], bytes) expect_equal(result[[2]], markUtf8("大åƒä¸–界")) # Test first(), last() - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_equal(collect(select(df, first(df$age)))[[1]], NA) expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30) expect_equal(collect(select(df, first("age")))[[1]], NA) @@ -1097,7 +1100,7 @@ test_that("column functions", { expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19) # Test bround() - df <- createDataFrame(sqlContext, data.frame(x = c(2.5, 3.5))) + df <- createDataFrame(data.frame(x = c(2.5, 3.5))) expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2) expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4) }) @@ -1109,7 +1112,7 @@ test_that("column binary mathfunctions", { "{\"a\":4, \"b\":8}") jsonPathWithDup <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(lines, jsonPathWithDup) - df <- read.json(sqlContext, jsonPathWithDup) + df <- read.json(jsonPathWithDup) expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5)) expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6)) expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7)) @@ -1130,7 +1133,7 @@ test_that("column binary mathfunctions", { }) test_that("string operators", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_equal(count(where(df, like(df$name, "A%"))), 1) expect_equal(count(where(df, startsWith(df$name, "A"))), 1) expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi") @@ -1150,14 +1153,14 @@ test_that("string operators", { expect_equal(collect(select(df, regexp_replace(df$name, "(n.y)", "ydn")))[2, 1], "Aydn") l2 <- list(list(a = "aaads")) - df2 <- createDataFrame(sqlContext, l2) + df2 <- createDataFrame(l2) expect_equal(collect(select(df2, locate("aa", df2$a)))[1, 1], 1) expect_equal(collect(select(df2, locate("aa", df2$a, 2)))[1, 1], 2) expect_equal(collect(select(df2, lpad(df2$a, 8, "#")))[1, 1], "###aaads") # nolint expect_equal(collect(select(df2, rpad(df2$a, 8, "#")))[1, 1], "aaads###") # nolint l3 <- list(list(a = "a.b.c.d")) - df3 <- createDataFrame(sqlContext, l3) + df3 <- createDataFrame(l3) expect_equal(collect(select(df3, substring_index(df3$a, ".", 2)))[1, 1], "a.b") expect_equal(collect(select(df3, substring_index(df3$a, ".", -3)))[1, 1], "b.c.d") expect_equal(collect(select(df3, translate(df3$a, "bc", "12")))[1, 1], "a.1.2.d") @@ -1169,7 +1172,7 @@ test_that("date functions on a DataFrame", { l <- list(list(a = 1L, b = as.Date("2012-12-13")), list(a = 2L, b = as.Date("2013-12-14")), list(a = 3L, b = as.Date("2014-12-15"))) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(select(df, dayofmonth(df$b)))[, 1], c(13, 14, 15)) expect_equal(collect(select(df, dayofyear(df$b)))[, 1], c(348, 348, 349)) expect_equal(collect(select(df, weekofyear(df$b)))[, 1], c(50, 50, 51)) @@ -1189,7 +1192,7 @@ test_that("date functions on a DataFrame", { l2 <- list(list(a = 1L, b = as.POSIXlt("2012-12-13 12:34:00", tz = "UTC")), list(a = 2L, b = as.POSIXlt("2014-12-15 01:24:34", tz = "UTC"))) - df2 <- createDataFrame(sqlContext, l2) + df2 <- createDataFrame(l2) expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24)) expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34)) expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1], @@ -1201,7 +1204,7 @@ test_that("date functions on a DataFrame", { expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0) l3 <- list(list(a = 1000), list(a = -1000)) - df3 <- createDataFrame(sqlContext, l3) + df3 <- createDataFrame(l3) result31 <- collect(select(df3, from_unixtime(df3$a))) expect_equal(grep("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", result31[, 1], perl = TRUE), c(1, 2)) @@ -1212,13 +1215,13 @@ test_that("date functions on a DataFrame", { test_that("greatest() and least() on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(select(df, greatest(df$a, df$b)))[, 1], c(2, 4)) expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) test_that("time windowing (window()) with all inputs", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1227,7 +1230,7 @@ test_that("time windowing (window()) with all inputs", { }) test_that("time windowing (window()) with slide duration", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "2 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1236,7 +1239,7 @@ test_that("time windowing (window()) with slide duration", { }) test_that("time windowing (window()) with start time", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", startTime = "2 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1245,7 +1248,7 @@ test_that("time windowing (window()) with start time", { }) test_that("time windowing (window()) with just window duration", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) + df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1255,7 +1258,7 @@ test_that("time windowing (window()) with just window duration", { test_that("when(), otherwise() and ifelse() on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, 1)))[, 1], c(NA, 1)) expect_equal(collect(select(df, otherwise(when(df$a > 1, 1), 0)))[, 1], c(0, 1)) expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0)) @@ -1263,14 +1266,14 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", { test_that("when(), otherwise() and ifelse() with column on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) - df <- createDataFrame(sqlContext, l) + df <- createDataFrame(l) expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1)) expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1)) expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0)) }) test_that("group by, agg functions", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) df1 <- agg(df, name = "max", age = "sum") expect_equal(1, count(df1)) df1 <- agg(df, age2 = max(df$age)) @@ -1315,7 +1318,7 @@ test_that("group by, agg functions", { "{\"name\":\"ID2\", \"value\": \"-3\"}") jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(mockLines2, jsonPath2) - gd2 <- groupBy(read.json(sqlContext, jsonPath2), "name") + gd2 <- groupBy(read.json(jsonPath2), "name") df6 <- agg(gd2, value = "sum") df6_local <- collect(df6) expect_equal(42, df6_local[df6_local$name == "ID1", ][1, 2]) @@ -1332,7 +1335,7 @@ test_that("group by, agg functions", { "{\"name\":\"Justin\", \"age\":1}") jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(mockLines3, jsonPath3) - df8 <- read.json(sqlContext, jsonPath3) + df8 <- read.json(jsonPath3) gd3 <- groupBy(df8, "name") gd3_local <- collect(sum(gd3)) expect_equal(60, gd3_local[gd3_local$name == "Andy", ][1, 2]) @@ -1351,7 +1354,7 @@ test_that("group by, agg functions", { }) test_that("arrange() and orderBy() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) sorted <- arrange(df, df$age) expect_equal(collect(sorted)[1, 2], "Michael") @@ -1377,7 +1380,7 @@ test_that("arrange() and orderBy() on a DataFrame", { }) test_that("filter() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) filtered <- filter(df, "age > 20") expect_equal(count(filtered), 1) expect_equal(collect(filtered)$name, "Andy") @@ -1400,7 +1403,7 @@ test_that("filter() on a DataFrame", { }) test_that("join() and merge() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}", "{\"name\":\"Andy\", \"test\": \"no\"}", @@ -1408,7 +1411,7 @@ test_that("join() and merge() on a DataFrame", { "{\"name\":\"Bob\", \"test\": \"yes\"}") jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(mockLines2, jsonPath2) - df2 <- read.json(sqlContext, jsonPath2) + df2 <- read.json(jsonPath2) joined <- join(df, df2) expect_equal(names(joined), c("age", "name", "name", "test")) @@ -1483,7 +1486,7 @@ test_that("join() and merge() on a DataFrame", { "{\"name\":\"Bob\", \"name_y\":\"Bob\", \"test\": \"yes\"}") jsonPath3 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(mockLines3, jsonPath3) - df3 <- read.json(sqlContext, jsonPath3) + df3 <- read.json(jsonPath3) expect_error(merge(df, df3), paste("The following column name: name_y occurs more than once in the 'DataFrame'.", "Please use different suffixes for the intersected columns.", sep = "")) @@ -1493,7 +1496,7 @@ test_that("join() and merge() on a DataFrame", { }) test_that("toJSON() returns an RDD of the correct values", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) testRDD <- toJSON(df) expect_is(testRDD, "RDD") expect_equal(getSerializedMode(testRDD), "string") @@ -1501,7 +1504,7 @@ test_that("toJSON() returns an RDD of the correct values", { }) test_that("showDF()", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expected <- paste("+----+-------+\n", "| age| name|\n", "+----+-------+\n", @@ -1513,19 +1516,19 @@ test_that("showDF()", { }) test_that("isLocal()", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_false(isLocal(df)) }) test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) lines <- c("{\"name\":\"Bob\", \"age\":24}", "{\"name\":\"Andy\", \"age\":30}", "{\"name\":\"James\", \"age\":35}") jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(lines, jsonPath2) - df2 <- read.df(sqlContext, jsonPath2, "json") + df2 <- read.df(jsonPath2, "json") unioned <- arrange(unionAll(df, df2), df$age) expect_is(unioned, "SparkDataFrame") @@ -1557,7 +1560,7 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", { }) test_that("withColumn() and withColumnRenamed()", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) newDF <- withColumn(df, "newAge", df$age + 2) expect_equal(length(columns(newDF)), 3) expect_equal(columns(newDF)[3], "newAge") @@ -1574,7 +1577,7 @@ test_that("withColumn() and withColumnRenamed()", { }) test_that("mutate(), transform(), rename() and names()", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) newDF <- mutate(df, newAge = df$age + 2) expect_equal(length(columns(newDF)), 3) expect_equal(columns(newDF)[3], "newAge") @@ -1622,10 +1625,10 @@ test_that("mutate(), transform(), rename() and names()", { }) test_that("read/write Parquet files", { - df <- read.df(sqlContext, jsonPath, "json") + df <- read.df(jsonPath, "json") # Test write.df and read.df write.df(df, parquetPath, "parquet", mode = "overwrite") - df2 <- read.df(sqlContext, parquetPath, "parquet") + df2 <- read.df(parquetPath, "parquet") expect_is(df2, "SparkDataFrame") expect_equal(count(df2), 3) @@ -1634,10 +1637,10 @@ test_that("read/write Parquet files", { write.parquet(df, parquetPath2) parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet") suppressWarnings(saveAsParquetFile(df, parquetPath3)) - parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3)) + parquetDF <- read.parquet(c(parquetPath2, parquetPath3)) expect_is(parquetDF, "SparkDataFrame") expect_equal(count(parquetDF), count(df) * 2) - parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3)) + parquetDF2 <- suppressWarnings(parquetFile(parquetPath2, parquetPath3)) expect_is(parquetDF2, "SparkDataFrame") expect_equal(count(parquetDF2), count(df) * 2) @@ -1654,7 +1657,7 @@ test_that("read/write Parquet files", { test_that("read/write text files", { # Test write.df and read.df - df <- read.df(sqlContext, jsonPath, "text") + df <- read.df(jsonPath, "text") expect_is(df, "SparkDataFrame") expect_equal(colnames(df), c("value")) expect_equal(count(df), 3) @@ -1664,7 +1667,7 @@ test_that("read/write text files", { # Test write.text and read.text textPath2 <- tempfile(pattern = "textPath2", fileext = ".txt") write.text(df, textPath2) - df2 <- read.text(sqlContext, c(textPath, textPath2)) + df2 <- read.text(c(textPath, textPath2)) expect_is(df2, "SparkDataFrame") expect_equal(colnames(df2), c("value")) expect_equal(count(df2), count(df) * 2) @@ -1674,7 +1677,7 @@ test_that("read/write text files", { }) test_that("describe() and summarize() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) stats <- describe(df, "age") expect_equal(collect(stats)[1, "summary"], "count") expect_equal(collect(stats)[2, "age"], "24.5") @@ -1692,7 +1695,7 @@ test_that("describe() and summarize() on a DataFrame", { }) test_that("dropna() and na.omit() on a DataFrame", { - df <- read.json(sqlContext, jsonPathNa) + df <- read.json(jsonPathNa) rows <- collect(df) # drop with columns @@ -1778,7 +1781,7 @@ test_that("dropna() and na.omit() on a DataFrame", { }) test_that("fillna() on a DataFrame", { - df <- read.json(sqlContext, jsonPathNa) + df <- read.json(jsonPathNa) rows <- collect(df) # fill with value @@ -1829,7 +1832,7 @@ test_that("crosstab() on a DataFrame", { test_that("cov() and corr() on a DataFrame", { l <- lapply(c(0:9), function(x) { list(x, x * 2.0) }) - df <- createDataFrame(sqlContext, l, c("singles", "doubles")) + df <- createDataFrame(l, c("singles", "doubles")) result <- cov(df, "singles", "doubles") expect_true(abs(result - 55.0 / 3) < 1e-12) @@ -1847,7 +1850,7 @@ test_that("freqItems() on a DataFrame", { rdf <- data.frame(numbers = input, letters = as.character(input), negDoubles = input * -1.0, stringsAsFactors = F) rdf[ input %% 3 == 0, ] <- c(1, "1", -1) - df <- createDataFrame(sqlContext, rdf) + df <- createDataFrame(rdf) multiColResults <- freqItems(df, c("numbers", "letters"), support = 0.1) expect_true(1 %in% multiColResults$numbers[[1]]) expect_true("1" %in% multiColResults$letters[[1]]) @@ -1857,7 +1860,7 @@ test_that("freqItems() on a DataFrame", { l <- lapply(c(0:99), function(i) { if (i %% 2 == 0) { list(1L, -1.0) } else { list(i, i * -1.0) }}) - df <- createDataFrame(sqlContext, l, c("a", "b")) + df <- createDataFrame(l, c("a", "b")) result <- freqItems(df, c("a", "b"), 0.4) expect_identical(result[[1]], list(list(1L, 99L))) expect_identical(result[[2]], list(list(-1, -99))) @@ -1865,7 +1868,7 @@ test_that("freqItems() on a DataFrame", { test_that("sampleBy() on a DataFrame", { l <- lapply(c(0:99), function(i) { as.character(i %% 3) }) - df <- createDataFrame(sqlContext, l, "key") + df <- createDataFrame(l, "key") fractions <- list("0" = 0.1, "1" = 0.2) sample <- sampleBy(df, "key", fractions, 0) result <- collect(orderBy(count(groupBy(sample, "key")), "key")) @@ -1875,19 +1878,19 @@ test_that("sampleBy() on a DataFrame", { test_that("approxQuantile() on a DataFrame", { l <- lapply(c(0:99), function(i) { i }) - df <- createDataFrame(sqlContext, l, "key") + df <- createDataFrame(l, "key") quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0) expect_equal(quantiles[[1]], 50) expect_equal(quantiles[[2]], 80) }) test_that("SQL error message is returned from JVM", { - retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e) + retError <- tryCatch(sql("select * from blah"), error = function(e) e) expect_equal(grepl("Table or view not found", retError), TRUE) expect_equal(grepl("blah", retError), TRUE) }) -irisDF <- suppressWarnings(createDataFrame(sqlContext, iris)) +irisDF <- suppressWarnings(createDataFrame(iris)) test_that("Method as.data.frame as a synonym for collect()", { expect_equal(as.data.frame(irisDF), collect(irisDF)) @@ -1899,7 +1902,7 @@ test_that("Method as.data.frame as a synonym for collect()", { }) test_that("attach() on a DataFrame", { - df <- read.json(sqlContext, jsonPath) + df <- read.json(jsonPath) expect_error(age) attach(df) expect_is(age, "SparkDataFrame") @@ -1919,7 +1922,7 @@ test_that("attach() on a DataFrame", { }) test_that("with() on a DataFrame", { - df <- suppressWarnings(createDataFrame(sqlContext, iris)) + df <- suppressWarnings(createDataFrame(iris)) expect_error(Sepal_Length) sum1 <- with(df, list(summary(Sepal_Length), summary(Sepal_Width))) expect_equal(collect(sum1[[1]])[1, "Sepal_Length"], "150") @@ -1939,15 +1942,15 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", { structField("c4", "timestamp")) # Test primitive types - DF <- createDataFrame(sqlContext, data, schema) + DF <- createDataFrame(data, schema) expect_equal(coltypes(DF), c("integer", "logical", "POSIXct")) # Test complex types - x <- createDataFrame(sqlContext, list(list(as.environment( + x <- createDataFrame(list(list(as.environment( list("a" = "b", "c" = "d", "e" = "f"))))) expect_equal(coltypes(x), "map<string,string>") - df <- selectExpr(read.json(sqlContext, jsonPath), "name", "(age * 1.21) as age") + df <- selectExpr(read.json(jsonPath), "name", "(age * 1.21) as age") expect_equal(dtypes(df), list(c("name", "string"), c("age", "decimal(24,2)"))) df1 <- select(df, cast(df$age, "integer")) @@ -1971,7 +1974,7 @@ test_that("Method str()", { iris2 <- iris colnames(iris2) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species") iris2$col <- TRUE - irisDF2 <- createDataFrame(sqlContext, iris2) + irisDF2 <- createDataFrame(iris2) out <- capture.output(str(irisDF2)) expect_equal(length(out), 7) @@ -1989,7 +1992,7 @@ test_that("Method str()", { # number of returned rows x <- runif(200, 1, 10) df <- data.frame(t(as.matrix(data.frame(x, x, x, x, x, x, x, x, x)))) - DF <- createDataFrame(sqlContext, df) + DF <- createDataFrame(df) out <- capture.output(str(DF)) expect_equal(length(out), 103) @@ -2039,13 +2042,12 @@ test_that("Histogram", { histogram(irisDF, "Sepal_Width", 12)$counts), T) # Test when there are zero counts - df <- as.DataFrame(sqlContext, data.frame(x = c(1, 2, 3, 4, 100))) + df <- as.DataFrame(data.frame(x = c(1, 2, 3, 4, 100))) expect_equal(histogram(df, "x")$counts, c(4, 0, 0, 0, 0, 0, 0, 0, 0, 1)) }) test_that("dapply() and dapplyCollect() on a DataFrame", { - df <- createDataFrame ( - sqlContext, + df <- createDataFrame( list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c")) ldf <- collect(df) @@ -2102,8 +2104,7 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { }) test_that("repartition by columns on DataFrame", { - df <- createDataFrame ( - sqlContext, + df <- createDataFrame( list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)), c("a", "b", "c", "d")) @@ -2138,16 +2139,8 @@ test_that("repartition by columns on DataFrame", { }) test_that("Window functions on a DataFrame", { - ssc <- callJMethod(sc, "sc") - hiveCtx <- tryCatch({ - newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc) - }, - error = function(err) { - skip("Hive is not build with SparkSQL, skipped") - }) - - df <- createDataFrame(hiveCtx, - list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")), + setHiveContext(sc) + df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")), schema = c("key", "value")) ws <- orderBy(window.partitionBy("key"), "value") result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) @@ -2171,6 +2164,31 @@ test_that("Window functions on a DataFrame", { result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws))) names(result) <- c("key", "value") expect_equal(result, expected) + unsetHiveContext() +}) + +test_that("createDataFrame sqlContext parameter backward compatibility", { + a <- 1:3 + b <- c("a", "b", "c") + ldf <- data.frame(a, b) + df <- suppressWarnings(createDataFrame(sqlContext, ldf)) + expect_equal(columns(df), c("a", "b")) + expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) + expect_equal(count(df), 3) + ldf2 <- collect(df) + expect_equal(ldf$a, ldf2$a) + + df2 <- suppressWarnings(createDataFrame(sqlContext, iris)) + expect_equal(count(df2), 150) + expect_equal(ncol(df2), 5) + + df3 <- suppressWarnings(read.df(sqlContext, jsonPath, "json")) + expect_is(df3, "SparkDataFrame") + expect_equal(count(df3), 3) + + before <- suppressWarnings(createDataFrame(sqlContext, iris)) + after <- suppressWarnings(createDataFrame(iris)) + expect_equal(collect(before), collect(after)) }) unlink(parquetPath) -- GitLab