diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 9d997d628579ccac122437d1dd493b0e672c89ec..2c44b399cb95f5c77c489298591700fb14c57650 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -133,23 +133,38 @@ abstract class PartitioningAwareFileCatalog(
   /**
    * Contains a set of paths that are considered as the base dirs of the input datasets.
    * The partitioning discovery logic will make sure it will stop when it reaches any
-   * base path. By default, the paths of the dataset provided by users will be base paths.
-   * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
-   * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
-   * `something`. If users want to override the basePath. They can set `basePath` in the options
-   * to pass the new base path to the data source.
-   * For the above example, if the user-provided base path is `/path/`, the returned
+   * base path.
+   *
+   * By default, the paths of the dataset provided by users will be base paths.
+   * Below are three typical examples:
+   * Case 1) `sqlContext.read.parquet("/path/something=true/")`: the base path will be
+   * `/path/something=true/`, and the returned DataFrame will not contain a column of `something`.
+   * Case 2) `sqlContext.read.parquet("/path/something=true/a.parquet")`: the base path will
+   * still be `/path/something=true/`, and the returned DataFrame will also not contain a column
+   * of `something`.
+   * Case 3) `sqlContext.read.parquet("/path/")`: the base path will be `/path/`, and the returned
    * DataFrame will have the column of `something`.
+   *
+   * Users can also override the base path by setting `basePath` in the options and passing the
+   * new base path to the data source. For example, with
+   * `sqlContext.read.option("basePath", "/path/").parquet("/path/something=true/")`,
+   * the returned DataFrame will have the column of `something`.
    */
   private def basePaths: Set[Path] = {
-    val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
-    userDefinedBasePath.getOrElse {
-      // If the user does not provide basePath, we will just use paths.
-      paths.toSet
-    }.map { hdfsPath =>
-      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-      val fs = hdfsPath.getFileSystem(hadoopConf)
-      hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    parameters.get("basePath").map(new Path(_)) match {
+      case Some(userDefinedBasePath) =>
+        val fs = userDefinedBasePath.getFileSystem(hadoopConf)
+        if (!fs.isDirectory(userDefinedBasePath)) {
+          throw new IllegalArgumentException("Option 'basePath' must be a directory")
+        }
+        Set(fs.makeQualified(userDefinedBasePath))
+
+      case None =>
+        paths.map { path =>
+          // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+          val qualifiedPath = path.getFileSystem(hadoopConf).makeQualified(path)
+          if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
+        }.toSet
     }
   }
 }
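
To make the Scaladoc above concrete, here is a minimal usage sketch of the three cases and of the `basePath` override. The `/tmp/table` layout, the part file name, and the pre-existing `sqlContext` are illustrative assumptions, not part of this patch.

// Illustrative sketch only (not part of this patch): assumes an existing SQLContext
// `sqlContext` and a hypothetical partitioned layout
//   /tmp/table/something=true/part-00000.parquet

// Case 1/2: the user-supplied directory (or file) becomes the base path, so the
// partition column `something` is not discovered.
val withoutPartitionColumn = sqlContext.read.parquet("/tmp/table/something=true")
assert(!withoutPartitionColumn.schema.fieldNames.contains("something"))

// Case 3: reading the parent directory keeps `something` as a partition column.
val withPartitionColumn = sqlContext.read.parquet("/tmp/table")
assert(withPartitionColumn.schema.fieldNames.contains("something"))

// Overriding `basePath`: read a single leaf file, yet still recover `something`,
// because partition discovery now stops at /tmp/table instead of the leaf directory.
val overridden = sqlContext.read
  .option("basePath", "/tmp/table")
  .parquet("/tmp/table/something=true/part-00000.parquet")
assert(overridden.schema.fieldNames.contains("something"))

The new test "read partitioned table using different path options" in the diff below exercises the same combinations against a temporary directory.
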
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 5bffb307ec80e97dc0ec82ea17148adc1ffbff8f..cb2c2522b20cb73be76349dd56348d489f8c24ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -191,6 +191,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     checkThrows[AssertionError]("file://path/a=", "Empty partition column value")
   }
 
+  test("parse partition with base paths") {
+    // when basePaths is the same as the path to the leaf directory
+    val partitionSpec1: Option[PartitionValues] = parsePartition(
+      path = new Path("file://path/a=10"),
+      defaultPartitionName = defaultPartitionName,
+      typeInference = true,
+      basePaths = Set(new Path("file://path/a=10")))._1
+
+    assert(partitionSpec1.isEmpty)
+
+    // when basePaths is the path to a base directory of leaf directories
+    val partitionSpec2: Option[PartitionValues] = parsePartition(
+      path = new Path("file://path/a=10"),
+      defaultPartitionName = defaultPartitionName,
+      typeInference = true,
+      basePaths = Set(new Path("file://path")))._1
+
+    assert(partitionSpec2 ==
+      Option(PartitionValues(
+        ArrayBuffer("a"),
+        ArrayBuffer(Literal.create(10, IntegerType)))))
+  }
+
   test("parse partitions") {
     def check(
         paths: Seq[String],
@@ -413,6 +436,43 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }
 
+  test("read partitioned table using different path options") {
+    withTempDir { base =>
+      val pi = 1
+      val ps = "foo"
+      val path = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
+      makeParquetFile(
+        (1 to 10).map(i => ParquetData(i, i.toString)), path)
+
+      // when the input is the base path containing partitioning directories
+      val baseDf = sqlContext.read.parquet(base.getCanonicalPath)
+      assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+
+      // when the input is a path to the leaf directory containing a parquet file
+      val partDf = sqlContext.read.parquet(path.getCanonicalPath)
+      assert(partDf.schema.map(_.name) === Seq("intField", "stringField"))
+
+      path.listFiles().foreach { f =>
+        if (f.getName.toLowerCase().endsWith(".parquet")) {
+          // when the input is a path to a parquet file
+          val df = sqlContext.read.parquet(f.getCanonicalPath)
+          assert(df.schema.map(_.name) === Seq("intField", "stringField"))
+        }
+      }
+
+      path.listFiles().foreach { f =>
+        if (f.getName.toLowerCase().endsWith(".parquet")) {
+          // when the input is a path to a parquet file but `basePath` is overridden to
+          // the base path containing partitioning directories
+          val df = sqlContext
+            .read.option("basePath", base.getCanonicalPath)
+            .parquet(f.getCanonicalPath)
+          assert(df.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps"))
+        }
+      }
+    }
+  }
+
   test("read partitioned table - partition key included in Parquet file") {
     withTempDir { base =>
       for {