diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
similarity index 95%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 4fc72b9e47597f973ad38dcee53487d312b3d808..9b65419dba234988fa4f054d4de7b91b27cfed0f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,17 +29,25 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
-  private val nullDF = (for {
+  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
   } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
@@ -224,8 +232,10 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     }
   }
 
-  private val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
-  private val df2 = (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
+  private lazy val df1 =
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1")
+  private lazy val df2 =
+    (0 until 50).map(i => (i % 7, i % 11, i.toString)).toDF("i", "j", "k").as("df2")
 
   case class BucketedTableTestSpec(
       bucketSpec: Option[BucketSpec],
@@ -535,7 +545,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
   test("error if there exists any malformed bucket files") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(hiveContext.sparkSession.getWarehousePath).getPath
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
       val tableDir = new File(warehouseFilePath, "bucketed_table")
       Utils.deleteRecursively(tableDir)
       df1.write.parquet(tableDir.getAbsolutePath)
@@ -553,9 +563,9 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
 
-      checkAnswer(hiveContext.table("bucketed_table").select("j"), df1.select("j"))
+      checkAnswer(spark.table("bucketed_table").select("j"), df1.select("j"))
 
-      checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+      checkAnswer(spark.table("bucketed_table").groupBy("j").agg(max("k")),
         df1.groupBy("j").agg(max("k")))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
similarity index 88%
rename from sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 61cef2a8008f23aa203a4d2aa32def86cbb5caa7..9082261af7b0067d313666d465c2c4c06b2e0a7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -20,19 +20,29 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+class BucketedWriteWithoutHiveSupportSuite extends BucketedWriteSuite with SharedSQLContext {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "json")
+}
+
+abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
+  protected def fileFormatsToTest: Seq[String]
+
   test("bucketed by non-existing column") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
     intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
@@ -76,11 +86,13 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     assert(e.getMessage == "'insertInto' does not support bucketing right now;")
   }
 
-  private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private lazy val df = {
+    (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  }
 
   def tableDir: File = {
     val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
-    new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
+    new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier)))
   }
 
   /**
@@ -141,7 +153,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -157,7 +169,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -190,7 +202,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -203,7 +215,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   }
 
   test("write bucketed data without partitionBy with sortBy") {
-    for (source <- Seq("parquet", "json", "orc")) {
+    for (source <- fileFormatsToTest) {
       withTable("bucketed_table") {
         df.write
           .format(source)
@@ -219,7 +231,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
   test("write bucketed data with bucketing disabled") {
     // The configuration BUCKETING_ENABLED does not affect the writing path
     withSQLConf(SQLConf.BUCKETING_ENABLED.key -> "false") {
-      for (source <- Seq("parquet", "json", "orc")) {
+      for (source <- fileFormatsToTest) {
         withTable("bucketed_table") {
           df.write
             .format(source)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f277f99805a4a6c86083af66f506c9e98b859d5a
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadWithHiveSupportSuite.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedReadWithHiveSupportSuite extends BucketedReadSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..454e2f65d5d882eeeaea2e3b8eb8fc6207539924
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+
+class BucketedWriteWithHiveSupportSuite extends BucketedWriteSuite with TestHiveSingleton {
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive")
+  }
+
+  override protected def fileFormatsToTest: Seq[String] = Seq("parquet", "orc")
+}