Commit d38c5029 authored by Cheng Lian

[SPARK-9100] [SQL] Adds DataFrame reader/writer shortcut methods for ORC

This PR adds DataFrame reader/writer shortcut methods for ORC in both Scala and Python.
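
For context, a minimal before/after sketch in Scala (illustration only, not part of this commit; the paths are hypothetical, sc is the shell's SparkContext, and hiveContext is a HiveContext, which ORC support currently requires):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)

    // Before this PR: the generic data source API.
    val df1 = hiveContext.read.format("orc").load("/tmp/input_orc")
    df1.write.format("orc").save("/tmp/output_orc")

    // With this PR: dedicated ORC shortcuts.
    val df2 = hiveContext.read.orc("/tmp/input_orc")
    df2.write.orc("/tmp/output_orc")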

Author: Cheng Lian <lian@databricks.com>

Closes #7444 from liancheng/spark-9100 and squashes the following commits:

284d043 [Cheng Lian] Fixes PySpark test cases and addresses PR comments
e0b09fb [Cheng Lian] Adds DataFrame reader/writer shortcut methods for ORC
parent 1ddd0f2f
Showing 79 additions and 23 deletions
......@@ -146,14 +146,28 @@ class DataFrameReader(object):
return self._df(self._jreader.table(tableName))
@since(1.4)
def parquet(self, *path):
def parquet(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
>>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, paths)))
@since(1.5)
def orc(self, path):
"""
Loads an ORC file, returning the result as a :class:`DataFrame`.
::Note: Currently ORC support is only available together with
:class:`HiveContext`.
>>> df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
return self._df(self._jreader.orc(path))
@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
......@@ -378,6 +392,29 @@ class DataFrameWriter(object):
self.partitionBy(partitionBy)
self._jwrite.parquet(path)
def orc(self, path, mode=None, partitionBy=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.
::Note: Currently ORC support is only available together with
:class:`HiveContext`.
:param path: the path in any Hadoop supported file system
:param mode: specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
:param partitionBy: names of partitioning columns
>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
if partitionBy is not None:
self.partitionBy(partitionBy)
self._jwrite.orc(path)
@since(1.4)
def jdbc(self, url, table, mode=None, properties={}):
"""Saves the content of the :class:`DataFrame` to a external database table via JDBC.
......@@ -408,7 +445,7 @@ def _test():
import os
import tempfile
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import Row, SQLContext, HiveContext
import pyspark.sql.readwriter
os.chdir(os.environ["SPARK_HOME"])
......@@ -420,6 +457,7 @@ def _test():
globs['os'] = os
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['hiveContext'] = HiveContext(sc)
globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
(failure_count, test_count) = doctest.testmod(
......
File added
......@@ -264,6 +264,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
}
/**
* Loads an ORC file and returns the result as a [[DataFrame]].
*
* @param path input path
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
def orc(path: String): DataFrame = format("orc").load(path)
/**
* Returns the specified table as a [[DataFrame]].
*
......
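
For illustration only (not part of this diff), a minimal sketch of the new Scala reader shortcut, assuming a HiveContext named hiveContext and a hypothetical input path:

    // Equivalent to hiveContext.read.format("orc").load(path).
    val people = hiveContext.read.orc("/tmp/people_orc")
    people.printSchema()
    people.show(5)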
......@@ -280,6 +280,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
def parquet(path: String): Unit = format("parquet").save(path)
/**
* Saves the content of the [[DataFrame]] in ORC format at the specified path.
* This is equivalent to:
* {{{
* format("orc").save(path)
* }}}
*
* @since 1.5.0
* @note Currently, this method can only be used together with `HiveContext`.
*/
def orc(path: String): Unit = format("orc").save(path)
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
......
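
Likewise for the writer, a hedged sketch of how the new shortcut composes with the existing builder options (df, its columns, and the output path are hypothetical):

    // Equivalent to df.write.format("orc").mode(...).partitionBy(...).save(path).
    df.write
      .mode("overwrite")             // SaveMode, same builder option as before
      .partitionBy("year", "month")  // optional partition columns
      .orc("/tmp/people_orc")        // new shortcut added in this commit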
......@@ -41,8 +41,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
.write
.format("orc")
.save(partitionDir.toString)
.orc(partitionDir.toString)
}
val dataSchemaWithPartition =
......
......@@ -49,13 +49,13 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
data.toDF().write.format("orc").mode("overwrite").save(path.getCanonicalPath)
data.toDF().write.mode("overwrite").orc(path.getCanonicalPath)
}
def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.write.format("orc").mode("overwrite").save(path.getCanonicalPath)
df.write.mode("overwrite").orc(path.getCanonicalPath)
}
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
......@@ -90,7 +90,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
read.orc(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
......@@ -137,7 +137,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}
read.format("orc").load(base.getCanonicalPath).registerTempTable("t")
read.orc(base.getCanonicalPath).registerTempTable("t")
withTempTable("t") {
checkAnswer(
......@@ -187,9 +187,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}
read
.format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
.load(base.getCanonicalPath)
.orc(base.getCanonicalPath)
.registerTempTable("t")
withTempTable("t") {
......@@ -230,9 +229,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with BeforeAndAfterAll {
}
read
.format("orc")
.option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName)
.load(base.getCanonicalPath)
.orc(base.getCanonicalPath)
.registerTempTable("t")
withTempTable("t") {
......
......@@ -63,14 +63,14 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
sqlContext.read.format("orc").load(file),
sqlContext.read.orc(file),
data.toDF().collect())
}
}
test("Read/write binary data") {
withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file =>
val bytes = read.format("orc").load(file).head().getAs[Array[Byte]](0)
val bytes = read.orc(file).head().getAs[Array[Byte]](0)
assert(new String(bytes, "utf8") === "test")
}
}
......@@ -88,7 +88,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
read.format("orc").load(file),
read.orc(file),
data.toDF().collect())
}
}
......@@ -158,7 +158,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withOrcFile(data) { file =>
checkAnswer(
read.format("orc").load(file),
read.orc(file),
Row(Seq.fill(5)(null): _*))
}
}
......@@ -310,7 +310,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
""".stripMargin)
val errorMessage = intercept[AnalysisException] {
sqlContext.read.format("orc").load(path)
sqlContext.read.orc(path)
}.getMessage
assert(errorMessage.contains("Failed to discover schema from ORC files"))
......@@ -323,7 +323,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
|SELECT key, value FROM single
""".stripMargin)
val df = sqlContext.read.format("orc").load(path)
val df = sqlContext.read.orc(path)
assert(df.schema === singleRowDF.schema.asNullable)
checkAnswer(df, singleRowDF)
}
......
......@@ -39,7 +39,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
(data: Seq[T])
(f: String => Unit): Unit = {
withTempPath { file =>
sparkContext.parallelize(data).toDF().write.format("orc").save(file.getCanonicalPath)
sparkContext.parallelize(data).toDF().write.orc(file.getCanonicalPath)
f(file.getCanonicalPath)
}
}
......@@ -51,7 +51,7 @@ private[sql] trait OrcTest extends SQLTestUtils {
protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
(data: Seq[T])
(f: DataFrame => Unit): Unit = {
withOrcFile(data)(path => f(sqlContext.read.format("orc").load(path)))
withOrcFile(data)(path => f(sqlContext.read.orc(path)))
}
/**
......@@ -70,11 +70,11 @@ private[sql] trait OrcTest extends SQLTestUtils {
protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
data: Seq[T], path: File): Unit = {
data.toDF().write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
data.toDF().write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
df: DataFrame, path: File): Unit = {
df.write.format("orc").mode(SaveMode.Overwrite).save(path.getCanonicalPath)
df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}
}