diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f3b209deaae5c776ebd79ad64ddf1bba910ced17..bb7d1f70b62d94259cc2aa8f4ef616422cc7b17e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -481,7 +481,7 @@ case class DataSource(
   }
 }
 
-object DataSource {
+object DataSource extends Logging {
 
   /** A map to maintain backward compatibility in case we move data sources around. */
   private val backwardCompatibilityMap: Map[String, String] = {
@@ -570,10 +570,19 @@ object DataSource {
           // there is exactly one registered alias
           head.getClass
         case sources =>
-          // There are multiple registered aliases for the input
-          sys.error(s"Multiple sources found for $provider1 " +
-            s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-            "please specify the fully qualified class name.")
+          // There are multiple registered aliases for the input. If exactly one of them is
+          // a class whose name starts with "org.apache.spark", we use it, assuming it is an
+          // internal data source within Spark.
+          val sourceNames = sources.map(_.getClass.getName)
+          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
+          if (internalSources.size == 1) {
+            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
+              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
+            internalSources.head.getClass
+          } else {
+            throw new AnalysisException(s"Multiple sources found for $provider1 " +
+              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
+          }
       }
     } catch {
       case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cfd7889b4ac2ca180bcbc8c0cbf528ec66866dd5..c6973bf41d34b1e5c5702e88995fa29e9dd10f40 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1,7 @@
 org.apache.spark.sql.sources.FakeSourceOne
 org.apache.spark.sql.sources.FakeSourceTwo
 org.apache.spark.sql.sources.FakeSourceThree
+org.apache.spark.sql.sources.FakeSourceFour
+org.apache.fakesource.FakeExternalSourceOne
+org.apache.fakesource.FakeExternalSourceTwo
+org.apache.fakesource.FakeExternalSourceThree
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 85ba33e58a7872f1eb4b1d544b42e07a1934d821..b5fb740b6eb7746a6bd1565413e55ca8d209c25b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -19,26 +19,45 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 
 
 // please note that the META-INF/services had to be modified for the test directory for this to work
 class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
 
-  test("data sources with the same name") {
-    intercept[RuntimeException] {
+  test("data sources with the same name - internal data sources") {
+    val e = intercept[AnalysisException] {
       spark.read.format("Fluet da Bomb").load()
     }
+    assert(e.getMessage.contains("Multiple sources found for Fluet da Bomb"))
+  }
+
+  test("data sources with the same name - internal data source/external data source") {
+    assert(spark.read.format("datasource").load().schema ==
+      StructType(Seq(StructField("longType", LongType, nullable = false))))
+  }
+
+  test("data sources with the same name - external data sources") {
+    val e = intercept[AnalysisException] {
+      spark.read.format("Fake external source").load()
+    }
+    assert(e.getMessage.contains("Multiple sources found for Fake external source"))
   }
 
   test("load data source from format alias") {
-    spark.read.format("gathering quorum").load().schema ==
-      StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    assert(spark.read.format("gathering quorum").load().schema ==
+      StructType(Seq(StructField("stringType", StringType, nullable = false))))
   }
 
   test("specify full classname with duplicate formats") {
-    spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
-      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    assert(spark.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))))
   }
 
   test("should fail to load ORC without Hive Support") {
@@ -63,7 +82,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister {
     }
 }
 
-class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
+class FakeSourceTwo extends RelationProvider with DataSourceRegister {
 
   def shortName(): String = "Fluet da Bomb"
 
@@ -72,7 +91,7 @@ class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
       override def sqlContext: SQLContext = cont
 
       override def schema: StructType =
-        StructType(Seq(StructField("stringType", StringType, nullable = false)))
+        StructType(Seq(StructField("integerType", IntegerType, nullable = false)))
     }
 }
 
@@ -88,3 +107,18 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister {
         StructType(Seq(StructField("stringType", StringType, nullable = false)))
     }
 }
+
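+// Shares the "datasource" alias with org.apache.fakesource.FakeExternalSourceThree; because this
+// class lives under org.apache.spark, it should be chosen as the internal data source.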
+class FakeSourceFour extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "datasource"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("longType", LongType, nullable = false)))
+    }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0dfd75e709123d51e53fd8262920c4ac1d97f19e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/fakeExternalSources.scala
@@ -0,0 +1,67 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.fakesource
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+import org.apache.spark.sql.types._
+
+
+// Note that the package name is intentionally mismatched so that these classes resemble
+// external data sources and exercise the detection logic for them.
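+// FakeExternalSourceOne and FakeExternalSourceTwo both register "Fake external source" with no
+// internal candidate, while FakeExternalSourceThree reuses the "datasource" alias of the
+// internal FakeSourceFour.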
+class FakeExternalSourceOne extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "Fake external source"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("stringType", StringType, nullable = false)))
+    }
+}
+
+class FakeExternalSourceTwo extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "Fake external source"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("integerType", IntegerType, nullable = false)))
+    }
+}
+
+class FakeExternalSourceThree extends RelationProvider with DataSourceRegister {
+
+  def shortName(): String = "datasource"
+
+  override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
+    new BaseRelation {
+      override def sqlContext: SQLContext = cont
+
+      override def schema: StructType =
+        StructType(Seq(StructField("byteType", ByteType, nullable = false)))
+    }
+}