Commit 8014a516 authored by Michael Armbrust

[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader

This PR adds an implementation of the new `buildReader` interface for the Parquet `FileFormat`. A simple implementation of `FileScanRDD` is also included.

This code should be tested by the many existing tests for parquet.
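
For context, the contract this commit implements looks roughly like the sketch below (a condensed, illustrative outline based on the code in this diff, not the exact Spark sources): `FileFormat.buildReader` returns a closure from `PartitionedFile` to an iterator of rows, and `FileScanRDD` applies that closure to every file in each `FilePartition`.

// Condensed sketch of the buildReader/FileScanRDD contract added by this patch.
// Simplified from the diff below: error handling, input-file tracking, and the
// vectorized Parquet fast path are omitted, and FileFormat is trimmed to the one
// method relevant here.
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// A chunk of a single file plus the partition values its directory encodes.
case class PartitionedFile(
    partitionValues: InternalRow, filePath: String, start: Long, length: Long)

// All files that one task should read.
case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition

trait FileFormat {
  /** Returns a function that reads a single file as an iterator of rows. */
  def buildReader(
      sqlContext: SQLContext,
      partitionSchema: StructType,
      dataSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}

// The scan RDD simply chains the per-file iterators produced by the reader function.
class FileScanRDD(
    sqlContext: SQLContext,
    readFunction: PartitionedFile => Iterator[InternalRow],
    @transient filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sqlContext.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
    split.asInstanceOf[FilePartition].files.iterator.flatMap(readFunction)

  override protected def getPartitions: Array[Partition] = filePartitions.toArray
}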

Author: Michael Armbrust <michael@databricks.com>
Author: Sameer Agarwal <sameer@databricks.com>
Author: Nong Li <nong@databricks.com>

Closes #11709 from marmbrus/parquetReader.
parent 72999616
Showing with 366 additions and 46 deletions
@@ -29,8 +29,10 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
 import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.*;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -52,7 +54,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private int numBatched = 0;
 
   /**
-   * For each request column, the reader to read this column.
+   * For each request column, the reader to read this column. This is NULL if this column
+   * is missing from the file, in which case we populate the attribute with NULL.
    */
   private VectorizedColumnReader[] columnReaders;
 
@@ -66,6 +69,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private long totalCountLoadedSoFar = 0;
 
+  /**
+   * For each column, true if the column is missing in the file and we'll instead return NULLs.
+   */
+  private boolean[] missingColumns;
+
   /**
   * columnBatch object that is used for batch decoding. This is created on first use and triggers
   * batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -163,14 +171,53 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   * This object is reused. Calling this enables the vectorized reader. This should be called
   * before any calls to nextKeyValue/nextBatch.
   */
-  public ColumnarBatch resultBatch() {
-    return resultBatch(DEFAULT_MEMORY_MODE);
-  }
-
-  public ColumnarBatch resultBatch(MemoryMode memMode) {
-    if (columnarBatch == null) {
-      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
-    }
+  // Creates a columnar batch that includes the schema from the data files and the additional
+  // partition columns appended to the end of the batch.
+  // For example, if the data contains two columns, with 2 partition columns:
+  //   Columns 0,1: data columns
+  //   Column 2: partitionValues[0]
+  //   Column 3: partitionValues[1]
+  public void initBatch(MemoryMode memMode, StructType partitionColumns,
+      InternalRow partitionValues) {
+    StructType batchSchema = new StructType();
+    for (StructField f: sparkSchema.fields()) {
+      batchSchema = batchSchema.add(f);
+    }
+    if (partitionColumns != null) {
+      for (StructField f : partitionColumns.fields()) {
+        batchSchema = batchSchema.add(f);
+      }
+    }
+
+    columnarBatch = ColumnarBatch.allocate(batchSchema);
+    if (partitionColumns != null) {
+      int partitionIdx = sparkSchema.fields().length;
+      for (int i = 0; i < partitionColumns.fields().length; i++) {
+        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
+        columnarBatch.column(i + partitionIdx).setIsConstant();
+      }
+    }
+
+    // Initialize missing columns with nulls.
+    for (int i = 0; i < missingColumns.length; i++) {
+      if (missingColumns[i]) {
+        columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
+        columnarBatch.column(i).setIsConstant();
+      }
+    }
+  }
+
+  public void initBatch() {
+    initBatch(DEFAULT_MEMORY_MODE, null, null);
+  }
+
+  public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+    initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+  }
+
+  public ColumnarBatch resultBatch() {
+    if (columnarBatch == null) initBatch();
     return columnarBatch;
   }
@@ -191,6 +238,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
+      if (columnReaders[i] == null) continue;
       columnReaders[i].readBatch(num, columnarBatch.column(i));
     }
     rowsReturned += num;
@@ -205,6 +253,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
      * Check that the requested schema is supported.
      */
     OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
+    missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -223,9 +272,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
         throw new IOException("Int96 not supported.");
       }
-      ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
-      if (!fd.equals(requestedSchema.getColumns().get(i))) {
-        throw new IOException("Schema evolution not supported.");
+      String[] colPath = requestedSchema.getPaths().get(i);
+      if (fileSchema.containsPath(colPath)) {
+        ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+          throw new IOException("Schema evolution not supported.");
+        }
+        missingColumns[i] = false;
+      } else {
+        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+          // Column is missing in data but the required data is non-nullable. This file is invalid.
+          throw new IOException("Required column is missing in data file. Col: " + colPath);
+        }
+        missingColumns[i] = true;
       }
     }
   }
@@ -240,6 +299,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     columnReaders = new VectorizedColumnReader[columns.size()];
     for (int i = 0; i < columns.size(); ++i) {
+      if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i),
         pages.getPageReader(columns.get(i)));
     }
......
@@ -256,6 +256,8 @@ public abstract class ColumnVector {
   * Resets this column for writing. The currently stored values are no longer accessible.
   */
  public void reset() {
+    if (isConstant) return;
+
    if (childColumns != null) {
      for (ColumnVector c: childColumns) {
        c.reset();
@@ -822,6 +824,11 @@ public abstract class ColumnVector {
   */
  public final boolean isArray() { return resultArray != null; }
 
+  /**
+   * Marks this column as being constant.
+   */
+  public final void setIsConstant() { isConstant = true; }
+
  /**
   * Maximum number of rows that can be stored in this column.
   */
@@ -843,6 +850,12 @@ public abstract class ColumnVector {
   */
  protected boolean anyNullsSet;
 
+  /**
+   * True if this column's values are fixed. This means the column values never change, even
+   * across resets.
+   */
+  protected boolean isConstant;
+
  /**
   * Default size of each array length value. This grows as necessary.
   */
......
@@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * This class is the in memory representation of rows as they are streamed through operators. It
  * is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
- * each operator allocates one of thee objects, the storage footprint on the task is negligible.
+ * each operator allocates one of these objects, the storage footprint on the task is negligible.
  *
  * The layout is a columnar with values encoded in their native format. Each RowBatch contains
  * a horizontal partitioning of the data, split into columns.
......
@@ -233,7 +233,6 @@ case class DataSource(
           "It must be specified manually")
       }
 
       HadoopFsRelation(
         sqlContext,
         fileCatalog,
......
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 
@@ -31,13 +31,17 @@ case class PartitionedFile(
     partitionValues: InternalRow,
     filePath: String,
     start: Long,
-    length: Long)
+    length: Long) {
+  override def toString(): String = {
+    s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
+  }
+}
 
 /**
  * A collection of files that should be read as a single task possibly from multiple partitioned
  * directories.
  *
- * IMPLEMENT ME: This is just a placeholder for a future implementation.
  * TODO: This currently does not take locality information about the files into account.
  */
 case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
@@ -48,10 +52,38 @@ class FileScanRDD(
     @transient val filePartitions: Seq[FilePartition])
   extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    throw new NotImplementedError("Not Implemented Yet")
+    val iterator = new Iterator[Object] with AutoCloseable {
+      private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+      private[this] var currentIterator: Iterator[Object] = null
+
+      def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
+      def next() = currentIterator.next()
+
+      /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+      private def nextIterator(): Boolean = {
+        if (files.hasNext) {
+          val nextFile = files.next()
+          logInfo(s"Reading File $nextFile")
+          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+          currentIterator = readFunction(nextFile)
+          hasNext
+        } else {
+          SqlNewHadoopRDDState.unsetInputFileName()
+          false
+        }
+      }
+
+      override def close() = {
+        SqlNewHadoopRDDState.unsetInputFileName()
+      }
+    }
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addTaskCompletionListener(context => iterator.close())
+
+    iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
   }
 
-  override protected def getPartitions: Array[Partition] = Array.empty
+  override protected def getPartitions: Array[Partition] = filePartitions.toArray
 }
@@ -57,7 +57,9 @@ import org.apache.spark.sql.types._
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
-      if files.fileFormat.toString == "TestFileFormat" =>
+      if (files.fileFormat.toString == "TestFileFormat" ||
+         files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+         files.sqlContext.conf.parquetFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -67,12 +69,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)
 
       val partitionColumns =
-        AttributeSet(
-          l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
+        l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
+      val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
-        ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
+        ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
       logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
 
+      val dataColumns =
+        l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
+
       val bucketColumns =
         AttributeSet(
           files.bucketSpec
@@ -82,7 +87,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             .getOrElse(sys.error(""))))
 
       // Partition keys are not available in the statistics of the files.
-      val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
+      val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
 
       // Predicates with both partition keys and attributes need to be evaluated after the scan.
       val afterScanFilters = filterSet -- partitionKeyFilters
@@ -92,11 +97,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val filterAttributes = AttributeSet(afterScanFilters)
       val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
-      val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet
+      val requiredAttributes = AttributeSet(requiredExpressions)
 
-      val prunedDataSchema =
-        StructType(
-          files.dataSchema.filter(f => requiredAttributes.contains(f.name)))
+      val readDataColumns =
+        dataColumns
+          .filter(requiredAttributes.contains)
+          .filterNot(partitionColumns.contains)
+      val prunedDataSchema = readDataColumns.toStructType
       logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
 
       val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -132,7 +139,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val splitFiles = selectedPartitions.flatMap { partition =>
         partition.files.flatMap { file =>
-          assert(file.getLen != 0)
+          assert(file.getLen != 0, file.toString)
           (0L to file.getLen by maxSplitBytes).map { offset =>
             val remaining = file.getLen - offset
             val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
@@ -180,17 +187,20 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val scan =
         DataSourceScan(
-          l.output,
+          readDataColumns ++ partitionColumns,
           new FileScanRDD(
             files.sqlContext,
             readFile,
             plannedPartitions),
           files,
-          Map("format" -> files.fileFormat.toString))
+          Map(
+            "Format" -> files.fileFormat.toString,
+            "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
+            "ReadSchema" -> prunedDataSchema.simpleString))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
-      val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) {
+      val withProjections = if (projects == withFilter.output) {
        withFilter
      } else {
        execution.Project(projects, withFilter)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.sql.catalyst.InternalRow
/**
* An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
*
* Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
* column batches by pretending they are rows.
*/
class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
  private[this] var havePair = false
  private[this] var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !rowReader.nextKeyValue
      if (finished) {
        // Close and release the reader here; close() will also be called when the task
        // completes, but for tasks that read from many files, it helps to release the
        // resources early.
        rowReader.close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) {
      throw new java.util.NoSuchElementException("End of stream")
    }
    havePair = false
    rowReader.getCurrentValue
  }
}
@@ -24,14 +24,16 @@ import java.util.logging.{Logger => JLogger}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.task.JobContextImpl
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.parquet.{Log => ApacheParquetLog}
+import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -45,16 +47,21 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.BitSet
 
-private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging {
+private[sql] class DefaultSource
+  extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
 
   override def shortName(): String = "parquet"
@@ -269,6 +276,137 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister with
     file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+   *
+   * @param partitionSchema The schema of the partition column row that will be present in each
+   *                        PartitionedFile. These columns should be prepended to the rows that
+   *                        are produced by the iterator.
+   * @param dataSchema The schema of the data that should be output for each row. This may be a
+   *                   subset of the columns that are present in the file if column pruning has
+   *                   occurred.
+   * @param filters A set of filters than can optionally be used to reduce the number of rows output
+   * @param options A set of string -> string configuration options.
+   * @return
+   */
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    parquetConf.set(
+      CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+    parquetConf.set(
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+    // We want to clear this temporary metadata from saving into Parquet file.
+    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+
+    // Sets flags for `CatalystSchemaConverter`
+    parquetConf.setBoolean(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlContext.conf.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
+    parquetConf.setBoolean(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(ParquetFilters.createFilter(dataSchema, _))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+
+    // TODO: if you move this into the closure it reverts to the default values.
+    // If true, enable using the custom RecordReader for parquet. This only works for
+    // a subset of the types (no complex types).
+    val enableVectorizedParquetReader: Boolean =
+      sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
+    val enableWholestageCodegen: Boolean =
+      sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+
+    (file: PartitionedFile) => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      val fileSplit =
+        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          fileSplit.getPath,
+          fileSplit.getStart,
+          fileSplit.getStart + fileSplit.getLength,
+          fileSplit.getLength,
+          fileSplit.getLocations,
+          null)
+
+      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+      val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+
+      val parquetReader = try {
+        if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+        val vectorizedReader = new VectorizedParquetRecordReader()
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+        // TODO: fix column appending
+        if (enableWholestageCodegen) {
+          logDebug(s"Enabling batch returning")
+          vectorizedReader.enableReturningBatches()
+        }
+        vectorizedReader
+      } catch {
+        case NonFatal(e) =>
+          logDebug(s"Falling back to parquet-mr: $e", e)
+          val reader = pushed match {
+            case Some(filter) =>
+              new ParquetRecordReader[InternalRow](
                new CatalystReadSupport,
+                FilterCompat.get(filter, null))
+            case _ =>
+              new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+          }
+          reader.initialize(split, hadoopAttemptContext)
+          reader
+      }
+
+      val iter = new RecordReaderIterator(parquetReader)
+
+      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
+          enableVectorizedParquetReader) {
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+        val joinedRow = new JoinedRow()
+        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        // This is a horrible erasure hack... if we type the iterator above, then it actually check
+        // the type in next() and we get a class cast exception. If we make that function return
+        // Object, then we can defer the cast until later!
+        iter.asInstanceOf[Iterator[InternalRow]]
+          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+      }
+    }
+  }
+
   override def buildInternalScan(
       sqlContext: SQLContext,
       dataSchema: StructType,
......
@@ -288,6 +288,11 @@ object SQLConf {
     defaultValue = Some(true),
     doc = "Whether the query analyzer should be case sensitive or not.")
 
+  val PARQUET_FILE_SCAN = booleanConf("spark.sql.parquet.fileScan",
+    defaultValue = Some(true),
+    doc = "Use the new FileScanRDD path for reading parquet data.",
+    isPublic = false)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
     defaultValue = Some(false),
     doc = "When true, the Parquet data source merges schemas collected from all data files, " +
@@ -555,6 +560,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
+  def parquetFileScan: Boolean = getConf(PARQUET_FILE_SCAN)
+
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
......
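
The flag added above is registered as a non-public conf, but it can still be toggled at runtime for debugging; a hypothetical example (assuming a live `sqlContext`), which should make planning fall back to the pre-existing scan path:

// Disable the new FileScanRDD-based Parquet scan added by this patch
// (illustrative; the conf is internal and defaults to true).
sqlContext.setConf("spark.sql.parquet.fileScan", "false")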
@@ -415,7 +415,7 @@ case class HadoopFsRelation(
   def refresh(): Unit = location.refresh()
 
   override def toString: String =
-    s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}"
+    s"HadoopFiles"
 
   /** Returns the list of files that will be read when scanning this relation. */
   override def inputFiles: Array[String] =
@@ -551,10 +551,13 @@ class HDFSFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles()) :: Nil
+      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
-        case PartitionDirectory(values, path) => Partition(values, getStatus(path))
+        case PartitionDirectory(values, path) =>
+          Partition(
+            values,
+            getStatus(path).filterNot(_.getPath.getName startsWith "_"))
       }
     }
   }
......
@@ -445,7 +445,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-6352 DirectParquetOutputCommitter") {
+  testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
     val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.
@@ -469,7 +469,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
+  testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatiblity") {
     val clonedConf = new Configuration(hadoopConfiguration)
 
     // Write to a parquet file and let it fail.
......
@@ -31,6 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.Filter
 import org.apache.spark.util.Utils
 
 /**
@@ -204,10 +205,11 @@ private[sql] trait SQLTestUtils
    */
   protected def stripSparkFilter(df: DataFrame): DataFrame = {
     val schema = df.schema
-    val childRDD = df
-      .queryExecution
-      .sparkPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-      .child
+    val withoutFilters = df.queryExecution.sparkPlan transform {
+      case Filter(_, child) => child
+    }
+
+    val childRDD = withoutFilters
       .execute()
       .map(row => Row.fromSeq(row.copy().toSeq(schema)))
......
@@ -1724,6 +1724,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withTable("tbl10562") {
       val df = Seq(2012 -> "a").toDF("Year", "val")
       df.write.partitionBy("Year").saveAsTable("tbl10562")
+      checkAnswer(sql("SELECT year FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
       checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)
......