From b4819404a65f9b97c1f8deb1fcb8419969831574 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Thu, 14 Apr 2016 15:43:44 +0800
Subject: [PATCH] [SPARK-14596][SQL] Remove unused SqlNewHadoopRDD and some
 more unused imports

## What changes were proposed in this pull request?

The old `HadoopFsRelation` API included `buildInternalScan()`, which used `SqlNewHadoopRDD` in `ParquetRelation`.
Now that the old API has been removed, `SqlNewHadoopRDD` is no longer used anywhere.

This PR therefore removes `SqlNewHadoopRDD`, renames `SqlNewHadoopRDDState` to `InputFileNameHolder` (the old name no longer makes sense), and drops several unused imports.

This was discussed in https://github.com/apache/spark/pull/12326.
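
For context, the renamed holder is nothing more than a thread-local that the file-reading RDDs set before handing rows downstream. A minimal sketch of the pattern is below; the names follow this patch, but the body is illustrative rather than a copy of the actual source:

```scala
import org.apache.spark.unsafe.types.UTF8String

// Illustrative sketch of the thread-local holder kept after the rename.
object InputFileNameHolderSketch {
  // Name of the file currently being read by this task's thread; empty when unset.
  private[this] val inputFileName: ThreadLocal[UTF8String] =
    new ThreadLocal[UTF8String] {
      override protected def initialValue(): UTF8String = UTF8String.fromString("")
    }

  def getInputFileName(): UTF8String = inputFileName.get()

  def setInputFileName(file: String): Unit =
    inputFileName.set(UTF8String.fromString(file))

  def unsetInputFileName(): Unit =
    inputFileName.set(UTF8String.fromString(""))
}
```

`HadoopRDD` and `FileScanRDD` call `setInputFileName` before reading a split and `unsetInputFileName` when the split is closed, and the `InputFileName` expression simply returns `getInputFileName()`, so it no longer needs to know anything about `SqlNewHadoopRDD`.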

## How was this patch tested?

Existing unit tests that exercise the affected code paths, plus a `sbt scalastyle` run.
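
Beyond the existing suites, a quick ad-hoc spot check from `spark-shell` can confirm the behaviour the holder supports still works. This is only a hedged illustration (the Parquet path is a placeholder), not part of the patch or its tests:

```scala
import org.apache.spark.sql.functions.input_file_name

// Assumes the spark-shell `sqlContext` and an existing Parquet file at the
// placeholder path below.
val df = sqlContext.read.parquet("/tmp/example.parquet")

// Each row should show the path of the file it came from, i.e. the value that
// HadoopRDD / FileScanRDD set via InputFileNameHolder.
df.select(input_file_name()).show(false)
```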

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12354 from HyukjinKwon/SPARK-14596.
---
 .../org/apache/spark/rdd/HadoopRDD.scala      |   8 +-
 ...DState.scala => InputFileNameHolder.scala} |   6 +-
 project/MimaExcludes.scala                    |   5 -
 .../catalyst/expressions/InputFileName.scala  |   8 +-
 .../execution/datasources/FileScanRDD.scala   |  11 +-
 .../datasources/SqlNewHadoopRDD.scala         | 282 ------------------
 .../apache/spark/sql/internal/SQLConf.scala   |   1 -
 .../datasources/FileSourceStrategySuite.scala |   5 +-
 8 files changed, 16 insertions(+), 310 deletions(-)
 rename core/src/main/scala/org/apache/spark/rdd/{SqlNewHadoopRDDState.scala => InputFileNameHolder.scala} (86%)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08db96edd6..ac5ba9e79f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,15 +213,13 @@ class HadoopRDD[K, V](
       logInfo("Input split: " + split.inputSplit)
       val jobConf = getJobConf()
 
-      // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
-
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
       val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
-        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDDState.unsetInputFileName()
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
       }
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -271,7 +269,7 @@ class HadoopRDD[K, V](
 
       override def close() {
         if (reader != null) {
-          SqlNewHadoopRDDState.unsetInputFileName()
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
similarity index 86%
rename from core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
rename to core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
index 3f15fff793..108e9d2558 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala
@@ -20,10 +20,10 @@ package org.apache.spark.rdd
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * State for SqlNewHadoopRDD objects. This is split this way because of the package splits.
- * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ * Holds the name of the file being read by the current Spark task. It is used by
+ * HadoopRDD, FileScanRDD and the InputFileName function in Spark SQL.
  */
-private[spark] object SqlNewHadoopRDDState {
+private[spark] object InputFileNameHolder {
   /**
    * The thread variable for the name of the current file being read. This is used by
    * the InputFileName function in Spark SQL.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a30581eb48..313bf93b5d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -847,7 +847,6 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
@@ -856,10 +855,8 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
@@ -870,7 +867,6 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
@@ -884,7 +880,6 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
-        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
index dbd0acf06c..2ed6fc0d38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.rdd.SqlNewHadoopRDDState
+import org.apache.spark.rdd.InputFileNameHolder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * Expression that returns the name of the current file being read in using [[SqlNewHadoopRDD]]
+ * Expression that returns the name of the current file being read.
  */
 @ExpressionDescription(
   usage = "_FUNC_() - Returns the name of the current file being read if available",
@@ -40,12 +40,12 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
   override protected def initInternal(): Unit = {}
 
   override protected def evalInternal(input: InternalRow): UTF8String = {
-    SqlNewHadoopRDDState.getInputFileName()
+    InputFileNameHolder.getInputFileName()
   }
 
   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     ev.isNull = "false"
     s"final ${ctx.javaType(dataType)} ${ev.value} = " +
-      "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
+      "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();"
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 988c785dbe..468e101fed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
+import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 
@@ -37,7 +37,6 @@ case class PartitionedFile(
   }
 }
 
-
 /**
  * A collection of files that should be read as a single task possibly from multiple partitioned
  * directories.
@@ -50,7 +49,7 @@ class FileScanRDD(
     @transient val sqlContext: SQLContext,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
-    extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+  extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
@@ -65,17 +64,17 @@ class FileScanRDD(
         if (files.hasNext) {
           val nextFile = files.next()
           logInfo(s"Reading File $nextFile")
-          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+          InputFileNameHolder.setInputFileName(nextFile.filePath)
           currentIterator = readFunction(nextFile)
           hasNext
         } else {
-          SqlNewHadoopRDDState.unsetInputFileName()
+          InputFileNameHolder.unsetInputFileName()
           false
         }
       }
 
       override def close() = {
-        SqlNewHadoopRDDState.unsetInputFileName()
+        InputFileNameHolder.unsetInputFileName()
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
deleted file mode 100644
index 4d6864d8ba..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
-
-import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
-
-private[spark] class SqlNewHadoopPartition(
-    rddId: Int,
-    val index: Int,
-    rawSplit: InputSplit with Writable)
-  extends SparkPartition {
-
-  val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
-  override def hashCode(): Int = 41 * (41 + rddId) + index
-}
-
-/**
- * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
- * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
- * 1. A shared broadcast Hadoop Configuration.
- * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side
- *    to the shared Hadoop Configuration.
- * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side
- *    and the executor side to the shared Hadoop Configuration.
- *
- * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]].
- */
-private[spark] class SqlNewHadoopRDD[V: ClassTag](
-    sqlContext: SQLContext,
-    broadcastedConf: Broadcast[SerializableConfiguration],
-    @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
-    initLocalJobFuncOpt: Option[Job => Unit],
-    inputFormatClass: Class[_ <: InputFormat[Void, V]],
-    valueClass: Class[V])
-    extends RDD[V](sqlContext.sparkContext, Nil) with Logging {
-
-  protected def getJob(): Job = {
-    val conf = broadcastedConf.value.value
-    // "new Job" will make a copy of the conf. Then, it is
-    // safe to mutate conf properties with initLocalJobFuncOpt
-    // and initDriverSideJobFuncOpt.
-    val newJob = Job.getInstance(conf)
-    initLocalJobFuncOpt.map(f => f(newJob))
-    newJob
-  }
-
-  def getConf(isDriverSide: Boolean): Configuration = {
-    val job = getJob()
-    if (isDriverSide) {
-      initDriverSideJobFuncOpt.map(f => f(job))
-    }
-    job.getConfiguration
-  }
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  @transient protected val jobId = new JobID(jobTrackerId, id)
-
-  override def getPartitions: Array[SparkPartition] = {
-    val conf = getConf(isDriverSide = true)
-    val inputFormat = inputFormatClass.newInstance
-    inputFormat match {
-      case configurable: Configurable =>
-        configurable.setConf(conf)
-      case _ =>
-    }
-    val jobContext = new JobContextImpl(conf, jobId)
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val result = new Array[SparkPartition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) =
-        new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-    }
-    result
-  }
-
-  override def compute(
-    theSplit: SparkPartition,
-    context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[V] {
-      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf(isDriverSide = false)
-
-      val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
-      val existingBytesRead = inputMetrics.bytesRead
-
-      // Sets the thread local variable for the file's name
-      split.serializableHadoopSplit.value match {
-        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDDState.unsetInputFileName()
-      }
-
-      // Find a function that will return the FileSystem bytes read by this thread. Do this before
-      // creating RecordReader, because RecordReader's constructor might read some bytes
-      val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
-        case _: FileSplit | _: CombineFileSplit =>
-              SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-        case _ => None
-      }
-
-      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
-      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
-      // task and in the same thread, in which case we need to avoid override values written by
-      // previous partitions (SPARK-13071).
-      def updateBytesRead(): Unit = {
-        getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
-        }
-      }
-
-      val format = inputFormatClass.newInstance
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-      val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private[this] var reader: RecordReader[Void, V] = format.createRecordReader(
-          split.serializableHadoopSplit.value, hadoopAttemptContext)
-      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
-
-      private[this] var havePair = false
-      private[this] var finished = false
-
-      override def hasNext: Boolean = {
-        if (context.isInterrupted()) {
-          throw new TaskKilledException
-        }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            // Close and release the reader here; close() will also be called when the task
-            // completes, but for tasks that read from many files, it helps to release the
-            // resources early.
-            close()
-          }
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        if (!finished) {
-          inputMetrics.incRecordsReadInternal(1)
-        }
-        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
-          updateBytesRead()
-        }
-        reader.getCurrentValue
-      }
-
-      private def close() {
-        if (reader != null) {
-          SqlNewHadoopRDDState.unsetInputFileName()
-          // Close the reader and release it. Note: it's very important that we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
-          // corruption issues when reading compressed input.
-          try {
-            reader.close()
-          } catch {
-            case e: Exception =>
-              if (!ShutdownHookManager.inShutdown()) {
-                logWarning("Exception in RecordReader.close()", e)
-              }
-          } finally {
-            reader = null
-          }
-          if (getBytesReadCallback.isDefined) {
-            updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
-            }
-          }
-        }
-      }
-    }
-    iter
-  }
-
-  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
-    val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
-    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          Some(HadoopRDD.convertSplitLocationInfo(infos))
-        } catch {
-          case e : Exception =>
-            logDebug("Failed to use InputSplit#getLocationInfo.", e)
-            None
-        }
-      case None => None
-    }
-    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
-  }
-
-  override def persist(storageLevel: StorageLevel): this.type = {
-    if (storageLevel.deserialized) {
-      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
-        " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
-        " Use a map transformation to make copies of the records.")
-    }
-    super.persist(storageLevel)
-  }
-
-  /**
-   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
-   * the given function rather than the index of the partition.
-   */
-  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
-      prev: RDD[T],
-      f: (InputSplit, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false)
-    extends RDD[U](prev) {
-
-    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
-
-    override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
-
-    override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
-      val partition = split.asInstanceOf[SqlNewHadoopPartition]
-      val inputSplit = partition.serializableHadoopSplit.value
-      f(inputSplit, firstParent[T].iterator(split, context))
-    }
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e74fb00cb2..2f9d63c2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 0b74f07540..dac56d3936 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -22,8 +22,6 @@ import java.io.File
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -34,8 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.Utils
 
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
-- 
GitLab