diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 080729b2ca8d6d83294563959c1cb58aa46bc842..d95ec7f67feb3ba9fd21d10cda39ba1aef614954 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -105,25 +105,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /**
    * Store the metadata for the specified batchId and return `true` if successful. If the batchId's
    * metadata has already been stored, this method will return `false`.
-   *
-   * Note that this method must be called on a [[org.apache.spark.util.UninterruptibleThread]]
-   * so that interrupts can be disabled while writing the batch file. This is because there is a
-   * potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread
-   * running "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our
-   * case, `writeBatch` creates a file using HDFS API and calls "Shell.runCommand" to set the
-   * file permissions, and can get deadlocked if the stream execution thread is stopped by
-   * interrupt. Hence, we make sure that this method is called on [[UninterruptibleThread]] which
-   * allows us to disable interrupts here. Also see SPARK-14131.
    */
   override def add(batchId: Long, metadata: T): Boolean = {
     get(batchId).map(_ => false).getOrElse {
       // Only write metadata when the batch has not yet been written
-      Thread.currentThread match {
-        case ut: UninterruptibleThread =>
-          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
-        case _ =>
-          throw new IllegalStateException(
-            "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread")
+      if (fileManager.isLocalFileSystem) {
+        Thread.currentThread match {
+          case ut: UninterruptibleThread =>
+            // When using a local file system, "writeBatch" must be called on a
+            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
+            // while writing the batch file. This is because there is a potential deadlock in
+            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622): if the thread running
+            // "Shell.runCommand" is interrupted, it can get deadlocked. In our case,
+            // `writeBatch` creates a file using the HDFS API and, on the local file system,
+            // calls "Shell.runCommand" to set the file permissions; it can get deadlocked if
+            // the stream execution thread is stopped by an interrupt. Hence, we make sure that
+            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
+            // interrupts here. Also see SPARK-14131.
+            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+          case _ =>
+            throw new IllegalStateException(
+              "HDFSMetadataLog.add() on a local file system must be executed on " +
+                "a o.a.spark.util.UninterruptibleThread")
+        }
+      } else {
+        // For a distributed file system such as HDFS or S3, a broken network can cause write
+        // operations to hang until they time out. We should leave interrupts enabled so that
+        // the query can be stopped quickly.
+        writeBatch(batchId, metadata, serialize)
       }
       true
     }
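
The local-file-system branch above leans on `org.apache.spark.util.UninterruptibleThread`, whose `runUninterruptibly` defers any interrupt delivered during the block until the block completes. A minimal sketch of the pattern follows; it assumes code living inside Spark itself (the class is package-private to Spark), and the thread name and body are illustrative, not from this patch:

import org.apache.spark.util.UninterruptibleThread

val t = new UninterruptibleThread("demo") {
  override def run(): Unit = {
    // Interrupts arriving inside this block are deferred until it finishes,
    // which is what shields the local-FS permission call from the
    // HADOOP-10622 deadlock described in the comment above.
    runUninterruptibly {
      println("writing batch file; interrupts are disabled here")
    }
  }
}
t.start()
t.interrupt() // takes effect only after the uninterruptible block exits
t.join()

Note that `runUninterruptibly` must be invoked from the `UninterruptibleThread` itself, which is why `add` pattern-matches on `Thread.currentThread` instead of wrapping the call unconditionally.
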
@@ -298,6 +307,9 @@ object HDFSMetadataLog {
 
     /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
     def delete(path: Path): Unit
+
+    /** Whether the underlying file system is local. */
+    def isLocalFileSystem: Boolean
   }
 
   /**
@@ -342,6 +354,13 @@ object HDFSMetadataLog {
         // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fc.getDefaultFileSystem match {
+      case _: local.LocalFs | _: local.RawLocalFs =>
+        // LocalFs = RawLocalFs + ChecksumFs
+        true
+      case _ => false
+    }
   }
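
As a rough illustration of what the `FileContext`-based check observes at runtime (a hedged sketch; the Hadoop calls are standard, but the path is made up):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}

val conf = new Configuration()
// FileContext.getFileContext resolves the scheme of the URI, so a file:/
// path yields a local AbstractFileSystem (LocalFs), while an hdfs:/ path
// would yield a non-local one.
val fc = FileContext.getFileContext(new Path("file:///tmp/metadata").toUri, conf)
println(fc.getDefaultFileSystem.getClass.getName)
// prints org.apache.hadoop.fs.local.LocalFs
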
 
   /**
@@ -398,5 +417,12 @@ object HDFSMetadataLog {
           // ignore if file has already been deleted
       }
     }
+
+    override def isLocalFileSystem: Boolean = fs match {
+      case _: LocalFileSystem | _: RawLocalFileSystem =>
+        // LocalFileSystem = RawLocalFileSystem + ChecksumFileSystem
+        true
+      case _ => false
+    }
   }
 }
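
The `FileSystem`-based manager behaves analogously through the older API; a quick hedged probe (the path is again illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}

val conf = new Configuration()
// Path.getFileSystem picks the implementation from the URI scheme; file:/
// resolves to LocalFileSystem, the checksummed wrapper around
// RawLocalFileSystem noted in the comment above.
val fs: FileSystem = new Path("file:///tmp/metadata").getFileSystem(conf)
println(fs.isInstanceOf[LocalFileSystem]) // true for file:/ paths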