Commit c26d1742 authored by Nong, committed by Davies Liu

[SPARK-12636] [SQL] Update UnsafeRowParquetRecordReader to support reading files directly.

As noted in the code, this change is to make this component easier to test in isolation.

Author: Nong <nongli@gmail.com>

Closes #10581 from nongli/spark-12636.
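For orientation, here is a minimal, hypothetical sketch (not part of this patch) of how the new direct-read path is meant to be used, mirroring the ParquetIOSuite test added below. The calls to listDirectory, initialize(path, columns), nextKeyValue, getCurrentValue, and close come from the diff; the class name DirectReadExample, the directory /tmp/example-parquet-dir, and the column names "_1"/"_2" are illustrative assumptions.

// Hypothetical usage sketch for the direct-read API added in this commit.
// Assumes a directory of Parquet data written from (Int, String) rows, as in
// the test below; the path and column names are placeholders.
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase;
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader;

public class DirectReadExample {
  public static void main(String[] args) throws IOException, InterruptedException {
    // List the data files under the output directory, skipping '.'/'_'-prefixed files.
    List<String> files =
        SpecificParquetRecordReaderBase.listDirectory(new File("/tmp/example-parquet-dir"));

    UnsafeRowParquetRecordReader reader = new UnsafeRowParquetRecordReader();
    try {
      // Project two columns; passing null instead reads the full file schema.
      reader.initialize(files.get(0), Arrays.asList("_1", "_2"));
      while (reader.nextKeyValue()) {
        UnsafeRow row = reader.getCurrentValue();
        System.out.println(row.getInt(0) + ", " + row.getString(1));
      }
    } finally {
      reader.close();
    }
  }
}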
parent 13a3b636

SpecificParquetRecordReaderBase.java

@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +37,7 @@ import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -56,6 +58,8 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
@@ -69,7 +73,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   protected Path file;
   protected MessageType fileSchema;
   protected MessageType requestedSchema;
-  protected ReadSupport<T> readSupport;
+  protected StructType sparkSchema;
 
   /**
    * The total number of rows this RecordReader will eventually read. The sum of the
@@ -125,20 +129,80 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
             + " in range " + split.getStart() + ", " + split.getEnd());
       }
     }
-    MessageType fileSchema = footer.getFileMetaData().getSchema();
+    this.fileSchema = footer.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
-    this.readSupport = getReadSupportInstance(
+    ReadSupport<T> readSupport = getReadSupportInstance(
         (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    this.fileSchema = fileSchema;
+    this.sparkSchema = new CatalystSchemaConverter(configuration).convert(requestedSchema);
     this.reader = new ParquetFileReader(configuration, file, blocks, requestedSchema.getColumns());
     for (BlockMetaData block : blocks) {
       this.totalRowCount += block.getRowCount();
     }
   }
+
+  /**
+   * Returns the list of files at 'path' recursively. This skips files that are ignored normally
+   * by MapReduce.
+   */
+  public static List<String> listDirectory(File path) throws IOException {
+    List<String> result = new ArrayList<String>();
+    if (path.isDirectory()) {
+      for (File f: path.listFiles()) {
+        result.addAll(listDirectory(f));
+      }
+    } else {
+      char c = path.getName().charAt(0);
+      if (c != '.' && c != '_') {
+        result.add(path.getAbsolutePath());
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Initializes the reader to read the file at `path` with `columns` projected. If `columns` is
+   * null, all the columns are projected.
+   *
+   * This is exposed for testing to be able to create this reader without the rest of the Hadoop
+   * split machinery. It is not intended for general use and does not support all the
+   * configurations.
+   */
+  protected void initialize(String path, List<String> columns) throws IOException {
+    Configuration config = new Configuration();
+    config.set("spark.sql.parquet.binaryAsString", "false");
+    config.set("spark.sql.parquet.int96AsTimestamp", "false");
+    config.set("spark.sql.parquet.writeLegacyFormat", "false");
+
+    this.file = new Path(path);
+    long length = FileSystem.get(config).getFileStatus(this.file).getLen();
+    ParquetMetadata footer = readFooter(config, file, range(0, length));
+
+    List<BlockMetaData> blocks = footer.getBlocks();
+    this.fileSchema = footer.getFileMetaData().getSchema();
+
+    if (columns == null) {
+      this.requestedSchema = fileSchema;
+    } else {
+      Types.MessageTypeBuilder builder = Types.buildMessage();
+      for (String s: columns) {
+        if (!fileSchema.containsField(s)) {
+          throw new IOException("Can only project existing columns. Unknown field: " + s +
+              " File schema:\n" + fileSchema);
+        }
+        builder.addFields(fileSchema.getType(s));
+      }
+      this.requestedSchema = builder.named("spark_schema");
+    }
+    this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
+    this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
+    for (BlockMetaData block : blocks) {
+      this.totalRowCount += block.getRowCount();
+    }
+  }
 
   @Override
   public Void getCurrentKey() throws IOException, InterruptedException {
     return null;
...

UnsafeRowParquetRecordReader.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.Preconditions;
@@ -121,14 +122,42 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     super.initialize(inputSplit, taskAttemptContext);
+    initializeInternal();
+  }
+
+  /**
+   * Utility API that will read all the data in path. This circumvents the need to create Hadoop
+   * objects to use this class. `columns` can contain the list of columns to project.
+   */
+  @Override
+  public void initialize(String path, List<String> columns) throws IOException {
+    super.initialize(path, columns);
+    initializeInternal();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (batchIdx >= numBatched) {
+      if (!loadBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override
+  public UnsafeRow getCurrentValue() throws IOException, InterruptedException {
+    return rows[batchIdx - 1];
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) rowsReturned / totalRowCount;
+  }
+
+  private void initializeInternal() throws IOException {
     /**
      * Check that the requested schema is supported.
      */
-    if (requestedSchema.getFieldCount() == 0) {
-      // TODO: what does this mean?
-      throw new IOException("Empty request schema not supported.");
-    }
     int numVarLenFields = 0;
     originalTypes = new OriginalType[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
@@ -182,25 +211,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     }
   }
 
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (batchIdx >= numBatched) {
-      if (!loadBatch()) return false;
-    }
-    ++batchIdx;
-    return true;
-  }
-
-  @Override
-  public UnsafeRow getCurrentValue() throws IOException, InterruptedException {
-    return rows[batchIdx - 1];
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return (float) rowsReturned / totalRowCount;
-  }
-
   /**
    * Decodes a batch of values into `rows`. This function is the hot path.
    */
@@ -253,10 +263,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
         case INT96:
           throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
       }
-      numBatched = num;
-      batchIdx = 0;
     }
+    numBatched = num;
+    batchIdx = 0;
+
     // Update the total row lengths if the schema contained variable length. We did not maintain
     // this as we populated the columns.
     if (containsVarLenFields) {
...

ParquetIOSuite.scala

@@ -18,8 +18,11 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.parquet.column.{Encoding, ParquetProperties}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.Utils
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -642,6 +645,77 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
 
+  test("UnsafeRowParquetRecordReader - direct path read") {
+    val data = (0 to 10).map(i => (i, ((i + 'a').toChar.toString)))
+    withTempPath { dir =>
+      sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
+      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
+      {
+        val reader = new UnsafeRowParquetRecordReader
+        try {
+          reader.initialize(file, null)
+          val result = mutable.ArrayBuffer.empty[(Int, String)]
+          while (reader.nextKeyValue()) {
+            val row = reader.getCurrentValue
+            val v = (row.getInt(0), row.getString(1))
+            result += v
+          }
+          assert(data == result)
+        } finally {
+          reader.close()
+        }
+      }
+
+      // Project just one column
+      {
+        val reader = new UnsafeRowParquetRecordReader
+        try {
+          reader.initialize(file, ("_2" :: Nil).asJava)
+          val result = mutable.ArrayBuffer.empty[(String)]
+          while (reader.nextKeyValue()) {
+            val row = reader.getCurrentValue
+            result += row.getString(0)
+          }
+          assert(data.map(_._2) == result)
+        } finally {
+          reader.close()
+        }
+      }
+
+      // Project columns in opposite order
+      {
+        val reader = new UnsafeRowParquetRecordReader
+        try {
+          reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
+          val result = mutable.ArrayBuffer.empty[(String, Int)]
+          while (reader.nextKeyValue()) {
+            val row = reader.getCurrentValue
+            val v = (row.getString(0), row.getInt(1))
+            result += v
+          }
+          assert(data.map { x => (x._2, x._1) } == result)
+        } finally {
+          reader.close()
+        }
+      }
+
+      // Empty projection
+      {
+        val reader = new UnsafeRowParquetRecordReader
+        try {
+          reader.initialize(file, List[String]().asJava)
+          var result = 0
+          while (reader.nextKeyValue()) {
+            result += 1
+          }
+          assert(result == data.length)
+        } finally {
+          reader.close()
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
...