diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f07374ac0c79b02711b1f93faf964fb589a16c49..ba2e1e2bc269d8d02a80a9561208df072c2633a9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -181,20 +181,20 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     verifySchema(dataSchema)
     val numFeatures = options("numFeatures").toInt
     assert(numFeatures > 0)
 
     val sparse = options.getOrElse("vectorType", "sparse") == "sparse"
 
-    val broadcastedConf = sparkSession.sparkContext.broadcast(
-      new SerializableConfiguration(
-        new Configuration(sparkSession.sparkContext.hadoopConfiguration)))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val points =
-        new HadoopFileLinesReader(file, broadcastedConf.value.value)
+        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value)
           .map(_.toString.trim)
           .filterNot(line => line.isEmpty || line.startsWith("#"))
           .map { line =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9e1308bed5107dc97b878213ed0f95d4ece3881a..c26cae84d7190ba761e992e0f3c636fe6c978d9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.internal.Logging
@@ -106,13 +107,17 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
       logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
+      val hadoopConf = new Configuration(files.sparkSession.sessionState.hadoopConf)
+      files.options.foreach { case (k, v) => if (v ne null) hadoopConf.set(k, v) }
+
       val readFile = files.fileFormat.buildReader(
         sparkSession = files.sparkSession,
         dataSchema = files.dataSchema,
         partitionSchema = files.partitionSchema,
         requiredSchema = prunedDataSchema,
         filters = pushedDownFilters,
-        options = files.options)
+        options = files.options,
+        hadoopConf = hadoopConf)
 
       val plannedPartitions = files.bucketSpec match {
         case Some(bucketing) if files.sparkSession.sessionState.conf.bucketingEnabled =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index b2483e69a674abb4a92818f2cecbf1a14e32a492..fa954975b8a6e1fdaaec29ab35721817cd715f46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -106,6 +106,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
       val job = Job.getInstance(hadoopConf)
       job.setOutputKeyClass(classOf[Void])
       job.setOutputValueClass(classOf[InternalRow])
+
+      // Also set the options in Hadoop Configuration
+      options.foreach { case (k, v) => if (v ne null) job.getConfiguration.set(k, v) }
+
       FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
 
       val partitionSet = AttributeSet(partitionColumns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index fb047ff867cdc6e65747e4bc6e92db36b0f575ca..8ca105d92375f8bc945f9454535e09d380bdded4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -99,16 +99,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
 
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val lineIterator = {
-        val conf = broadcastedConf.value.value
+        val conf = broadcastedHadoopConf.value.value
         new HadoopFileLinesReader(file, conf).map { line =>
           new String(line.getBytes, 0, line.getLength, csvOptions.charset)
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 2628788ad37130c4df862093eb1dc9c354dfe180..3058e79201787c5a6e1817baeb586f2f8594f5ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -217,7 +217,8 @@ trait FileFormat {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
     // TODO: Remove this default implementation when the other formats have been ported
     // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for $this")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index f9c34c6bb5dd8c349b5849338087d36522a4e12b..b6b3907e3e5f00e6b691093de84f6e8ad7a45e21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -97,10 +97,10 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     val parsedOptions: JSONOptions = new JSONOptions(options)
     val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
@@ -109,8 +109,8 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
     val joinedRow = new JoinedRow()
 
-    file => {
-      val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
+    (file: PartitionedFile) => {
+      val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
 
       val rows = JacksonParser.parseJson(
         lines,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b156581564f0b06e0e41621277298dc76b1e67b4..5be8770790f1035e112115c60c349efec3e8074f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -262,13 +262,13 @@ private[sql] class DefaultSource
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val parquetConf = new Configuration(sparkSession.sessionState.hadoopConf)
-    parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
-    parquetConf.set(
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+    hadoopConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
-    parquetConf.set(
+    hadoopConf.set(
       CatalystWriteSupport.SPARK_ROW_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(requiredSchema).json)
 
@@ -276,13 +276,13 @@ private[sql] class DefaultSource
     // This metadata is only useful for detecting optional columns when pushdowning filters.
     val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
       requiredSchema).asInstanceOf[StructType]
-    CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
       sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
-    parquetConf.setBoolean(
+    hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
@@ -298,8 +298,8 @@ private[sql] class DefaultSource
       None
     }
 
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
@@ -327,7 +327,8 @@ private[sql] class DefaultSource
           null)
 
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-      val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+      val hadoopAttemptContext =
+        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
 
       val parquetReader = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index a0d680c7080dbc5876083ade9666dbb0086f6593..348edfcf7a851ce4dcc921e0ce47d57a2804a837 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -89,17 +89,17 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    file => {
+    (file: PartitionedFile) => {
       val unsafeRow = new UnsafeRow(1)
       val bufferHolder = new BufferHolder(unsafeRow)
       val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
 
-      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
         // Writes to an UnsafeRow directly
         bufferHolder.reset()
         unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 9da0af3a760756daa3598482848cbf9e6735b0a4..f73d485acf31b8a8b8b9e22969a5324943d1bd9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.File
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
@@ -476,7 +477,8 @@ class TestFileFormat extends FileFormat {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
 
     // Record the arguments so they can be checked in the test case.
     LastArguments.partitionSchema = partitionSchema
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index cb49fc910b78600a3253fdc263f70f6e32c02f5c..4f81967a5beeb206e756da1dd464084aeb399abe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -114,22 +114,21 @@ private[sql] class DefaultSource
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-    val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
-
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(filters.toArray).foreach { f =>
-        orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+        hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
 
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
-      val conf = broadcastedConf.value.value
+      val conf = broadcastedHadoopConf.value.value
 
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -154,7 +153,7 @@ private[sql] class DefaultSource
         // Specifically would be helpful for partitioned datasets.
         val orcReader = OrcFile.createReader(
           new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
-        new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
+        new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
       }
 
       // Unwraps `OrcStruct`s to `UnsafeRow`s
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index 71e3457d2596a92a1e3435a15d7048f2a5258177..9ad0887609ed9774162005c75c8580bb8ff943c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -65,4 +65,27 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
         .load(file.getCanonicalPath))
     }
   }
+
+  test("test hadoop conf option propagation") {
+    withTempPath { file =>
+      // Test write side
+      val df = sqlContext.range(10).selectExpr("cast(id as string)")
+      df.write
+        .option("some-random-write-option", "hahah-WRITE")
+        .option("some-null-value-option", null)  // test null robustness
+        .option("dataSchema", df.schema.json)
+        .format(dataSourceName).save(file.getAbsolutePath)
+      assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE")
+
+      // Test read side
+      val df1 = sqlContext.read
+        .option("some-random-read-option", "hahah-READ")
+        .option("some-null-value-option", null)  // test null robustness
+        .option("dataSchema", df.schema.json)
+        .format(dataSourceName)
+        .load(file.getAbsolutePath)
+      df1.count()
+      assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-read-option") == "hahah-READ")
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e4bd1f93c59f16ddeb8dfb22b819ba27a4e5cde5..0fa18414154ce32c0b07fda18f91ebbac55ec0bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -47,13 +47,16 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
-      dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
-    override def newInstance(
-        path: String,
-        bucketId: Option[Int],
-        dataSchema: StructType,
-        context: TaskAttemptContext): OutputWriter = {
-      new SimpleTextOutputWriter(path, context)
+      dataSchema: StructType): OutputWriterFactory = {
+    SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          bucketId: Option[Int],
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new SimpleTextOutputWriter(path, context)
+      }
     }
   }
 
@@ -63,8 +66,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
-      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    SimpleTextRelation.lastHadoopConf = Option(hadoopConf)
     SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
     SimpleTextRelation.pushedFilters = filters.toSet
 
@@ -74,9 +78,8 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       inputAttributes.find(_.name == field.name)
     }
 
-    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
-    val broadcastedConf =
-      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
     (file: PartitionedFile) => {
       val predicate = {
@@ -95,7 +98,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       val projection = new InterpretedProjection(outputAttributes, inputAttributes)
 
       val unsafeRowIterator =
-        new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
           val record = line.toString
           new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
             case (v, dataType) =>
@@ -164,4 +167,7 @@ object SimpleTextRelation {
 
   // Used to test failure callback
   var callbackCalled = false
+
+  // Used by the test case to check the value propagated in the hadoop confs.
+  var lastHadoopConf: Option[Configuration] = None
 }
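
Note (not part of the patch): the sketch below is a minimal illustration of the new buildReader contract from an implementer's point of view. It is modeled on the text source changes above and assumes the same in-tree, Spark 2.0-era helpers the patch touches (PartitionedFile, HadoopFileLinesReader, SerializableConfiguration); ExampleLineFormat is a hypothetical format that emits each line of a file as a single string column. Since FileSourceStrategy now copies the data source options into hadoopConf before calling buildReader, the format broadcasts the configuration it is handed instead of rebuilding one from sessionState.hadoopConf.

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration

// Hypothetical example; inferSchema/prepareWrite are left abstract to keep the sketch short.
abstract class ExampleLineFormat extends FileFormat {

  override def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    // `hadoopConf` already carries the per-relation options set by the planner,
    // so it is broadcast as-is and reused on the executors.
    val broadcastedHadoopConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    (file: PartitionedFile) => {
      // Read the file split line by line and emit one single-column row per line.
      new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map { line =>
        InternalRow(UTF8String.fromString(line.toString))
      }
    }
  }
}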