diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 78897daec810787bf22b896dd4a9f680f6e2784a..0c729648ef724fffec8523cbb352f4cf878aea9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -119,7 +119,8 @@ abstract class ExternalCatalog {
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadPartition(
       db: String,
@@ -128,7 +129,8 @@ abstract class ExternalCatalog {
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadDynamicPartitions(
       db: String,
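The new `isSrcLocal` flag records whether the statement was LOAD DATA LOCAL, i.e. whether the source file lives on the client's local filesystem: a local source must be copied into the table, while a cluster-visible source may be moved. A minimal caller sketch, with illustrative database, table, and path values:

    // Hedged sketch; "default", "employees" and the path are illustrative only.
    externalCatalog.loadTable(
      db = "default",
      table = "employees",
      loadPath = "/tmp/employee.dat",
      isOverwrite = false,
      holdDDLTime = false,
      isSrcLocal = true)  // LOAD DATA LOCAL: copy the source file, don't move it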
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a6bebe1a3938cf35e8268649d12f1333c317cba5..816e4af2df666a1d38a98323355f9a1521d5b0a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -312,7 +312,8 @@ class InMemoryCatalog(
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadTable is not implemented")
   }
 
@@ -323,7 +324,8 @@ class InMemoryCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadPartition is not implemented.")
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7a3d2097a85c553de63c545d4b5da28298f51c95..e996a836fe7342ec5a07fcf4d19be70de0d3b4c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -311,12 +311,13 @@ class SessionCatalog(
       name: TableIdentifier,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
   }
 
   /**
@@ -330,13 +331,14 @@ class SessionCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
+      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 32e2f75737394d611d68acef0511f303a57ee35f..d2a7556476a8177cc510a0049369cd0114717488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class LoadDataCommand(
         throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
           s"but number of columns in provided partition spec (${partition.get.size}) " +
           s"do not match number of partitioned columns in table " +
-          s"(s${targetTable.partitionColumnNames.size})")
+          s"(${targetTable.partitionColumnNames.size})")
       }
       partition.get.keys.foreach { colName =>
         if (!targetTable.partitionColumnNames.contains(colName)) {
@@ -297,13 +297,15 @@ case class LoadDataCommand(
         partition.get,
         isOverwrite,
         holdDDLTime = false,
-        inheritTableSpecs = true)
+        inheritTableSpecs = true,
+        isSrcLocal = isLocal)
     } else {
       catalog.loadTable(
         targetTable.identifier,
         loadPath.toString,
         isOverwrite,
-        holdDDLTime = false)
+        holdDDLTime = false,
+        isSrcLocal = isLocal)
     }
     Seq.empty[Row]
   }
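`isLocal` above is the LoadDataCommand field that the parser sets from the LOCAL keyword. An approximate sketch of the command's shape (field order and types are an assumption, not verbatim from this patch):

    case class LoadDataCommand(
        table: TableIdentifier,
        path: String,
        isLocal: Boolean,      // true when the statement reads LOAD DATA LOCAL
        isOverwrite: Boolean,
        partition: Option[TablePartitionSpec]) extends RunnableCommand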
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f67ddc9be1a5a93f8443cc6d9d6c14f18c56a3bb..544f277cdf976240e597226ed9a76d43c590c47f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = withClient {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.loadTable(
       loadPath,
       s"$db.$table",
       isOverwrite,
-      holdDDLTime)
+      holdDDLTime,
+      isSrcLocal)
   }
 
   override def loadPartition(
@@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = withClient {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       orderedPartitionSpec,
       isOverwrite,
       holdDDLTime,
-      inheritTableSpecs)
+      inheritTableSpecs,
+      isSrcLocal)
   }
 
   override def loadDynamicPartitions(
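`loadDynamicPartitions` keeps its old signature: in this code path its caller is InsertIntoHiveTable, whose source directory is Spark's own staging output on the destination filesystem, so a client-local source cannot occur there (the non-dynamic insert paths below likewise pass `isSrcLocal = false`).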
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8e7c871183dfd01be3d153e9fd188c7e638742a8..837b6c57fc24092166a9446661c10f49a9956c87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -211,14 +211,16 @@ private[hive] trait HiveClient {
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit
 
   /** Loads data into an existing table. */
   def loadTable(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   /** Loads new dynamic partitions into an existing table. */
   def loadDynamicPartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db73596e5f520711857dc7b8caca95e6b8880b48..b75f6e98d50522e38966bae13e09aaccbdc7c531 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = withHiveState {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = withHiveState {
     val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
     shim.loadPartition(
       client,
@@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
       replace,
       holdDDLTime,
       inheritTableSpecs,
-      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
+      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
+      isSrcLocal = isSrcLocal)
   }
 
   def loadTable(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = withHiveState {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = withHiveState {
     shim.loadTable(
       client,
       new Path(loadPath),
       tableName,
       replace,
-      holdDDLTime)
+      holdDDLTime,
+      isSrcLocal)
   }
 
   def loadDynamicPartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index e561706facf0333bf03f05e7adb9062f0061a340..137ec267603e859b81c2530230728db945cd2b2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadTable(
       hive: Hive,
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadDynamicPartitions(
       hive: Hive,
@@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
       holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
   }
@@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
   }
 
@@ -698,10 +702,11 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
       holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
+      isSrcLocal: JBoolean, JBoolean.FALSE)
   }
 
   override def loadTable(
@@ -709,9 +714,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
+      isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
   }
 
   override def loadDynamicPartitions(
@@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       TimeUnit.MILLISECONDS).asInstanceOf[Long]
   }
 
-  protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
-    val localFs = FileSystem.getLocal(conf)
-    val pathFs = FileSystem.get(path.toUri(), conf)
-    localFs.getUri() == pathFs.getUri()
-  }
-
 }
 
 private[client] class Shim_v1_0 extends Shim_v0_14 {
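The deleted `isSrcLocal(path, conf)` helper is the heart of this change, so it is worth spelling out. Reconstructed below for illustration (imports added): it answers "is this path on the local filesystem?", which is not the question LOAD DATA needs answered, because a plain LOAD DATA over a file: path also returns true, turning an intended move into a copy. Note also that Shim_v0_12 accepts but ignores the new flag, since Hive 0.12's loadTable/loadPartition predate the isSrcLocal parameter.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // The removed heuristic, reconstructed for illustration only.
    def guessedSrcLocal(path: Path, conf: Configuration): Boolean = {
      val localFs = FileSystem.getLocal(conf)
      val pathFs = FileSystem.get(path.toUri(), conf)
      // True whenever the path resolves to the local filesystem, regardless
      // of whether the user actually wrote LOAD DATA LOCAL.
      localFs.getUri() == pathFs.getUri()
    }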
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5f5c8e2432d6cc5ed3f89b35a279758268f1ac97..db2239d26aaa6a8be53628ea9bc53dab4dfc6788 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
             partitionSpec,
             isOverwrite = doHiveOverwrite,
             holdDDLTime = holdDDLTime,
-            inheritTableSpecs = inheritTableSpecs)
+            inheritTableSpecs = inheritTableSpecs,
+            isSrcLocal = false)
         }
       }
     } else {
@@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
         table.catalogTable.identifier.table,
         outputPath.toString, // TODO: URI
         overwrite,
-        holdDDLTime)
+        holdDDLTime,
+        isSrcLocal = false)
     }
 
     // Invalidate the cache.
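Both InsertIntoHiveTable call sites pass `isSrcLocal = false` because the data being loaded is Spark's own staging output, written on the destination filesystem rather than on the client.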
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 79e76b3134c2a7cea2f4d9accfdd007ca24c35d4..a001048a9ea5d3c31a0d96962bb6b6fb4aaf2c6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
         emptyDir,
         tableName = "src",
         replace = false,
-        holdDDLTime = false)
+        holdDDLTime = false,
+        isSrcLocal = false)
     }
 
     test(s"$version: tableExists") {
@@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
         partSpec,
         replace = false,
         holdDDLTime = false,
-        inheritTableSpecs = false)
+        inheritTableSpecs = false,
+        isSrcLocal = false)
     }
 
     test(s"$version: loadDynamicPartitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 46ed18c70fb56b7645211bd1792b573a2a8b4388..1680f6c40acdf0f4766b3c129dbf25dcffb15c29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+
+import com.google.common.io.Files
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -154,7 +158,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     }
   }
 
-  test("LOAD DATA") {
+  Seq(true, false).foreach { local =>
+    val loadQuery = if (local) "LOAD DATA LOCAL" else "LOAD DATA"
+    test(loadQuery) {
+      testLoadData(loadQuery, local)
+    }
+  }
+
+  private def testLoadData(loadQuery: String, local: Boolean): Unit = {
+    // employee.dat has two columns separated by '|': the first is an int, the second a string.
+    // Its content looks like:
+    // 16|john
+    // 17|robert
+    val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile()
+
+    /**
+     * Runs a function against the input data file. In non-local mode, LOAD DATA moves the
+     * input file to the destination, so the function is given a temporary copy instead,
+     * keeping the original file available to subsequent tests.
+     */
+    def withInputFile(fn: File => Unit): Unit = {
+      if (local) {
+        fn(testData)
+      } else {
+        val tmp = File.createTempFile(testData.getName(), ".tmp")
+        Files.copy(testData, tmp)
+        try {
+          fn(tmp)
+        } finally {
+          tmp.delete()
+        }
+      }
+    }
+
     withTable("non_part_table", "part_table") {
       sql(
         """
@@ -164,18 +200,49 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
           |LINES TERMINATED BY '\n'
         """.stripMargin)
 
-      // employee.dat has two columns separated by '|', the first is an int, the second is a string.
-      // Its content looks like:
-      // 16|john
-      // 17|robert
-      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
       // LOAD DATA INTO non-partitioned table can't specify partition
       intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+        sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table PARTITION(ds="1")""")
+      }
+
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""")
+
+        // Non-local mode is expected to move the file, while local mode is expected to copy it.
+        // Check once here that the behavior is as expected.
+        assert(local === path.exists())
+      }
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Malformed URI: "file:/" + an absolute path yields a double slash
+      // (file://path/to/data/files/employee.dat), which is rejected.
+      //
+      // TODO: need a similar test for non-local mode.
+      if (local) {
+        val incorrectUri = "file:/" + testData.getAbsolutePath()
+        intercept[AnalysisException] {
+          sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
+        }
+      }
+
+      // Use URI as inpath:
+      // file:/path/to/data/files/employee.dat
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "${path.toURI()}" INTO TABLE non_part_table""")
+      }
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Nil)
+
+      // Overwrite existing data.
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "${path.toURI()}" OVERWRITE INTO TABLE non_part_table""")
       }
 
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
       checkAnswer(
         sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
         Row(16, "john") :: Nil)
@@ -190,87 +257,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         """.stripMargin)
 
       // LOAD DATA INTO partitioned table must specify partition
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+      withInputFile { path =>
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table""")
+        }
+
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1")""")
+        }
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1")""")
+        }
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", k="2")""")
+        }
       }
 
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1")""")
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(c="1", d="2")""")
       }
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1")""")
-      }
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", k="2")""")
-      }
-
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
       checkAnswer(
         sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
         sql("SELECT * FROM non_part_table").collect())
 
       // Different order of partition columns.
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(d="1", c="2")""")
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table PARTITION(d="1", c="2")""")
+      }
       checkAnswer(
         sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '1'"),
         sql("SELECT * FROM non_part_table").collect())
     }
   }
 
-  test("LOAD DATA: input path") {
-    withTable("non_part_table") {
-      sql(
-        """
-          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
-          |ROW FORMAT DELIMITED
-          |FIELDS TERMINATED BY '|'
-          |LINES TERMINATED BY '\n'
-        """.stripMargin)
-
-      // Non-existing inpath
-      intercept[AnalysisException] {
-        sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
-      }
-
-      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
-      // Non-local inpath: without URI Scheme and Authority
-      sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Nil)
-
-      // Use URI as LOCAL inpath:
-      // file:/path/to/data/files/employee.dat
-      val uri = "file:" + testData
-      sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Row(16, "john") :: Nil)
-
-      // Use URI as non-LOCAL inpath
-      sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
-
-      sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Nil)
-
-      // Incorrect URI:
-      // file://path/to/data/files/employee.dat
-      val incorrectUri = "file:/" + testData
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE non_part_table""")
-      }
-    }
-  }
-
   test("Truncate Table") {
     withTable("non_part_table", "part_table") {
       sql(
@@ -418,4 +437,5 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
     }
   }
+
 }