diff --git a/bin/pyspark b/bin/pyspark index 8eeea7716cc985f3b24a5beee905a8388c754c9c..98387c2ec5b8ac88b4c818bdce539192bded894c 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -68,7 +68,7 @@ if [[ -n "$SPARK_TESTING" ]]; then unset YARN_CONF_DIR unset HADOOP_CONF_DIR export PYTHONHASHSEED=0 - exec "$PYSPARK_DRIVER_PYTHON" -m "$@" + exec "$PYSPARK_DRIVER_PYTHON" -m "$1" exit fi diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 9868c1ab7c2ab74f456e717b012e3a9a5096cb4a..9287bd47cf113d40f4bb2e7ae5eaa3547e0460d0 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -13,9 +13,6 @@ apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar api-util-1.0.0-M20.jar arpack_combined_all-0.1.jar -arrow-format-0.4.0.jar -arrow-memory-0.4.0.jar -arrow-vector-0.4.0.jar avro-1.7.7.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar @@ -58,7 +55,6 @@ datanucleus-core-3.2.10.jar datanucleus-rdbms-3.2.9.jar derby-10.12.1.1.jar eigenbase-properties-1.1.5.jar -flatbuffers-1.2.0-3f79e055.jar gson-2.2.4.jar guava-14.0.1.jar guice-3.0.jar @@ -81,7 +77,6 @@ hadoop-yarn-server-web-proxy-2.6.5.jar hk2-api-2.4.0-b34.jar hk2-locator-2.4.0-b34.jar hk2-utils-2.4.0-b34.jar -hppc-0.7.1.jar htrace-core-3.0.4.jar httpclient-4.5.2.jar httpcore-4.4.4.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 57c78cfe120873fc50d21c2c45acc0011510f866..9127413ab6c2360f9978fb09e3566d63649945c2 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -13,9 +13,6 @@ apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar api-util-1.0.0-M20.jar arpack_combined_all-0.1.jar -arrow-format-0.4.0.jar -arrow-memory-0.4.0.jar -arrow-vector-0.4.0.jar avro-1.7.7.jar avro-ipc-1.7.7.jar avro-mapred-1.7.7-hadoop2.jar @@ -58,7 +55,6 @@ datanucleus-core-3.2.10.jar datanucleus-rdbms-3.2.9.jar derby-10.12.1.1.jar eigenbase-properties-1.1.5.jar -flatbuffers-1.2.0-3f79e055.jar gson-2.2.4.jar guava-14.0.1.jar guice-3.0.jar @@ -81,7 +77,6 @@ hadoop-yarn-server-web-proxy-2.7.3.jar hk2-api-2.4.0-b34.jar hk2-locator-2.4.0-b34.jar hk2-utils-2.4.0-b34.jar -hppc-0.7.1.jar htrace-core-3.1.0-incubating.jar httpclient-4.5.2.jar httpcore-4.4.4.jar diff --git a/dev/run-pip-tests b/dev/run-pip-tests index 225e9209536f03a10310e5a0ebd9d1fab360745c..d51dde12a03c5917cdabc4f081f9607beb95b7ca 100755 --- a/dev/run-pip-tests +++ b/dev/run-pip-tests @@ -83,8 +83,6 @@ for python in "${PYTHON_EXECS[@]}"; do if [ -n "$USE_CONDA" ]; then conda create -y -p "$VIRTUALENV_PATH" python=$python numpy pandas pip setuptools source activate "$VIRTUALENV_PATH" - conda install -y -c conda-forge pyarrow=0.4.0 - TEST_PYARROW=1 else mkdir -p "$VIRTUALENV_PATH" virtualenv --python=$python "$VIRTUALENV_PATH" @@ -122,10 +120,6 @@ for python in "${PYTHON_EXECS[@]}"; do python "$FWDIR"/dev/pip-sanity-check.py echo "Run the tests for context.py" python "$FWDIR"/python/pyspark/context.py - if [ -n "$TEST_PYARROW" ]; then - echo "Run tests for pyarrow" - SPARK_TESTING=1 "$FWDIR"/bin/pyspark pyspark.sql.tests ArrowTests - fi cd "$FWDIR" diff --git a/pom.xml b/pom.xml index f124ba45007b7814907d43eb991b7fecbdccb824..5f524079495c0b6a4df089ce85ca364c9c3ce699 100644 --- a/pom.xml +++ b/pom.xml @@ -181,7 +181,6 @@ <paranamer.version>2.6</paranamer.version> <maven-antrun.version>1.8</maven-antrun.version> <commons-crypto.version>1.0.0</commons-crypto.version> - <arrow.version>0.4.0</arrow.version> <test.java.home>${java.home}</test.java.home> 
<test.exclude.tags></test.exclude.tags> @@ -1879,25 +1878,6 @@ <artifactId>paranamer</artifactId> <version>${paranamer.version}</version> </dependency> - <dependency> - <groupId>org.apache.arrow</groupId> - <artifactId>arrow-vector</artifactId> - <version>${arrow.version}</version> - <exclusions> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty-handler</artifactId> - </exclusion> - </exclusions> - </dependency> </dependencies> </dependencyManagement> diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index d5c2a7518b18f6d74092c6e98114ddbb56a27d3e..ea5e00e9eeef5d940e9e4dad4965ff09bf55d0d3 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -182,23 +182,6 @@ class FramedSerializer(Serializer): raise NotImplementedError -class ArrowSerializer(FramedSerializer): - """ - Serializes an Arrow stream. - """ - - def dumps(self, obj): - raise NotImplementedError - - def loads(self, obj): - import pyarrow as pa - reader = pa.RecordBatchFileReader(pa.BufferReader(obj)) - return reader.read_all() - - def __repr__(self): - return "ArrowSerializer" - - class BatchedSerializer(Serializer): """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 760f113dfd197cfe909a006b753399957f2ce1f1..0649271ed22466686d6054d68999f07f8349c160 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -29,8 +29,7 @@ import warnings from pyspark import copy_func, since from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix -from pyspark.serializers import ArrowSerializer, BatchedSerializer, PickleSerializer, \ - UTF8Deserializer +from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql.types import _parse_datatype_json_string @@ -1709,8 +1708,7 @@ class DataFrame(object): @since(1.3) def toPandas(self): - """ - Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``. + """Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``. This is only available if Pandas is installed and available. 
@@ -1723,42 +1721,18 @@ class DataFrame(object): 1 5 Bob """ import pandas as pd - if self.sql_ctx.getConf("spark.sql.execution.arrow.enable", "false").lower() == "true": - try: - import pyarrow - tables = self._collectAsArrow() - if tables: - table = pyarrow.concat_tables(tables) - return table.to_pandas() - else: - return pd.DataFrame.from_records([], columns=self.columns) - except ImportError as e: - msg = "note: pyarrow must be installed and available on calling Python process " \ - "if using spark.sql.execution.arrow.enable=true" - raise ImportError("%s\n%s" % (e.message, msg)) - else: - dtype = {} - for field in self.schema: - pandas_type = _to_corrected_pandas_type(field.dataType) - if pandas_type is not None: - dtype[field.name] = pandas_type - pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) + dtype = {} + for field in self.schema: + pandas_type = _to_corrected_pandas_type(field.dataType) + if pandas_type is not None: + dtype[field.name] = pandas_type - for f, t in dtype.items(): - pdf[f] = pdf[f].astype(t, copy=False) - return pdf + pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) - def _collectAsArrow(self): - """ - Returns all records as list of deserialized ArrowPayloads, pyarrow must be installed - and available. - - .. note:: Experimental. - """ - with SCCallSiteSync(self._sc) as css: - port = self._jdf.collectAsArrowToPython() - return list(_load_from_socket(port, ArrowSerializer())) + for f, t in dtype.items(): + pdf[f] = pdf[f].astype(t, copy=False) + return pdf ########################################################################################## # Pandas compatibility diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 326e8548a617c30aba6aa12212f4b6ea34c78588..0a1cd6856b8e8cfd74c3d8eaa280ed6e219d890a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -58,21 +58,12 @@ from pyspark import SparkContext from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type -from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests +from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2, lit from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException -_have_arrow = False -try: - import pyarrow - _have_arrow = True -except: - # No Arrow, but that's okay, we'll skip those tests - pass - - class UTCOffsetTimezone(datetime.tzinfo): """ Specifies timezone in UTC offset @@ -2629,74 +2620,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase): importlib.reload(window) - -@unittest.skipIf(not _have_arrow, "Arrow not installed") -class ArrowTests(ReusedPySparkTestCase): - - @classmethod - def setUpClass(cls): - ReusedPySparkTestCase.setUpClass() - cls.spark = SparkSession(cls.sc) - cls.spark.conf.set("spark.sql.execution.arrow.enable", "true") - cls.schema = StructType([ - StructField("1_str_t", StringType(), True), - StructField("2_int_t", IntegerType(), True), - StructField("3_long_t", LongType(), True), - StructField("4_float_t", FloatType(), True), - StructField("5_double_t", DoubleType(), True)]) - cls.data = [("a", 1, 10, 0.2, 2.0), - ("b", 2, 20, 0.4, 4.0), - ("c", 3, 30, 0.8, 6.0)] - - def assertFramesEqual(self, df_with_arrow, df_without): - msg = ("DataFrame from Arrow is not equal" + - ("\n\nWith Arrow:\n%s\n%s" % (df_with_arrow, 
df_with_arrow.dtypes)) + - ("\n\nWithout:\n%s\n%s" % (df_without, df_without.dtypes))) - self.assertTrue(df_without.equals(df_with_arrow), msg=msg) - - def test_unsupported_datatype(self): - schema = StructType([StructField("array", ArrayType(IntegerType(), False), True)]) - df = self.spark.createDataFrame([([1, 2, 3],)], schema=schema) - with QuietTest(self.sc): - self.assertRaises(Exception, lambda: df.toPandas()) - - def test_null_conversion(self): - df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] + - self.data) - pdf = df_null.toPandas() - null_counts = pdf.isnull().sum().tolist() - self.assertTrue(all([c == 1 for c in null_counts])) - - def test_toPandas_arrow_toggle(self): - df = self.spark.createDataFrame(self.data, schema=self.schema) - self.spark.conf.set("spark.sql.execution.arrow.enable", "false") - pdf = df.toPandas() - self.spark.conf.set("spark.sql.execution.arrow.enable", "true") - pdf_arrow = df.toPandas() - self.assertFramesEqual(pdf_arrow, pdf) - - def test_pandas_round_trip(self): - import pandas as pd - import numpy as np - data_dict = {} - for j, name in enumerate(self.schema.names): - data_dict[name] = [self.data[i][j] for i in range(len(self.data))] - # need to convert these to numpy types first - data_dict["2_int_t"] = np.int32(data_dict["2_int_t"]) - data_dict["4_float_t"] = np.float32(data_dict["4_float_t"]) - pdf = pd.DataFrame(data=data_dict) - df = self.spark.createDataFrame(self.data, schema=self.schema) - pdf_arrow = df.toPandas() - self.assertFramesEqual(pdf_arrow, pdf) - - def test_filtered_frame(self): - df = self.spark.range(3).toDF("i") - pdf = df.filter("i < 0").toPandas() - self.assertEqual(len(pdf.columns), 1) - self.assertEqual(pdf.columns[0], "i") - self.assertTrue(pdf.empty) - - if __name__ == "__main__": from pyspark.sql.tests import * if xmlrunner: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9c8e26a8eeadfebd20619985e8ab1a91ffd17b99..c641e4d3a23e182b240fde70d6b5ac3e238051f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -847,24 +847,6 @@ object SQLConf { .intConf .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt) - val ARROW_EXECUTION_ENABLE = - buildConf("spark.sql.execution.arrow.enable") - .internal() - .doc("Make use of Apache Arrow for columnar data transfers. Currently available " + - "for use with pyspark.sql.DataFrame.toPandas with the following data types: " + - "StringType, BinaryType, BooleanType, DoubleType, FloatType, ByteType, IntegerType, " + - "LongType, ShortType") - .booleanConf - .createWithDefault(false) - - val ARROW_EXECUTION_MAX_RECORDS_PER_BATCH = - buildConf("spark.sql.execution.arrow.maxRecordsPerBatch") - .internal() - .doc("When using Apache Arrow, limit the maximum number of records that can be written " + - "to a single ArrowRecordBatch in memory. 
If set to zero or negative there is no limit.") - .intConf - .createWithDefault(10000) - object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -1123,10 +1105,6 @@ class SQLConf extends Serializable with Logging { def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO) - def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE) - - def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 661c31ded7148eedf8f2d0833433a256d415b525..1bc34a6b069d9cc4c263abd9afda8a5dd4627cae 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -103,10 +103,6 @@ <artifactId>jackson-databind</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> - <dependency> - <groupId>org.apache.arrow</groupId> - <artifactId>arrow-vector</artifactId> - </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-asm5-shaded</artifactId> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 268a37ff5d2714bb807d4d67b1f1c3395c20dd80..7be4aa1ca9562fa8243890396b26bd82287331ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -47,7 +47,6 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.util.{usePrettyExpression, DateTimeUtils} import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.arrow.{ArrowConverters, ArrowPayload} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.python.EvaluatePython @@ -2887,16 +2886,6 @@ class Dataset[T] private[sql]( } } - /** - * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark. - */ - private[sql] def collectAsArrowToPython(): Int = { - withNewExecutionId { - val iter = toArrowPayload.collect().iterator.map(_.asPythonSerializable) - PythonRDD.serveIterator(iter, "serve-Arrow") - } - } - private[sql] def toPythonIterator(): Int = { withNewExecutionId { PythonRDD.toLocalIteratorAndServe(javaToPython.rdd) @@ -2978,13 +2967,4 @@ class Dataset[T] private[sql]( Dataset(sparkSession, logicalPlan) } } - - /** Convert to an RDD of ArrowPayload byte arrays */ - private[sql] def toArrowPayload: RDD[ArrowPayload] = { - val schemaCaptured = this.schema - val maxRecordsPerBatch = sparkSession.sessionState.conf.arrowMaxRecordsPerBatch - queryExecution.toRdd.mapPartitionsInternal { iter => - ArrowConverters.toPayloadIterator(iter, schemaCaptured, maxRecordsPerBatch) - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala deleted file mode 100644 index 6af5c73422377845a3fbbb953a05cb40e0a6805e..0000000000000000000000000000000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ /dev/null @@ -1,429 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. 
-* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.execution.arrow - -import java.io.ByteArrayOutputStream -import java.nio.channels.Channels - -import scala.collection.JavaConverters._ - -import io.netty.buffer.ArrowBuf -import org.apache.arrow.memory.{BufferAllocator, RootAllocator} -import org.apache.arrow.vector._ -import org.apache.arrow.vector.BaseValueVector.BaseMutator -import org.apache.arrow.vector.file._ -import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch} -import org.apache.arrow.vector.types.FloatingPointPrecision -import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} -import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils - - -/** - * Store Arrow data in a form that can be serialized by Spark and served to a Python process. - */ -private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Serializable { - - /** - * Convert the ArrowPayload to an ArrowRecordBatch. - */ - def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = { - ArrowConverters.byteArrayToBatch(payload, allocator) - } - - /** - * Get the ArrowPayload as a type that can be served to Python. - */ - def asPythonSerializable: Array[Byte] = payload -} - -private[sql] object ArrowPayload { - - /** - * Create an ArrowPayload from an ArrowRecordBatch and Spark schema. - */ - def apply( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): ArrowPayload = { - new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator)) - } -} - -private[sql] object ArrowConverters { - - /** - * Map a Spark DataType to ArrowType. - */ - private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = { - dataType match { - case BooleanType => ArrowType.Bool.INSTANCE - case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true) - case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true) - case LongType => new ArrowType.Int(8 * LongType.defaultSize, true) - case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) - case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) - case ByteType => new ArrowType.Int(8, true) - case StringType => ArrowType.Utf8.INSTANCE - case BinaryType => ArrowType.Binary.INSTANCE - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") - } - } - - /** - * Convert a Spark Dataset schema to Arrow schema. - */ - private[arrow] def schemaToArrowSchema(schema: StructType): Schema = { - val arrowFields = schema.fields.map { f => - new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava) - } - new Schema(arrowFields.toList.asJava) - } - - /** - * Maps Iterator from InternalRow to ArrowPayload. 
Limit ArrowRecordBatch size in ArrowPayload - * by setting maxRecordsPerBatch or use 0 to fully consume rowIter. - */ - private[sql] def toPayloadIterator( - rowIter: Iterator[InternalRow], - schema: StructType, - maxRecordsPerBatch: Int): Iterator[ArrowPayload] = { - new Iterator[ArrowPayload] { - private val _allocator = new RootAllocator(Long.MaxValue) - private var _nextPayload = if (rowIter.nonEmpty) convert() else null - - override def hasNext: Boolean = _nextPayload != null - - override def next(): ArrowPayload = { - val obj = _nextPayload - if (hasNext) { - if (rowIter.hasNext) { - _nextPayload = convert() - } else { - _allocator.close() - _nextPayload = null - } - } - obj - } - - private def convert(): ArrowPayload = { - val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch) - ArrowPayload(batch, schema, _allocator) - } - } - } - - /** - * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed - * or the number of records in the batch equals maxRecordsInBatch. If maxRecordsPerBatch is 0, - * then rowIter will be fully consumed. - */ - private def internalRowIterToArrowBatch( - rowIter: Iterator[InternalRow], - schema: StructType, - allocator: BufferAllocator, - maxRecordsPerBatch: Int = 0): ArrowRecordBatch = { - - val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) => - ColumnWriter(field.dataType, ordinal, allocator).init() - } - - val writerLength = columnWriters.length - var recordsInBatch = 0 - while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) { - val row = rowIter.next() - var i = 0 - while (i < writerLength) { - columnWriters(i).write(row) - i += 1 - } - recordsInBatch += 1 - } - - val (fieldNodes, bufferArrays) = columnWriters.map(_.finish()).unzip - val buffers = bufferArrays.flatten - - val rowLength = if (fieldNodes.nonEmpty) fieldNodes.head.getLength else 0 - val recordBatch = new ArrowRecordBatch(rowLength, - fieldNodes.toList.asJava, buffers.toList.asJava) - - buffers.foreach(_.release()) - recordBatch - } - - /** - * Convert an ArrowRecordBatch to a byte array and close batch to release resources. Once closed, - * the batch can no longer be used. - */ - private[arrow] def batchToByteArray( - batch: ArrowRecordBatch, - schema: StructType, - allocator: BufferAllocator): Array[Byte] = { - val arrowSchema = ArrowConverters.schemaToArrowSchema(schema) - val root = VectorSchemaRoot.create(arrowSchema, allocator) - val out = new ByteArrayOutputStream() - val writer = new ArrowFileWriter(root, null, Channels.newChannel(out)) - - // Write a batch to byte stream, ensure the batch, allocator and writer are closed - Utils.tryWithSafeFinally { - val loader = new VectorLoader(root) - loader.load(batch) - writer.writeBatch() // writeBatch can throw IOException - } { - batch.close() - root.close() - writer.close() - } - out.toByteArray - } - - /** - * Convert a byte array to an ArrowRecordBatch. 
- */ - private[arrow] def byteArrayToBatch( - batchBytes: Array[Byte], - allocator: BufferAllocator): ArrowRecordBatch = { - val in = new ByteArrayReadableSeekableByteChannel(batchBytes) - val reader = new ArrowFileReader(in, allocator) - - // Read a batch from a byte stream, ensure the reader is closed - Utils.tryWithSafeFinally { - val root = reader.getVectorSchemaRoot // throws IOException - val unloader = new VectorUnloader(root) - reader.loadNextBatch() // throws IOException - unloader.getRecordBatch - } { - reader.close() - } - } -} - -/** - * Interface for writing InternalRows to Arrow Buffers. - */ -private[arrow] trait ColumnWriter { - def init(): this.type - def write(row: InternalRow): Unit - - /** - * Clear the column writer and return the ArrowFieldNode and ArrowBuf. - * This should be called only once after all the data is written. - */ - def finish(): (ArrowFieldNode, Array[ArrowBuf]) -} - -/** - * Base class for flat arrow column writer, i.e., column without children. - */ -private[arrow] abstract class PrimitiveColumnWriter(val ordinal: Int) - extends ColumnWriter { - - def getFieldType(dtype: ArrowType): FieldType = FieldType.nullable(dtype) - - def valueVector: BaseDataValueVector - def valueMutator: BaseMutator - - def setNull(): Unit - def setValue(row: InternalRow): Unit - - protected var count = 0 - protected var nullCount = 0 - - override def init(): this.type = { - valueVector.allocateNew() - this - } - - override def write(row: InternalRow): Unit = { - if (row.isNullAt(ordinal)) { - setNull() - nullCount += 1 - } else { - setValue(row) - } - count += 1 - } - - override def finish(): (ArrowFieldNode, Array[ArrowBuf]) = { - valueMutator.setValueCount(count) - val fieldNode = new ArrowFieldNode(count, nullCount) - val valueBuffers = valueVector.getBuffers(true) - (fieldNode, valueBuffers) - } -} - -private[arrow] class BooleanColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableBitVector - = new NullableBitVector("BooleanValue", getFieldType(dtype), allocator) - override val valueMutator: NullableBitVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, if (row.getBoolean(ordinal)) 1 else 0 ) -} - -private[arrow] class ShortColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableSmallIntVector - = new NullableSmallIntVector("ShortValue", getFieldType(dtype: ArrowType), allocator) - override val valueMutator: NullableSmallIntVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getShort(ordinal)) -} - -private[arrow] class IntegerColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableIntVector - = new NullableIntVector("IntValue", getFieldType(dtype), allocator) - override val valueMutator: NullableIntVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getInt(ordinal)) -} - -private[arrow] class LongColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends 
PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableBigIntVector - = new NullableBigIntVector("LongValue", getFieldType(dtype), allocator) - override val valueMutator: NullableBigIntVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getLong(ordinal)) -} - -private[arrow] class FloatColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableFloat4Vector - = new NullableFloat4Vector("FloatValue", getFieldType(dtype), allocator) - override val valueMutator: NullableFloat4Vector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getFloat(ordinal)) -} - -private[arrow] class DoubleColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableFloat8Vector - = new NullableFloat8Vector("DoubleValue", getFieldType(dtype), allocator) - override val valueMutator: NullableFloat8Vector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getDouble(ordinal)) -} - -private[arrow] class ByteColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableUInt1Vector - = new NullableUInt1Vector("ByteValue", getFieldType(dtype), allocator) - override val valueMutator: NullableUInt1Vector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit - = valueMutator.setSafe(count, row.getByte(ordinal)) -} - -private[arrow] class UTF8StringColumnWriter( - dtype: ArrowType, - ordinal: Int, - allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableVarCharVector - = new NullableVarCharVector("UTF8StringValue", getFieldType(dtype), allocator) - override val valueMutator: NullableVarCharVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit = { - val str = row.getUTF8String(ordinal) - valueMutator.setSafe(count, str.getByteBuffer, 0, str.numBytes) - } -} - -private[arrow] class BinaryColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableVarBinaryVector - = new NullableVarBinaryVector("BinaryValue", getFieldType(dtype), allocator) - override val valueMutator: NullableVarBinaryVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit = { - val bytes = row.getBinary(ordinal) - valueMutator.setSafe(count, bytes, 0, bytes.length) - } -} - -private[arrow] class DateColumnWriter(dtype: ArrowType, ordinal: Int, allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableDateDayVector - = new NullableDateDayVector("DateValue", getFieldType(dtype), allocator) - override val valueMutator: NullableDateDayVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = 
valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit = { - valueMutator.setSafe(count, row.getInt(ordinal)) - } -} - -private[arrow] class TimeStampColumnWriter( - dtype: ArrowType, - ordinal: Int, - allocator: BufferAllocator) - extends PrimitiveColumnWriter(ordinal) { - override val valueVector: NullableTimeStampMicroVector - = new NullableTimeStampMicroVector("TimeStampValue", getFieldType(dtype), allocator) - override val valueMutator: NullableTimeStampMicroVector#Mutator = valueVector.getMutator - - override def setNull(): Unit = valueMutator.setNull(count) - override def setValue(row: InternalRow): Unit = { - valueMutator.setSafe(count, row.getLong(ordinal)) - } -} - -private[arrow] object ColumnWriter { - - /** - * Create an Arrow ColumnWriter given the type and ordinal of row. - */ - def apply(dataType: DataType, ordinal: Int, allocator: BufferAllocator): ColumnWriter = { - val dtype = ArrowConverters.sparkTypeToArrowType(dataType) - dataType match { - case BooleanType => new BooleanColumnWriter(dtype, ordinal, allocator) - case ShortType => new ShortColumnWriter(dtype, ordinal, allocator) - case IntegerType => new IntegerColumnWriter(dtype, ordinal, allocator) - case LongType => new LongColumnWriter(dtype, ordinal, allocator) - case FloatType => new FloatColumnWriter(dtype, ordinal, allocator) - case DoubleType => new DoubleColumnWriter(dtype, ordinal, allocator) - case ByteType => new ByteColumnWriter(dtype, ordinal, allocator) - case StringType => new UTF8StringColumnWriter(dtype, ordinal, allocator) - case BinaryType => new BinaryColumnWriter(dtype, ordinal, allocator) - case DateType => new DateColumnWriter(dtype, ordinal, allocator) - case TimestampType => new TimeStampColumnWriter(dtype, ordinal, allocator) - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType") - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala deleted file mode 100644 index 159328cc0d95812e0f2cbda0a7e2ca06e6952ca0..0000000000000000000000000000000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ /dev/null @@ -1,1222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.arrow - -import java.io.File -import java.nio.charset.StandardCharsets -import java.sql.{Date, Timestamp} -import java.text.SimpleDateFormat -import java.util.Locale - -import com.google.common.io.Files -import org.apache.arrow.memory.RootAllocator -import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot} -import org.apache.arrow.vector.file.json.JsonFileReader -import org.apache.arrow.vector.util.Validator -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark.SparkException -import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{BinaryType, StructField, StructType} -import org.apache.spark.util.Utils - - -class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { - import testImplicits._ - - private var tempDataPath: String = _ - - override def beforeAll(): Unit = { - super.beforeAll() - tempDataPath = Utils.createTempDir(namePrefix = "arrow").getAbsolutePath - } - - test("collect to arrow record batch") { - val indexData = (1 to 6).toDF("i") - val arrowPayloads = indexData.toArrowPayload.collect() - assert(arrowPayloads.nonEmpty) - assert(arrowPayloads.length == indexData.rdd.getNumPartitions) - val allocator = new RootAllocator(Long.MaxValue) - val arrowRecordBatches = arrowPayloads.map(_.loadBatch(allocator)) - val rowCount = arrowRecordBatches.map(_.getLength).sum - assert(rowCount === indexData.count()) - arrowRecordBatches.foreach(batch => assert(batch.getNodes.size() > 0)) - arrowRecordBatches.foreach(_.close()) - allocator.close() - } - - test("short conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_s", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 16 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 16 - | } ] - | } - | }, { - | "name" : "b_s", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 16 - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 16 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_s", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 2, -2, 32767, -32768 ] - | }, { - | "name" : "b_s", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1, 0, 0, -2, 0, -32768 ] - | } ] - | } ] - |} - """.stripMargin - - val a_s = List[Short](1, -1, 2, -2, 32767, -32768) - val b_s = List[Option[Short]](Some(1), None, None, Some(-2), None, Some(-32768)) - val df = a_s.zip(b_s).toDF("a_s", "b_s") - - collectAndValidate(df, json, "integer-16bit.json") - } - - test("int conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "b_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : 
"VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_i", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 2, -2, 2147483647, -2147483648 ] - | }, { - | "name" : "b_i", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1, 0, 0, -2, 0, -2147483648 ] - | } ] - | } ] - |} - """.stripMargin - - val a_i = List[Int](1, -1, 2, -2, 2147483647, -2147483648) - val b_i = List[Option[Int]](Some(1), None, None, Some(-2), None, Some(-2147483648)) - val df = a_i.zip(b_i).toDF("a_i", "b_i") - - collectAndValidate(df, json, "integer-32bit.json") - } - - test("long conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_l", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 64 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | }, { - | "name" : "b_l", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 64 - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_l", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 2, -2, 9223372036854775807, -9223372036854775808 ] - | }, { - | "name" : "b_l", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1, 0, 0, -2, 0, -9223372036854775808 ] - | } ] - | } ] - |} - """.stripMargin - - val a_l = List[Long](1, -1, 2, -2, 9223372036854775807L, -9223372036854775808L) - val b_l = List[Option[Long]](Some(1), None, None, Some(-2), None, Some(-9223372036854775808L)) - val df = a_l.zip(b_l).toDF("a_l", "b_l") - - collectAndValidate(df, json, "integer-64bit.json") - } - - test("float conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_f", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "SINGLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "b_f", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "SINGLE" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_f", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1.0, 2.0, 0.01, 200.0, 0.0001, 20000.0 ] - | }, { - | "name" : "b_f", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1.1, 0.0, 0.0, 2.2, 0.0, 3.3 ] - | } ] - | } ] - |} - """.stripMargin - - val a_f = List(1.0f, 2.0f, 0.01f, 200.0f, 0.0001f, 20000.0f) - val b_f = List[Option[Float]](Some(1.1f), None, None, Some(2.2f), None, Some(3.3f)) - val df = a_f.zip(b_f).toDF("a_f", "b_f") - - collectAndValidate(df, json, "floating_point-single_precision.json") - } - - test("double conversion") { - val json = - s""" - |{ - | "schema" : { - | 
"fields" : [ { - | "name" : "a_d", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "DOUBLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | }, { - | "name" : "b_d", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "DOUBLE" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_d", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1.0, 2.0, 0.01, 200.0, 1.0E-4, 20000.0 ] - | }, { - | "name" : "b_d", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1.1, 0.0, 0.0, 2.2, 0.0, 3.3 ] - | } ] - | } ] - |} - """.stripMargin - - val a_d = List(1.0, 2.0, 0.01, 200.0, 0.0001, 20000.0) - val b_d = List[Option[Double]](Some(1.1), None, None, Some(2.2), None, Some(3.3)) - val df = a_d.zip(b_d).toDF("a_d", "b_d") - - collectAndValidate(df, json, "floating_point-double_precision.json") - } - - test("index conversion") { - val data = List[Int](1, 2, 3, 4, 5, 6) - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "i", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, 2, 3, 4, 5, 6 ] - | } ] - | } ] - |} - """.stripMargin - val df = data.toDF("i") - - collectAndValidate(df, json, "indexData-ints.json") - } - - test("mixed numeric type conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 16 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 16 - | } ] - | } - | }, { - | "name" : "b", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "SINGLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "c", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "d", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "DOUBLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | }, { - | "name" : "e", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 64 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | 
"type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, 2, 3, 4, 5, 6 ] - | }, { - | "name" : "b", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0 ] - | }, { - | "name" : "c", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, 2, 3, 4, 5, 6 ] - | }, { - | "name" : "d", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1.0, 2.0, 3.0, 4.0, 5.0, 6.0 ] - | }, { - | "name" : "e", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, 2, 3, 4, 5, 6 ] - | } ] - | } ] - |} - """.stripMargin - - val data = List(1, 2, 3, 4, 5, 6) - val data_tuples = for (d <- data) yield { - (d.toShort, d.toFloat, d.toInt, d.toDouble, d.toLong) - } - val df = data_tuples.toDF("a", "b", "c", "d", "e") - - collectAndValidate(df, json, "mixed_numeric_types.json") - } - - test("string type conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "upper_case", - | "type" : { - | "name" : "utf8" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "OFFSET", - | "typeBitWidth" : 32 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 8 - | } ] - | } - | }, { - | "name" : "lower_case", - | "type" : { - | "name" : "utf8" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "OFFSET", - | "typeBitWidth" : 32 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 8 - | } ] - | } - | }, { - | "name" : "null_str", - | "type" : { - | "name" : "utf8" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "OFFSET", - | "typeBitWidth" : 32 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 8 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 3, - | "columns" : [ { - | "name" : "upper_case", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "OFFSET" : [ 0, 1, 2, 3 ], - | "DATA" : [ "A", "B", "C" ] - | }, { - | "name" : "lower_case", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "OFFSET" : [ 0, 1, 2, 3 ], - | "DATA" : [ "a", "b", "c" ] - | }, { - | "name" : "null_str", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 0 ], - | "OFFSET" : [ 0, 2, 5, 5 ], - | "DATA" : [ "ab", "CDE", "" ] - | } ] - | } ] - |} - """.stripMargin - - val upperCase = Seq("A", "B", "C") - val lowerCase = Seq("a", "b", "c") - val nullStr = Seq("ab", "CDE", null) - val df = (upperCase, lowerCase, nullStr).zipped.toList - .toDF("upper_case", "lower_case", "null_str") - - collectAndValidate(df, json, "stringData.json") - } - - test("boolean type conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_bool", - | "type" : { - | "name" : "bool" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 1 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 4, - | "columns" : [ { - | "name" : "a_bool", - | "count" : 4, - | "VALIDITY" : [ 1, 1, 1, 1 ], - | "DATA" : [ true, true, false, true ] - | } ] - | } 
] - |} - """.stripMargin - val df = Seq(true, true, false, true).toDF("a_bool") - collectAndValidate(df, json, "boolData.json") - } - - test("byte type conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_byte", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 8 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 8 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 4, - | "columns" : [ { - | "name" : "a_byte", - | "count" : 4, - | "VALIDITY" : [ 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 64, 127 ] - | } ] - | } ] - |} - | - """.stripMargin - val df = List[Byte](1.toByte, (-1).toByte, 64.toByte, Byte.MaxValue).toDF("a_byte") - collectAndValidate(df, json, "byteData.json") - } - - test("binary type conversion") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_binary", - | "type" : { - | "name" : "binary" - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "OFFSET", - | "typeBitWidth" : 32 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 8 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 3, - | "columns" : [ { - | "name" : "a_binary", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "OFFSET" : [ 0, 3, 4, 6 ], - | "DATA" : [ "616263", "64", "6566" ] - | } ] - | } ] - |} - """.stripMargin - - val data = Seq("abc", "d", "ef") - val rdd = sparkContext.parallelize(data.map(s => Row(s.getBytes("utf-8")))) - val df = spark.createDataFrame(rdd, StructType(Seq(StructField("a_binary", BinaryType)))) - - collectAndValidate(df, json, "binaryData.json") - } - - test("floating-point NaN") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "NaN_f", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "SINGLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "NaN_d", - | "type" : { - | "name" : "floatingpoint", - | "precision" : "DOUBLE" - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 64 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 2, - | "columns" : [ { - | "name" : "NaN_f", - | "count" : 2, - | "VALIDITY" : [ 1, 1 ], - | "DATA" : [ 1.2000000476837158, "NaN" ] - | }, { - | "name" : "NaN_d", - | "count" : 2, - | "VALIDITY" : [ 1, 1 ], - | "DATA" : [ "NaN", 1.2 ] - | } ] - | } ] - |} - """.stripMargin - - val fnan = Seq(1.2F, Float.NaN) - val dnan = Seq(Double.NaN, 1.2) - val df = fnan.zip(dnan).toDF("NaN_f", "NaN_d") - - collectAndValidate(df, json, "nanData-floating_point.json") - } - - test("partitioned DataFrame") { - val json1 = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "b", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | 
"bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 3, - | "columns" : [ { - | "name" : "a", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "DATA" : [ 1, 1, 2 ] - | }, { - | "name" : "b", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "DATA" : [ 1, 2, 1 ] - | } ] - | } ] - |} - """.stripMargin - val json2 = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "b", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 3, - | "columns" : [ { - | "name" : "a", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "DATA" : [ 2, 3, 3 ] - | }, { - | "name" : "b", - | "count" : 3, - | "VALIDITY" : [ 1, 1, 1 ], - | "DATA" : [ 2, 1, 2 ] - | } ] - | } ] - |} - """.stripMargin - - val arrowPayloads = testData2.toArrowPayload.collect() - // NOTE: testData2 should have 2 partitions -> 2 arrow batches in payload - assert(arrowPayloads.length === 2) - val schema = testData2.schema - - val tempFile1 = new File(tempDataPath, "testData2-ints-part1.json") - val tempFile2 = new File(tempDataPath, "testData2-ints-part2.json") - Files.write(json1, tempFile1, StandardCharsets.UTF_8) - Files.write(json2, tempFile2, StandardCharsets.UTF_8) - - validateConversion(schema, arrowPayloads(0), tempFile1) - validateConversion(schema, arrowPayloads(1), tempFile2) - } - - test("empty frame collect") { - val arrowPayload = spark.emptyDataFrame.toArrowPayload.collect() - assert(arrowPayload.isEmpty) - - val filteredDF = List[Int](1, 2, 3, 4, 5, 6).toDF("i") - val filteredArrowPayload = filteredDF.filter("i < 0").toArrowPayload.collect() - assert(filteredArrowPayload.isEmpty) - } - - test("empty partition collect") { - val emptyPart = spark.sparkContext.parallelize(Seq(1), 2).toDF("i") - val arrowPayloads = emptyPart.toArrowPayload.collect() - assert(arrowPayloads.length === 1) - val allocator = new RootAllocator(Long.MaxValue) - val arrowRecordBatches = arrowPayloads.map(_.loadBatch(allocator)) - assert(arrowRecordBatches.head.getLength == 1) - arrowRecordBatches.foreach(_.close()) - allocator.close() - } - - test("max records in batch conf") { - val totalRecords = 10 - val maxRecordsPerBatch = 3 - spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", maxRecordsPerBatch) - val df = spark.sparkContext.parallelize(1 to totalRecords, 2).toDF("i") - val arrowPayloads = df.toArrowPayload.collect() - val allocator = new RootAllocator(Long.MaxValue) - val arrowRecordBatches = arrowPayloads.map(_.loadBatch(allocator)) - var recordCount = 0 - arrowRecordBatches.foreach { batch => - assert(batch.getLength > 0) - assert(batch.getLength <= maxRecordsPerBatch) - recordCount += batch.getLength - batch.close() - } - assert(recordCount == totalRecords) - allocator.close() - 
spark.conf.unset("spark.sql.execution.arrow.maxRecordsPerBatch") - } - - testQuietly("unsupported types") { - def runUnsupported(block: => Unit): Unit = { - val msg = intercept[SparkException] { - block - } - assert(msg.getMessage.contains("Unsupported data type")) - assert(msg.getCause.getClass === classOf[UnsupportedOperationException]) - } - - runUnsupported { decimalData.toArrowPayload.collect() } - runUnsupported { arrayData.toDF().toArrowPayload.collect() } - runUnsupported { mapData.toDF().toArrowPayload.collect() } - runUnsupported { complexData.toArrowPayload.collect() } - - val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS z", Locale.US) - val d1 = new Date(sdf.parse("2015-04-08 13:10:15.000 UTC").getTime) - val d2 = new Date(sdf.parse("2016-05-09 13:10:15.000 UTC").getTime) - runUnsupported { Seq(d1, d2).toDF("date").toArrowPayload.collect() } - - val ts1 = new Timestamp(sdf.parse("2013-04-08 01:10:15.567 UTC").getTime) - val ts2 = new Timestamp(sdf.parse("2013-04-08 13:10:10.789 UTC").getTime) - runUnsupported { Seq(ts1, ts2).toDF("timestamp").toArrowPayload.collect() } - } - - test("test Arrow Validator") { - val json = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "a_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "b_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_i", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 2, -2, 2147483647, -2147483648 ] - | }, { - | "name" : "b_i", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1, 0, 0, -2, 0, -2147483648 ] - | } ] - | } ] - |} - """.stripMargin - val json_diff_col_order = - s""" - |{ - | "schema" : { - | "fields" : [ { - | "name" : "b_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : true, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | }, { - | "name" : "a_i", - | "type" : { - | "name" : "int", - | "isSigned" : true, - | "bitWidth" : 32 - | }, - | "nullable" : false, - | "children" : [ ], - | "typeLayout" : { - | "vectors" : [ { - | "type" : "VALIDITY", - | "typeBitWidth" : 1 - | }, { - | "type" : "DATA", - | "typeBitWidth" : 32 - | } ] - | } - | } ] - | }, - | "batches" : [ { - | "count" : 6, - | "columns" : [ { - | "name" : "a_i", - | "count" : 6, - | "VALIDITY" : [ 1, 1, 1, 1, 1, 1 ], - | "DATA" : [ 1, -1, 2, -2, 2147483647, -2147483648 ] - | }, { - | "name" : "b_i", - | "count" : 6, - | "VALIDITY" : [ 1, 0, 0, 1, 0, 1 ], - | "DATA" : [ 1, 0, 0, -2, 0, -2147483648 ] - | } ] - | } ] - |} - """.stripMargin - - val a_i = List[Int](1, -1, 2, -2, 2147483647, -2147483648) - val b_i = List[Option[Int]](Some(1), None, None, Some(-2), None, Some(-2147483648)) - val df = a_i.zip(b_i).toDF("a_i", "b_i") - - // Different schema - intercept[IllegalArgumentException] { - collectAndValidate(df, 
json_diff_col_order, "validator_diff_schema.json") - } - - // Different values - intercept[IllegalArgumentException] { - collectAndValidate(df.sort($"a_i".desc), json, "validator_diff_values.json") - } - } - - /** Test that a converted DataFrame to Arrow record batch equals batch read from JSON file */ - private def collectAndValidate(df: DataFrame, json: String, file: String): Unit = { - // NOTE: coalesce to single partition because can only load 1 batch in validator - val arrowPayload = df.coalesce(1).toArrowPayload.collect().head - val tempFile = new File(tempDataPath, file) - Files.write(json, tempFile, StandardCharsets.UTF_8) - validateConversion(df.schema, arrowPayload, tempFile) - } - - private def validateConversion( - sparkSchema: StructType, - arrowPayload: ArrowPayload, - jsonFile: File): Unit = { - val allocator = new RootAllocator(Long.MaxValue) - val jsonReader = new JsonFileReader(jsonFile, allocator) - - val arrowSchema = ArrowConverters.schemaToArrowSchema(sparkSchema) - val jsonSchema = jsonReader.start() - Validator.compareSchemas(arrowSchema, jsonSchema) - - val arrowRoot = VectorSchemaRoot.create(arrowSchema, allocator) - val vectorLoader = new VectorLoader(arrowRoot) - val arrowRecordBatch = arrowPayload.loadBatch(allocator) - vectorLoader.load(arrowRecordBatch) - val jsonRoot = jsonReader.read() - Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot) - - jsonRoot.close() - jsonReader.close() - arrowRecordBatch.close() - arrowRoot.close() - allocator.close() - } -}
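The revert leaves `DataFrame.toPandas()` with only the Pickle-based collect path shown in the `dataframe.py` hunk above. A minimal sketch of the resulting behaviour, assuming a local SparkSession with pandas installed (everything here beyond the `toPandas`/`_to_corrected_pandas_type` behaviour taken from the diff is illustrative):

```python
# Sketch of DataFrame.toPandas() after this patch: rows go through collect()
# and the Pickle serializer, then per-column dtypes are corrected on the driver.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]").appName("toPandas-after-revert").getOrCreate()

schema = StructType([
    StructField("letter", StringType()),
    StructField("number", IntegerType()),
])
df = spark.createDataFrame([("a", 1), ("b", 2)], schema)

# No spark.sql.execution.arrow.enable branch remains: this is always
# pd.DataFrame.from_records(self.collect(), ...) plus the astype() fix-up,
# so the IntegerType column comes back as numpy int32.
pdf = df.toPandas()
print(pdf.dtypes)

spark.stop()
```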
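For reference, the Python-side decoding this patch removes reduced to a few pyarrow calls; the sketch below restates them against the pyarrow 0.4.x API visible in the deleted `ArrowSerializer` and `toPandas()` branch (the function name and list-of-payloads shape are assumptions mirroring `_collectAsArrow`):

```python
# Sketch of the deleted client-side Arrow path: each payload served by
# collectAsArrowToPython was an Arrow file-format byte blob; the serializer
# read it back as a pyarrow.Table and toPandas() concatenated the tables.
import pyarrow as pa


def arrow_payloads_to_pandas(payloads):
    tables = [
        pa.RecordBatchFileReader(pa.BufferReader(p)).read_all()  # ArrowSerializer.loads
        for p in payloads
    ]
    return pa.concat_tables(tables).to_pandas()  # deleted branch of toPandas()
```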
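The deleted `ArrowConverters.sparkTypeToArrowType` (and the removed `spark.sql.execution.arrow.enable` doc string) limited the Arrow path to primitive types; anything else raised `UnsupportedOperationException("Unsupported data type: ...")`, which is what `ArrowTests.test_unsupported_datatype` exercised. Restated as a lookup table for reference — the coverage comes from the deleted Scala, the pyarrow spellings are an assumption:

```python
# Spark SQL -> Arrow type coverage of the removed converter, as a reference table.
import pyarrow as pa

SPARK_TO_ARROW = {
    "BooleanType": pa.bool_(),    # ArrowType.Bool
    "ByteType":    pa.int8(),     # ArrowType.Int(8, signed)
    "ShortType":   pa.int16(),    # ArrowType.Int(16, signed)
    "IntegerType": pa.int32(),    # ArrowType.Int(32, signed)
    "LongType":    pa.int64(),    # ArrowType.Int(64, signed)
    "FloatType":   pa.float32(),  # FloatingPoint(SINGLE)
    "DoubleType":  pa.float64(),  # FloatingPoint(DOUBLE)
    "StringType":  pa.string(),   # ArrowType.Utf8
    "BinaryType":  pa.binary(),   # ArrowType.Binary
}
```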