diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index de2503a87ab7d6e5de54783e29918c59c37b7602..83b7c779ab8187aecbe1524a96eb9e574ba509d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
-private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
 
 /**
  * Provides support in a SQLContext for caching query results and automatically using these cached
@@ -41,7 +41,7 @@ private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMe
  *
  * Internal to Spark SQL.
  */
-private[sql] class CacheManager extends Logging {
+class CacheManager extends Logging {
 
   @transient
   private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
@@ -68,13 +68,13 @@ private[sql] class CacheManager extends Logging {
   }
 
   /** Clears all cached tables. */
-  private[sql] def clearCache(): Unit = writeLock {
+  def clearCache(): Unit = writeLock {
     cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
     cachedData.clear()
   }
 
   /** Checks if the cache is empty. */
-  private[sql] def isEmpty: Boolean = readLock {
+  def isEmpty: Boolean = readLock {
     cachedData.isEmpty
   }
 
@@ -83,7 +83,7 @@ private[sql] class CacheManager extends Logging {
    * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
    * recomputing the in-memory columnar representation of the underlying table is expensive.
    */
-  private[sql] def cacheQuery(
+  def cacheQuery(
       query: Dataset[_],
       tableName: Option[String] = None,
       storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
@@ -108,7 +108,7 @@ private[sql] class CacheManager extends Logging {
    * Tries to remove the data for the given [[Dataset]] from the cache.
    * This is a no-op if it's already uncached.
    */
-  private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
+  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock {
     val planToCache = query.queryExecution.analyzed
     val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
     val found = dataIndex >= 0
@@ -120,17 +120,17 @@ private[sql] class CacheManager extends Logging {
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
-  private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+  def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
     lookupCachedData(query.queryExecution.analyzed)
   }
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
-  private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
     cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
-  private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+  def useCachedData(plan: LogicalPlan): LogicalPlan = {
     plan transformDown {
       case currentFragment =>
         lookupCachedData(currentFragment)
@@ -143,7 +143,7 @@ private[sql] class CacheManager extends Logging {
    * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
    * function will over-invalidate.
    */
-  private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+  def invalidateCache(plan: LogicalPlan): Unit = writeLock {
     cachedData.foreach {
       case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
         data.cachedRepresentation.recache()
@@ -155,7 +155,7 @@ private[sql] class CacheManager extends Logging {
    * Invalidates the cache of any data that contains `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  private[sql] def invalidateCachedPath(
+  def invalidateCachedPath(
       sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
     val (fs, qualifiedPath) = {
       val path = new Path(resourcePath)
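
(Not part of the patch: a minimal usage sketch, assuming an existing SparkSession named `spark`, showing the public entry points that delegate to the CacheManager methods made public above.)

    import org.apache.spark.storage.StorageLevel

    val df = spark.range(100).toDF("id")
    df.persist(StorageLevel.MEMORY_AND_DISK)  // delegates to CacheManager.cacheQuery
    df.count()                                // materializes the in-memory columnar buffers
    df.unpersist()                            // delegates to CacheManager.uncacheQuery
    spark.catalog.clearCache()                // delegates to CacheManager.clearCache
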
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1e749b3dfcffb092153fff90bbfa948d32c661e9..1a8d0e310aec06b7833dc1d85b3b345e8d7d8f89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.Utils
 
-private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -48,7 +48,7 @@ private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
 }
 
 /** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScanExec(
+case class RowDataSourceScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
@@ -57,7 +57,7 @@ private[sql] case class RowDataSourceScanExec(
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
-  private[sql] override lazy val metrics =
+  override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   val outputUnsafeRows = relation match {
@@ -138,7 +138,7 @@ private[sql] case class RowDataSourceScanExec(
  * @param dataFilters Data source filters to use for filtering data within partitions.
  * @param metastoreTableIdentifier
  */
-private[sql] case class FileSourceScanExec(
+case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
     output: Seq[Attribute],
     outputSchema: StructType,
@@ -211,7 +211,7 @@ private[sql] case class FileSourceScanExec(
     inputRDD :: Nil
   }
 
-  private[sql] override lazy val metrics =
+  override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b762c1691488d47cec846e25c13982f13b561fef..6c4248c60e8931c4bbaaf6dce67cdb73cdcbbf76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -67,7 +67,7 @@ object RDDConversions {
   }
 }
 
-private[sql] object ExternalRDD {
+object ExternalRDD {
 
   def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
     val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
@@ -76,7 +76,7 @@ private[sql] object ExternalRDD {
 }
 
 /** Logical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDD[T](
+case class ExternalRDD[T](
     outputObjAttr: Attribute,
     rdd: RDD[T])(session: SparkSession)
   extends LeafNode with ObjectProducer with MultiInstanceRelation {
@@ -103,11 +103,11 @@ private[sql] case class ExternalRDD[T](
 }
 
 /** Physical plan node for scanning data from an RDD. */
-private[sql] case class ExternalRDDScanExec[T](
+case class ExternalRDDScanExec[T](
     outputObjAttr: Attribute,
     rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec {
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -128,7 +128,7 @@ private[sql] case class ExternalRDDScanExec[T](
 }
 
 /** Logical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class LogicalRDD(
+case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
@@ -155,12 +155,12 @@ private[sql] case class LogicalRDD(
 }
 
 /** Physical plan node for scanning data from an RDD of InternalRow. */
-private[sql] case class RDDScanExec(
+case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     override val nodeName: String) extends LeafExecNode {
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
index 4c046f7bdca4898002446550c02935f42b4f5bcf..d5603b3b00914b235f627351abf44acc584b42ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala
@@ -39,7 +39,7 @@ case class ExpandExec(
     child: SparkPlan)
   extends UnaryExecNode with CodegenSupport {
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   // The GroupExpressions can output data with arbitrary partitioning, so set it
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
index 7a2a9eed5807d1552a9882339a461eb9af044352..a299fed7fd14a641355c330ecb8a0e50bcfcf7d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -22,7 +22,7 @@ package org.apache.spark.sql.execution
  * the list of paths that it returns will be returned to a user who calls `inputPaths` on any
  * DataFrame that queries this relation.
  */
-private[sql] trait FileRelation {
+trait FileRelation {
   /** Returns the list of files that will be read when scanning this relation. */
   def inputFiles: Array[String]
 }
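
(Not part of the patch: a minimal sketch of how a FileRelation's file listing surfaces through the public Dataset API; `spark` and the Parquet path are assumptions made for illustration.)

    val df = spark.read.parquet("/tmp/example/table")  // backed by a HadoopFsRelation, which is a FileRelation
    df.inputFiles.foreach(println)                     // lists the files the scan would read
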
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 8b62c5507c0c818e190262b5320e28d33830a46c..39189a2b0c72ccd32a94aba0a55cea2b3b61d70e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -55,7 +55,7 @@ case class GenerateExec(
     child: SparkPlan)
   extends UnaryExecNode {
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index f86f42b1f80e8886f484b11ba864b1d615493a39..556f482f4b472192ea4251e4dffff9962f61ffa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 /**
  * Physical plan node for scanning data from a local collection.
  */
-private[sql] case class LocalTableScanExec(
+case class LocalTableScanExec(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafExecNode {
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   private val unsafeRows: Array[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala
index 7462dbc4eba3abc3452ee961c2ae3254f78a76b5..717ff93eab5d4729672965dea1caed0c725838dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
  * iterator to consume the next row, whereas RowIterator combines these calls into a single
  * [[advanceNext()]] method.
  */
-private[sql] abstract class RowIterator {
+abstract class RowIterator {
   /**
    * Advance this iterator by a single row. Returns `false` if this iterator has no more rows
    * and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 6cb1a44a2044ae17f8d13067628c004bffa8279b..ec07aab359ac6d493c81a6dc9ae49730d613034a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd,
   SparkListenerSQLExecutionStart}
 
-private[sql] object SQLExecution {
+object SQLExecution {
 
   val EXECUTION_ID_KEY = "spark.sql.execution.id"
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 6db7f45cfdf2cf800756228d5f58d84c258c0e41..d8e0675e3eb6590370f193ad5468d1f85c8dee03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -22,11 +22,9 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.unsafe.sort.RadixSort;
 
 /**
  * Performs (external) sorting.
@@ -52,7 +50,7 @@ case class SortExec(
 
   private val enableRadixSort = sqlContext.conf.enableRadixSort
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 045ccc7bd6eae3838fb3a9bfdef60eee3bcfcfd3..79cb40948b982352b57492111b100a847f1cf03c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -72,24 +72,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Return all metadata that describes more details of this SparkPlan.
    */
-  private[sql] def metadata: Map[String, String] = Map.empty
+  def metadata: Map[String, String] = Map.empty
 
   /**
    * Return all metrics containing metrics of this SparkPlan.
    */
-  private[sql] def metrics: Map[String, SQLMetric] = Map.empty
+  def metrics: Map[String, SQLMetric] = Map.empty
 
   /**
    * Reset all the metrics.
    */
-  private[sql] def resetMetrics(): Unit = {
+  def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
   }
 
   /**
    * Return a LongSQLMetric according to the name.
    */
-  private[sql] def longMetric(name: String): SQLMetric = metrics(name)
+  def longMetric(name: String): SQLMetric = metrics(name)
 
   // TODO: Move to `DistributedPlan`
   /** Specifies how data is partitioned across different nodes in the cluster. */
@@ -395,7 +395,7 @@ object SparkPlan {
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
 }
 
-private[sql] trait LeafExecNode extends SparkPlan {
+trait LeafExecNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
 }
@@ -407,7 +407,7 @@ object UnaryExecNode {
   }
 }
 
-private[sql] trait UnaryExecNode extends SparkPlan {
+trait UnaryExecNode extends SparkPlan {
   def child: SparkPlan
 
   override def children: Seq[SparkPlan] = child :: Nil
@@ -415,7 +415,7 @@ private[sql] trait UnaryExecNode extends SparkPlan {
   override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
-private[sql] trait BinaryExecNode extends SparkPlan {
+trait BinaryExecNode extends SparkPlan {
   def left: SparkPlan
   def right: SparkPlan
 
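
(Not part of the patch: a hypothetical leaf operator, sketched only to illustrate the `metrics`/`longMetric` pattern made public above; `NoopScanExec` and its fields are invented for illustration.)

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.LeafExecNode
    import org.apache.spark.sql.execution.metric.SQLMetrics

    case class NoopScanExec(output: Seq[Attribute], rdd: RDD[InternalRow]) extends LeafExecNode {
      override lazy val metrics = Map(
        "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

      protected override def doExecute(): RDD[InternalRow] = {
        val numOutputRows = longMetric("numOutputRows")
        rdd.mapPartitions { iter =>
          iter.map { row => numOutputRows += 1; row }  // count rows as they stream through
        }
      }
    }
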
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index f84070a0c4bcbc45861dfb54ff410f4516d877a7..7aa93126fdabd9a7671968db2fd80c9980439094 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -47,7 +47,7 @@ class SparkPlanInfo(
   }
 }
 
-private[sql] object SparkPlanInfo {
+private[execution] object SparkPlanInfo {
 
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4dfec3ec854856096f8351ee1c4291496624f870..4aaf454285f4f397fef59e128052bab017a9c882 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -43,13 +42,12 @@ import org.apache.spark.sql.streaming.StreamingQuery
  * writing libraries should instead consider using the stable APIs provided in
  * [[org.apache.spark.sql.sources]]
  */
-@DeveloperApi
 abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
 
   override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
 }
 
-private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
 
   override def output: Seq[Attribute] = plan.output
 
@@ -58,7 +56,7 @@ private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
   }
 }
 
-private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
+abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SparkPlanner =>
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 484923428f4adcfb35fc24f61e1d96d8f4e796b2..8ab553369de6dfd455ba91ab09fdb0b17d476402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -40,12 +40,12 @@ import org.apache.spark.unsafe.Platform
  *
  * @param numFields the number of fields in the row being serialized.
  */
-private[sql] class UnsafeRowSerializer(
+class UnsafeRowSerializer(
     numFields: Int,
     dataSize: SQLMetric = null) extends Serializer with Serializable {
   override def newInstance(): SerializerInstance =
     new UnsafeRowSerializerInstance(numFields, dataSize)
-  override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
+  override def supportsRelocationOfSerializedObjects: Boolean = true
 }
 
 private class UnsafeRowSerializerInstance(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ac4c3aae5f8ee2ce24525dd33e7d4300f040378e..fb57ed7692de4373c4d960fe3f20180fdb274375 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -295,7 +295,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
       WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 54d7340d8acd01be1c9a46a132dfe3752d6ee32f..cfc47aba889aac0a98cfc35fa8be128adc658647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -54,7 +54,7 @@ case class HashAggregateExec(
     child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 00e45256c413143e411559a37950626f409b2e6e..2a81a823c44b3c18f4d0017b8516493e54a7d4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -49,7 +49,7 @@ case class SortAggregateExec(
       AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
       AttributeSet(aggregateBufferAttributes)
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index b047bc0641dd2d80234769352da809fdde73de21..586e1456ac69e271d7f5ea6676f2dfb47d57ee9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -204,7 +204,7 @@ sealed trait BufferSetterGetterUtils {
 /**
  * A Mutable [[Row]] representing a mutable aggregation buffer.
  */
-private[sql] class MutableAggregationBufferImpl (
+private[aggregate] class MutableAggregationBufferImpl(
     schema: StructType,
     toCatalystConverters: Array[Any => Any],
     toScalaConverters: Array[Any => Any],
@@ -266,7 +266,7 @@ private[sql] class MutableAggregationBufferImpl (
 /**
  * A [[Row]] representing an immutable aggregation buffer.
  */
-private[sql] class InputAggregationBuffer private[sql] (
+private[aggregate] class InputAggregationBuffer(
     schema: StructType,
     toCatalystConverters: Array[Any => Any],
     toScalaConverters: Array[Any => Any],
@@ -319,7 +319,7 @@ private[sql] class InputAggregationBuffer private[sql] (
  * The internal wrapper used to hook a [[UserDefinedAggregateFunction]] `udaf` in the
  * internal aggregation code path.
  */
-private[sql] case class ScalaUDAF(
+case class ScalaUDAF(
     children: Seq[Expression],
     udaf: UserDefinedAggregateFunction,
     mutableAggBufferOffset: Int = 0,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 185c79f899e683b74f9b8eeda7f551a1307ff522..e6f7081f2916d1b154c4c84c7e8b98fc3a9b1bc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -102,7 +102,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     }
   }
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -228,7 +228,7 @@ case class SampleExec(
     child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -317,7 +317,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   // output attributes should not affect the results
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 079e122a5a85ab962bc876edc74faddfdf855248..479934a7afc75fd840baea4b0ad59799ef79cab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -34,7 +34,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.CollectionAccumulator
 
 
-private[sql] object InMemoryRelation {
+object InMemoryRelation {
   def apply(
       useCompression: Boolean,
       batchSize: Int,
@@ -55,15 +55,15 @@ private[sql] object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
-private[sql] case class InMemoryRelation(
+case class InMemoryRelation(
     output: Seq[Attribute],
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
     @transient child: SparkPlan,
     tableName: Option[String])(
-    @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private[sql] val batchStats: CollectionAccumulator[InternalRow] =
+    @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    val batchStats: CollectionAccumulator[InternalRow] =
       child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
   extends logical.LeafNode with MultiInstanceRelation {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 67a410f539b60d5349fda3dd1f49597f30e196d5..b86825902ab3d8100d15dbf2b2d554f20af5122e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 
 
-private[sql] case class InMemoryTableScanExec(
+case class InMemoryTableScanExec(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
     @transient relation: InMemoryRelation)
@@ -36,7 +36,7 @@ private[sql] case class InMemoryTableScanExec(
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = attributes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 7eaad81a81615241f2f6c20460a5f8f6926c21d1..cce1489abd301fafac9af06c4554294fe758b414 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types._
  * A logical command that is executed for its side-effects.  `RunnableCommand`s are
  * wrapped in `ExecutedCommand` during execution.
  */
-private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
+trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
   override def children: Seq[LogicalPlan] = Seq.empty
   def run(sparkSession: SparkSession): Seq[Row]
@@ -45,7 +45,7 @@ private[sql] trait RunnableCommand extends LogicalPlan with logical.Command {
  * A physical operator that executes the run method of a `RunnableCommand` and
  * saves the result to prevent multiple executions.
  */
-private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
+case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
    * command or any other computation that should be evaluated exactly once. The value of this field
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
index 377b818096757a089da5d6f182f4a7f30afb4d12..ea4fe9c8ade5fda84f13f1868b6042cd9c43b6db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-private[sql] object BucketingUtils {
+object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
   //   1. some other information in the head of file name
   //   2. bucket id part, some numbers, starts with "_"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index ed8ccca6dee24fc324a2168e192745f058133a68..733ba185287e18fc966b64162fddab12ba5db38e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -43,7 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String
  * Replaces generic operations with specific variants that are designed to work with Spark
  * SQL Data Sources.
  */
-private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
+case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
   def resolver: Resolver = {
     if (conf.caseSensitiveAnalysis) {
@@ -53,8 +53,8 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
     }
   }
 
-  // The access modifier is used to expose this method to tests.
-  private[sql] def convertStaticPartitions(
+  // Visible for testing.
+  def convertStaticPartitions(
       sourceAttributes: Seq[Attribute],
       providedPartitions: Map[String, Option[String]],
       targetAttributes: Seq[Attribute],
@@ -202,7 +202,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi
  * Replaces [[SimpleCatalogRelation]] with a data source table if its table property contains data
  * source information.
  */
-private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
     val schema = DDLUtils.getSchemaFromTableProperties(table)
 
@@ -242,7 +242,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
  */
-private[sql] object DataSourceStrategy extends Strategy with Logging {
+object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
       pruneFilterProjectRaw(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 3ac09d99c7a33392352dfd884680bb3cbe3e1c82..8b36caf6f1e087935a84cc8297d2b2984097d750 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -18,14 +18,11 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.SparkPlan
 
@@ -52,7 +49,7 @@ import org.apache.spark.sql.execution.SparkPlan
  *     is under the threshold with the addition of the next file, add it.  If not, open a new bucket
  *     and add it.  Proceed to the next file.
  */
-private[sql] object FileSourceStrategy extends Strategy with Logging {
+object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(projects, filters,
       l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 8549ae96e2f39c5afb9c13760a372b5139708d29..b2ff68a833fea5631286ad04cfda372866c20d06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 /**
  * Inserts the results of `query` into a relation that extends [[InsertableRelation]].
  */
-private[sql] case class InsertIntoDataSourceCommand(
+case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
     overwrite: Boolean)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index b49525c8ceda9aaa054ca970ee722282ff54c88f..de822180ab5fac78e1a7b0234fdc73e8319f2eba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -55,7 +55,7 @@ import org.apache.spark.sql.internal.SQLConf
  *   4. If all tasks are committed, commits the job, otherwise aborts the job;  If any exception is
  *      thrown during job commitment, also aborts the job.
  */
-private[sql] case class InsertIntoHadoopFsRelationCommand(
+case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index c3561099d6842419ae2b58b230277f7a7996fc16..504464216e5a41f9a327fe99296297f095c4b558 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
 
+// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
 
 object PartitionDirectory {
   def apply(values: InternalRow, path: String): PartitionDirectory =
@@ -41,22 +42,23 @@ object PartitionDirectory {
  * Holds a directory in a partitioned collection of files as well as the partition values
  * in the form of a Row.  Before scanning, the files at `path` need to be enumerated.
  */
-private[sql] case class PartitionDirectory(values: InternalRow, path: Path)
+case class PartitionDirectory(values: InternalRow, path: Path)
 
-private[sql] case class PartitionSpec(
+case class PartitionSpec(
     partitionColumns: StructType,
     partitions: Seq[PartitionDirectory])
 
-private[sql] object PartitionSpec {
+object PartitionSpec {
   val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory])
 }
 
-private[sql] object PartitioningUtils {
+object PartitioningUtils {
   // This duplicates the default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
   // depend on Hive.
-  private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+  val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
 
-  private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
+  private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
+  {
     require(columnNames.size == literals.size)
   }
 
@@ -83,7 +85,7 @@ private[sql] object PartitioningUtils {
    *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    * }}}
    */
-  private[sql] def parsePartitions(
+  private[datasources] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String,
       typeInference: Boolean,
@@ -166,7 +168,7 @@ private[sql] object PartitioningUtils {
    *   hdfs://<host>:<port>/path/to/partition
    * }}}
    */
-  private[sql] def parsePartition(
+  private[datasources] def parsePartition(
       path: Path,
       defaultPartitionName: String,
       typeInference: Boolean,
@@ -249,7 +251,7 @@ private[sql] object PartitioningUtils {
    *   DoubleType -> StringType
    * }}}
    */
-  private[sql] def resolvePartitions(
+  def resolvePartitions(
       pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
@@ -275,7 +277,7 @@ private[sql] object PartitioningUtils {
     }
   }
 
-  private[sql] def listConflictingPartitionColumns(
+  private[datasources] def listConflictingPartitionColumns(
       pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
 
@@ -308,7 +310,7 @@ private[sql] object PartitioningUtils {
    * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.SYSTEM_DEFAULT]], and
    * [[StringType]].
    */
-  private[sql] def inferPartitionColumnValue(
+  private[datasources] def inferPartitionColumnValue(
       raw: String,
       defaultPartitionName: String,
       typeInference: Boolean): Literal = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c801436b0a6435ef1e045467f340b3885f0bbd6b..447c237e3a1b03a689cbccfe8f24f9347055db72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -41,14 +41,14 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 
 /** A container for all the details required when writing to a table. */
-case class WriteRelation(
+private[datasources] case class WriteRelation(
     sparkSession: SparkSession,
     dataSchema: StructType,
     path: String,
     prepareJobForWrite: Job => OutputWriterFactory,
     bucketSpec: Option[BucketSpec])
 
-private[sql] abstract class BaseWriterContainer(
+private[datasources] abstract class BaseWriterContainer(
     @transient val relation: WriteRelation,
     @transient private val job: Job,
     isAppend: Boolean)
@@ -235,7 +235,7 @@ private[sql] abstract class BaseWriterContainer(
 /**
  * A writer that writes all of the rows in a partition to a single file.
  */
-private[sql] class DefaultWriterContainer(
+private[datasources] class DefaultWriterContainer(
     relation: WriteRelation,
     job: Job,
     isAppend: Boolean)
@@ -294,7 +294,7 @@ private[sql] class DefaultWriterContainer(
  * done by maintaining a HashMap of open files until `maxFiles` is reached.  If this occurs, the
  * writer externally sorts the remaining rows and then writes them out one file at a time.
  */
-private[sql] class DynamicPartitionWriterContainer(
+private[datasources] class DynamicPartitionWriterContainer(
     relation: WriteRelation,
     job: Job,
     partitionColumns: Seq[Attribute],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 22fb8163b1c0a671a71cb02342ae6a6978b05f2a..10fe541a2c575ad08183495f82a42ea2f8fa409a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
-private[sql] class CSVOptions(@transient private val parameters: Map[String, String])
+private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {
 
   private def getChar(paramName: String, default: Char): Char = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
index 13ae76d49893a4a1dfa9cca190e43ffa39326a33..64bdd6f4643dce603f4b457d3c8201c65844d5c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
  *
  * @param params Parameters object
  */
-private[sql] class CsvReader(params: CSVOptions) {
+private[csv] class CsvReader(params: CSVOptions) {
 
   private val parser: CsvParser = {
     val settings = new CsvParserSettings()
@@ -65,7 +65,7 @@ private[sql] class CsvReader(params: CSVOptions) {
  * @param params Parameters object for configuration
  * @param headers headers for columns
  */
-private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) extends Logging {
+private[csv] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) extends Logging {
   private val writerSettings = new CsvWriterSettings
   private val format = writerSettings.getFormat
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index c6ba424d8687595c9b3675c4ab44523e4a04a7c1..6b2f9fc61e6776812b205f359e869c1249b9f9f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -168,7 +168,7 @@ object CSVRelation extends Logging {
   }
 }
 
-private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
+private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
   override def newInstance(
       path: String,
       bucketId: Option[Int],
@@ -179,7 +179,7 @@ private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWrit
   }
 }
 
-private[sql] class CsvOutputWriter(
+private[csv] class CsvOutputWriter(
     path: String,
     dataSchema: StructType,
     context: TaskAttemptContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 5ce8350de207fd18ac58cfc1aee465e407d15426..f068779b3e047d563bbc6a7c8cf75584f61405c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -76,7 +76,7 @@ abstract class OutputWriterFactory extends Serializable {
    * through the [[OutputWriterFactory]] implementation.
    * @since 2.0.0
    */
-  private[sql] def newWriter(path: String): OutputWriter = {
+  def newWriter(path: String): OutputWriter = {
     throw new UnsupportedOperationException("newInstance with just path not supported")
   }
 }
@@ -263,7 +263,7 @@ trait FileFormat {
    * appends partition values to [[InternalRow]]s produced by the reader function [[buildReader]]
    * returns.
    */
-  private[sql] def buildReaderWithPartitionValues(
+  def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
@@ -357,7 +357,7 @@ trait FileCatalog {
 /**
  * Helper methods for gathering metadata from HDFS.
  */
-private[sql] object HadoopFsRelation extends Logging {
+object HadoopFsRelation extends Logging {
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index e267e77c527fa3c26f7c5bd670eb06dc1a4ae71b..6dad8cbef720020ac71a9be25b8247b9657a58e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -38,11 +38,11 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * Data corresponding to one partition of a JDBCRDD.
  */
-private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
+case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
   override def index: Int = idx
 }
 
-private[sql] object JDBCRDD extends Logging {
+object JDBCRDD extends Logging {
 
   /**
    * Maps a JDBC type to a Catalyst type.  This function is called only when
@@ -192,7 +192,7 @@ private[sql] object JDBCRDD extends Logging {
    * Turns a single Filter into a String representing a SQL expression.
    * Returns None for an unhandled filter.
    */
-  private[jdbc] def compileFilter(f: Filter): Option[String] = {
+  def compileFilter(f: Filter): Option[String] = {
     Option(f match {
       case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
       case EqualNullSafe(attr, value) =>
@@ -275,7 +275,7 @@ private[sql] object JDBCRDD extends Logging {
  * driver code and the workers must be able to access the database; the driver
  * needs to fetch the schema while the workers need to fetch the data.
  */
-private[sql] class JDBCRDD(
+private[jdbc] class JDBCRDD(
     sc: SparkContext,
     getConnection: () => Connection,
     schema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ea32506c09d57c80eaecc3b948c2e960706499ee..612a295c0e3138ff6900a45dc30eaef28cf129e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -51,7 +51,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
-private[sql] class ParquetFileFormat
+class ParquetFileFormat
   extends FileFormat
   with DataSourceRegister
   with Logging
@@ -268,7 +268,7 @@ private[sql] class ParquetFileFormat
     true
   }
 
-  override private[sql] def buildReaderWithPartitionValues(
+  override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
@@ -424,7 +424,7 @@ private[sql] class ParquetFileFormat
  * writes the data to the path used to generate the output writer. Callers of this factory
  * have to determine which files are to be considered committed.
  */
-private[sql] class ParquetOutputWriterFactory(
+private[parquet] class ParquetOutputWriterFactory(
     sqlConf: SQLConf,
     dataSchema: StructType,
     hadoopConf: Configuration,
@@ -473,7 +473,7 @@ private[sql] class ParquetOutputWriterFactory(
    * Returns an [[OutputWriter]] that writes data to the given path without using
    * [[OutputCommitter]].
    */
-  override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
+  override def newWriter(path: String): OutputWriter = new OutputWriter {
 
     // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
     private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
@@ -520,7 +520,7 @@ private[sql] class ParquetOutputWriterFactory(
 
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter(
+private[parquet] class ParquetOutputWriter(
     path: String,
     bucketId: Option[Int],
     context: TaskAttemptContext)
@@ -558,12 +558,13 @@ private[sql] class ParquetOutputWriter(
 
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
-  override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
+  override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
 
   override def close(): Unit = recordWriter.close(context)
 }
 
-private[sql] object ParquetFileFormat extends Logging {
+
+object ParquetFileFormat extends Logging {
   /**
    * If parquet's block size (row group size) setting is larger than the min split size,
    * we use parquet's block size setting as the min split size. Otherwise, we will create
@@ -710,7 +711,7 @@ private[sql] object ParquetFileFormat extends Logging {
    * distinguish binary and string).  This method generates a correct schema by merging Metastore
    * schema data types and Parquet schema field names.
    */
-  private[sql] def mergeMetastoreParquetSchema(
+  def mergeMetastoreParquetSchema(
       metastoreSchema: StructType,
       parquetSchema: StructType): StructType = {
     def schemaConflictMessage: String =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 426263fa445a0b2fc93045ce39570afa7cefe8e1..a6e978809772871a5a7516d3fc9e52e1d71a7022 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
 /**
  * Some utility functions to convert Spark data source filters to Parquet filters.
  */
-private[sql] object ParquetFilters {
+private[parquet] object ParquetFilters {
 
   private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
     case BooleanType =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index dd2e915e7b7f97fdd1d28f64923fdf1b144711a6..3eec582714e1582a90585732ff214b446fea36fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Options for the Parquet data source.
  */
-private[sql] class ParquetOptions(
+private[parquet] class ParquetOptions(
     @transient private val parameters: Map[String, String],
     @transient private val sqlConf: SQLConf)
   extends Serializable {
@@ -56,8 +56,8 @@ private[sql] class ParquetOptions(
 }
 
 
-private[sql] object ParquetOptions {
-  private[sql] val MERGE_SCHEMA = "mergeSchema"
+object ParquetOptions {
+  val MERGE_SCHEMA = "mergeSchema"
 
   // The parquet compression short names
   private val shortParquetCompressionCodecNames = Map(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index d5b92323d44181f750ed0fc365fc9e856690b3a4..c133dda13e3fa0c8c946223f574ce49d2712d6ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.{AtomicType, StructType}
 /**
  * Tries to replace [[UnresolvedRelation]]s with [[ResolveDataSource]].
  */
-private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
       try {
@@ -195,7 +195,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
  * table. It also does data type casting and field renaming, to make sure that the columns to be
  * inserted have the correct data type and fields have the correct names.
  */
-private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
+case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   private def preprocess(
       insert: InsertIntoTable,
       tblName: String,
@@ -275,7 +275,7 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log
 /**
  * A rule to do various checks before inserting into or writing to a data source table.
  */
-private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
+case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
   extends (LogicalPlan => Unit) {
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
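
The casting and renaming done by PreprocessTableInsertion is roughly what one would otherwise do by hand before an insert. A sketch of the equivalent manual adjustment with the public DataFrame API, assuming the target table's schema is available as `targetSchema`:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType

    // Align an input DataFrame with a target table: match columns by position,
    // cast each one to the expected type, and rename it to the expected field name.
    def alignToTable(input: DataFrame, targetSchema: StructType): DataFrame = {
      require(input.columns.length == targetSchema.length, "column count must match the target table")
      val aligned = input.columns.zip(targetSchema.fields).map {
        case (inputName, field) => col(inputName).cast(field.dataType).as(field.name)
      }
      input.select(aligned: _*)
    }
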
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index e89f792496d6a4696b2d2a3ead46189abc880b53..082f97a8808fad96e26d0eaf5885c275501e8da5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -104,7 +104,7 @@ package object debug {
     }
   }
 
-  private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+  case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
     def output: Seq[Attribute] = child.output
 
     class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] {
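
DebugExec is normally reached through the implicits defined in this package object rather than constructed directly; a typical interactive use, assuming a Dataset `df`:

    import org.apache.spark.sql.execution.debug._

    df.debug()         // wraps each operator in DebugExec and prints per-operator tuple counts
    df.debugCodegen()  // prints the code generated for whole-stage codegen subtrees
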
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index bd0841db7e8ab8ead61f7ddd8cdff18242b68d89..a809076de5419882f0ead846c3dbfa5c3d840d88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -38,7 +38,7 @@ case class BroadcastExchangeExec(
     mode: BroadcastMode,
     child: SparkPlan) extends Exchange {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"),
     "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 2ea6ee38a932a442c3c1ee065727ffd88bf2d6c1..57da85fa84f9914b5fa742278c1d23a98de9bd4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -79,7 +79,7 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *  - post-shuffle partition 1: pre-shuffle partition 2
  *  - post-shuffle partition 2: pre-shuffle partition 3 and 4
  */
-private[sql] class ExchangeCoordinator(
+class ExchangeCoordinator(
     numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
@@ -112,7 +112,7 @@ private[sql] class ExchangeCoordinator(
    * Estimates partition start indices for post-shuffle partitions based on
    * mapOutputStatistics provided by all pre-shuffle stages.
    */
-  private[sql] def estimatePartitionStartIndices(
+  def estimatePartitionStartIndices(
       mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
     // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
     // a stage when the number of partitions of this dependency is 0.
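
estimatePartitionStartIndices packs consecutive pre-shuffle partitions into post-shuffle partitions until the advisory target size is reached. A simplified, single-exchange sketch of that coalescing over per-partition byte sizes (the real method also combines MapOutputStatistics from every registered exchange and honours minNumPostShufflePartitions):

    // Greedy coalescing: start a new post-shuffle partition whenever adding the next
    // pre-shuffle partition would push the running total past the advisory target size.
    def estimateStartIndices(bytesByPartition: Array[Long], targetSize: Long): Array[Int] = {
      val starts = scala.collection.mutable.ArrayBuffer(0)
      var current = 0L
      for (i <- bytesByPartition.indices) {
        if (current > 0 && current + bytesByPartition(i) > targetSize) {
          starts += i      // close the current post-shuffle partition before partition i
          current = 0L
        }
        current += bytesByPartition(i)
      }
      starts.toArray
    }

    // With a 64MB target, sizes of 10, 20, 50, 40 and 5 MB coalesce into starts [0, 2, 3],
    // i.e. post-shuffle partitions {0,1}, {2} and {3,4}.
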
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index afe0fbea73bd9c639ed5645b19a41824da9e1442..7a4a251370706b7f064d446fd9e9f2a32605663c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,7 +40,7 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
   override def nodeName: String = {
@@ -81,7 +81,8 @@ case class ShuffleExchange(
    * the partitioning scheme defined in `newPartitioning`. Those partitions of
    * the returned ShuffleDependency will be the input of shuffle.
    */
-  private[sql] def prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] = {
+  private[exchange] def prepareShuffleDependency()
+    : ShuffleDependency[Int, InternalRow, InternalRow] = {
     ShuffleExchange.prepareShuffleDependency(
       child.execute(), child.output, newPartitioning, serializer)
   }
@@ -92,7 +93,7 @@ case class ShuffleExchange(
    * partition start indices array. If this optional array is defined, the returned
    * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
    */
-  private[sql] def preparePostShuffleRDD(
+  private[exchange] def preparePostShuffleRDD(
       shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
       specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
     // If an array of partition start indices is provided, we need to use this array
@@ -194,7 +195,7 @@ object ShuffleExchange {
    * the partitioning scheme defined in `newPartitioning`. Those partitions of
    * the returned ShuffleDependency will be the input of shuffle.
    */
-  private[sql] def prepareShuffleDependency(
+  def prepareShuffleDependency(
       rdd: RDD[InternalRow],
       outputAttributes: Seq[Attribute],
       newPartitioning: Partitioning,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 7c194ab72643afbcc1d095a84faf55ba56ecfa75..0f24baacd18d6465e51fdfeb84f1e5d7abc44680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -45,7 +45,7 @@ case class BroadcastHashJoinExec(
     right: SparkPlan)
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 4d43765f8fcd36e306a36077cac2919b45ef2e9a..6a9965f1a24cd6d0fd0136314249600d231d9710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -37,7 +37,7 @@ case class BroadcastNestedLoopJoinExec(
     condition: Option[Expression],
     withinBroadcastThreshold: Boolean = true) extends BinaryExecNode {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   /** BuildRight means the right relation <=> the broadcast relation. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 0553086a226e7d94411a0e800772ccb86c41d888..57866df90d27dfb0a2104c1e58eae01295164cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
  * will be much faster than building the right partition for every row in the left RDD; it also
  * materializes the right RDD (in case the right RDD is nondeterministic).
  */
-private[spark]
 class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
 
@@ -78,7 +77,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField
       for (x <- rdd1.iterator(partition.s1, context);
            y <- createIter()) yield (x, y)
     CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]](
-      resultIter, sorter.cleanupResources)
+      resultIter, sorter.cleanupResources())
   }
 }
 
@@ -89,7 +88,7 @@ case class CartesianProductExec(
     condition: Option[Expression]) extends BinaryExecNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   protected override def doPrepare(): Unit = {
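
CartesianProductExec is only selected for joins with no equality condition and no broadcastable side; with the 2.0 API that usually means an unconditioned join behind the cross-join flag. A hedged sketch, assuming Datasets `left` and `right` and an active SparkSession `spark`:

    spark.conf.set("spark.sql.crossJoin.enabled", "true")

    // No join condition: planned as CartesianProductExec, which iterates the
    // materialized right side (UnsafeCartesianRDD) once per left partition.
    val crossed = left.join(right)
    crossed.explain()
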
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 0036f9aadc5d9512651e19142b96c57113745c0f..afb6e5e3dd235f698ac939b6aded7a5ffe949164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -39,7 +39,7 @@ case class ShuffledHashJoinExec(
     right: SparkPlan)
   extends BinaryExecNode with HashJoin {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index fac6b8de8ed5e68f17c2ecb25c08bd57cae767da..5c9c1e6062f0d2eec8d1eabf44f0499d9abf8f20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -40,7 +40,7 @@ case class SortMergeJoinExec(
     left: SparkPlan,
     right: SparkPlan) extends BinaryExecNode with CodegenSupport {
 
-  override private[sql] lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 9817a56f499a55972e0302855df67495c775d98f..15afa0b1a5391cb1ebf3a91992d21f5825cd9906 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -55,17 +55,17 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   override def value: Long = _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
-  private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+  override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
     new AccumulableInfo(
       id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
   }
 }
 
 
-private[sql] object SQLMetrics {
-  private[sql] val SUM_METRIC = "sum"
-  private[sql] val SIZE_METRIC = "size"
-  private[sql] val TIMING_METRIC = "timing"
+object SQLMetrics {
+  private val SUM_METRIC = "sum"
+  private val SIZE_METRIC = "size"
+  private val TIMING_METRIC = "timing"
 
   def createMetric(sc: SparkContext, name: String): SQLMetric = {
     val acc = new SQLMetric(SUM_METRIC)
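
A physical operator declares its metrics through these factory methods and bumps them as rows are produced; the accumulator is registered against the SparkContext inside createMetric. A minimal sketch, assuming a live SparkContext `sc`:

    import org.apache.spark.sql.execution.metric.SQLMetrics

    // In an operator this usually lives in its `metrics` map, e.g.
    //   override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, ...))
    val numOutputRows = SQLMetrics.createMetric(sc, "number of output rows")

    // Inside the execute path, each emitted row adds to the metric (SQLMetric is an AccumulatorV2).
    (1 to 3).foreach(_ => numOutputRows.add(1L))
    assert(numOutputRows.value == 3L)
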
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 829bcae6f95d4f9f1318f66d89cb0092c20759bb..16e44845d52839e46d4fb9332681bac60cfc14a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan
  * Extracts all the Python UDFs in a logical aggregate that depend on an aggregate expression or
  * grouping key, and evaluates them after the aggregate.
  */
-private[spark] object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
+object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
 
   /**
    * Returns whether the expression could only be evaluated within aggregate.
@@ -90,7 +90,7 @@ private[spark] object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
  * This has the limitation that the input to the Python UDF is not allowed to include attributes from
  * multiple child operators.
  */
-private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] {
+object ExtractPythonUDFs extends Rule[SparkPlan] {
 
   private def hasPythonUDF(e: Expression): Boolean = {
     e.find(_.isInstanceOf[PythonUDF]).isDefined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
index 70539da348b0e0113cbc86ee752be433773611e8..d2178e971ec20fbf3d6d3e56fcb3983e468714ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala
@@ -21,12 +21,12 @@ import org.apache.spark.api.r._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.api.r.SQLUtils._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * A function wrapper that applies the given R function to each partition.
  */
-private[sql] case class MapPartitionsRWrapper(
+case class MapPartitionsRWrapper(
     func: Array[Byte],
     packageNames: Array[Byte],
     broadcastVars: Array[Broadcast[Object]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index b19344f04383f7dfabe17ccbf244af24a97fd8bd..b9dbfcf7734c3c0f7fdb2e822a386e8760b994fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 
-private[sql] object FrequentItems extends Logging {
+object FrequentItems extends Logging {
 
   /** A helper class wrapping `MutableMap[Any, Long]` for simplicity. */
   private class FreqItemCounter(size: Int) extends Serializable {
@@ -79,7 +79,7 @@ private[sql] object FrequentItems extends Logging {
    *                than 1e-4.
    * @return A Local DataFrame with the Array of frequent items for each column.
    */
-  private[sql] def singlePassFreqItems(
+  def singlePassFreqItems(
       df: DataFrame,
       cols: Seq[String],
       support: Double): DataFrame = {
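
singlePassFreqItems backs the public DataFrameStatFunctions entry point; typical usage, assuming a DataFrame `df` with columns "a" and "b":

    // Values that appear in more than 40% of rows for each column; the single-pass
    // approximation may include false positives.
    val freq = df.stat.freqItems(Seq("a", "b"), support = 0.4)
    freq.show(false)
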
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index ea58df70b32522de4b10ce713f16de5a9d3353c5..50eecb409830fa901a2ef55de350626de8205787 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
-private[sql] object StatFunctions extends Logging {
+object StatFunctions extends Logging {
 
   import QuantileSummaries.Stats
 
@@ -337,7 +337,7 @@ private[sql] object StatFunctions extends Logging {
   }
 
   /** Calculate the Pearson Correlation Coefficient for the given columns */
-  private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
+  def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
     val counts = collectStatisticalData(df, cols, "correlation")
     counts.Ck / math.sqrt(counts.MkX * counts.MkY)
   }
@@ -407,13 +407,13 @@ private[sql] object StatFunctions extends Logging {
    * @param cols the column names
    * @return the covariance of the two columns.
    */
-  private[sql] def calculateCov(df: DataFrame, cols: Seq[String]): Double = {
+  def calculateCov(df: DataFrame, cols: Seq[String]): Double = {
     val counts = collectStatisticalData(df, cols, "covariance")
     counts.cov
   }
 
   /** Generate a table of frequencies for the elements of two columns. */
-  private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
+  def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = {
     val tableName = s"${col1}_$col2"
     val counts = df.groupBy(col1, col2).agg(count("*")).take(1e6.toInt)
     if (counts.length == 1e6.toInt) {
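
These helpers surface through df.stat on the public API; a quick tour, assuming a DataFrame `df` with numeric columns "x" and "y":

    val corr = df.stat.corr("x", "y")       // Pearson correlation, via pearsonCorrelation above
    val cov  = df.stat.cov("x", "y")        // sample covariance, via calculateCov above
    val ct   = df.stat.crosstab("x", "y")   // pairwise frequency table, capped at 1e6 rows as above
    ct.show()
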
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 7367c68d0a0e53a0e7241b1a59c4e3aeaed8c3bd..05294df2673dc8525ded0a62a9ad285e9beb1381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming.OutputMode
  * A variant of [[QueryExecution]] that allows the given [[LogicalPlan]] to be executed
  * incrementally, possibly preserving state in between executions.
  */
-class IncrementalExecution private[sql](
+class IncrementalExecution(
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     val outputMode: OutputMode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index af2229a46bebbf418a7ff1ee5c30a045c48e3221..66fb5a4bdeb7fe5c2cef469e8ed03ee0b7606806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -49,10 +49,10 @@ class StreamExecution(
     override val id: Long,
     override val name: String,
     checkpointRoot: String,
-    private[sql] val logicalPlan: LogicalPlan,
+    val logicalPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
-    private[sql] val triggerClock: Clock,
+    val triggerClock: Clock,
     val outputMode: OutputMode)
   extends StreamingQuery with Logging {
 
@@ -74,7 +74,7 @@ class StreamExecution(
    * input source.
    */
   @volatile
-  private[sql] var committedOffsets = new StreamProgress
+  var committedOffsets = new StreamProgress
 
   /**
    * Tracks the offsets that are available to be processed, but have not yet been committed to the
@@ -102,10 +102,10 @@ class StreamExecution(
   private var state: State = INITIALIZED
 
   @volatile
-  private[sql] var lastExecution: QueryExecution = null
+  var lastExecution: QueryExecution = null
 
   @volatile
-  private[sql] var streamDeathCause: StreamingQueryException = null
+  var streamDeathCause: StreamingQueryException = null
 
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()
@@ -115,7 +115,7 @@ class StreamExecution(
    * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
    * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
    */
-  private[sql] val microBatchThread =
+  val microBatchThread =
     new UninterruptibleThread(s"stream execution thread for $name") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
@@ -131,8 +131,7 @@ class StreamExecution(
    * processing is done.  Thus, the Nth record in this log indicates data that is currently being
    * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
    */
-  private[sql] val offsetLog =
-    new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
+  val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
@@ -159,7 +158,7 @@ class StreamExecution(
    * Starts the execution. This returns only after the thread has started and the [[QueryStarted]] event
    * has been posted to all the listeners.
    */
-  private[sql] def start(): Unit = {
+  def start(): Unit = {
     microBatchThread.setDaemon(true)
     microBatchThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has been posted
@@ -518,7 +517,7 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
-private[sql] object StreamExecution {
+object StreamExecution {
   private val _nextId = new AtomicLong(0)
 
   def nextId: Long = _nextId.getAndIncrement()
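
From the public API these pieces are driven by starting a streaming query: the checkpoint location given below becomes checkpointRoot, and the offsets/ directory underneath it holds the offsetLog described above. A sketch, assuming a streaming Dataset `streamingDf` (paths are placeholders):

    val query = streamingDf.writeStream
      .outputMode("append")
      .format("parquet")
      .option("checkpointLocation", "/tmp/checkpoints/demo")   // becomes checkpointRoot
      .start("/tmp/output/demo")                               // start() returns once QueryStarted is posted

    query.awaitTermination()
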
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 405a5f0387a7e9ae0ded4484b2598c3683c06865..db0bd9e6bc6f0bef3966adb7dc027bd2d7220e6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,7 +26,7 @@ class StreamProgress(
     val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
   extends scala.collection.immutable.Map[Source, Offset] {
 
-  private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
+  def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
     CompositeOffset(source.map(get))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 066765324ac9440e74ec7977d7bc39e8a5c6071d..a67fdceb3cee6a5cd04b40fbe378d69d88d87dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -113,7 +113,7 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
  * the store is the active instance. Accordingly, it either keeps it loaded and performs
  * maintenance, or unloads the store.
  */
-private[sql] object StateStore extends Logging {
+object StateStore extends Logging {
 
   val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval"
   val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60
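
The maintenance interval is read from the SparkConf key declared here, with the 60-second default above; assuming the value is parsed as a time string (as is usual for Spark time configs), it can be tightened like any other setting:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("stateful-streaming")
      .set("spark.sql.streaming.stateStore.maintenanceInterval", "30s")
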
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index e418217238cca94c7695101e46d19ff9348d1eea..d945d7aff2da4ee1503959dc82e51f1a02465987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -45,7 +45,7 @@ private object StopCoordinator
   extends StateStoreCoordinatorMessage
 
 /** Helper object used to create reference to [[StateStoreCoordinator]]. */
-private[sql] object StateStoreCoordinatorRef extends Logging {
+object StateStoreCoordinatorRef extends Logging {
 
   private val endpointName = "StateStoreCoordinator"
 
@@ -77,7 +77,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
  * Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of
  * [[StateStore]]s across all the executors, and get their locations for job scheduling.
  */
-private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
+class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
 
   private[state] def reportActiveInstance(
       storeId: StateStoreId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 4b4fa126b85f36101ccb40e8e6c7e85dbe4e2d61..23fc0bd0bce13b76e2fed9ceefb52a6e6481531c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -24,7 +24,7 @@ import scala.xml.Node
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
-private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging {
+class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging {
 
   private val listener = parent.listener
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 6e947919017623ed88d990d00fa634a21c6825fe..60f13432d78d2461ada4c1dba4ffb2151c8ab699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -46,14 +46,14 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
 case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)])
   extends SparkListenerEvent
 
-private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
+class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
 
   override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
     List(new SQLHistoryListener(conf, sparkUI))
   }
 }
 
-private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
+class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
   private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
 
@@ -333,7 +333,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
 /**
  * A [[SQLListener]] for rendering the SQL UI in the history server.
  */
-private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
+class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
   extends SQLListener(conf) {
 
   private var sqlTabAttached = false
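
The number of finished executions kept for the SQL tab is bounded by the conf read at the top of SQLListener; lowering it on a memory-constrained driver is a plain SparkConf setting:

    import org.apache.spark.SparkConf

    // Default is 1000 finished executions (see retainedExecutions above).
    val conf = new SparkConf().set("spark.sql.ui.retainedExecutions", "200")
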
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index e8675ce749a2baf384e0ee7922254a1cc486927a..d0376af3e31caa591344c36f9b1012d1438d86e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.ui
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
+class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "SQL") with Logging {
 
   val parent = sparkUI
@@ -32,6 +32,6 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
   parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
 }
 
-private[sql] object SQLTab {
+object SQLTab {
   private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 8f5681bfc7cc62ef8966d8414051970598ed593f..4bb9d6fef4c1dffbda1d980ac79388202021ebda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+
 
 /**
  * A graph used for storing information of an executionPlan of DataFrame.
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
  * Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the
  * SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
  */
-private[ui] case class SparkPlanGraph(
+case class SparkPlanGraph(
     nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
 
   def makeDotFile(metrics: Map[Long, String]): String = {
@@ -55,7 +55,7 @@ private[ui] case class SparkPlanGraph(
   }
 }
 
-private[sql] object SparkPlanGraph {
+object SparkPlanGraph {
 
   /**
    * Build a SparkPlanGraph from the root of a SparkPlan tree.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 6c43fe3177d65da7893ac88776ed250f59e1552b..54aee5e02bb9c19cb27df457fe8236c3b81f9c8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc3e74b4e8cccec8985ae518b9f80e4da2d3037c..a716a3eab6219251bdf8351a118d0be4e0c39435 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -54,7 +54,7 @@ case class HiveTableScanExec(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
-  private[sql] override lazy val metrics = Map(
+  override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def producedAttributes: AttributeSet = outputSet ++