diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6e77f354b58b1455986b995dde5c124cde2991be..70912d13ae4582a461ed17d993ae70c568f91a04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{InterruptedIOException, IOException}
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -37,6 +38,12 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
+/** States for [[StreamExecution]]'s lifecycle. */
+trait State
+case object INITIALIZING extends State
+case object ACTIVE extends State
+case object TERMINATED extends State
+
 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
  * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
@@ -298,7 +305,14 @@ class StreamExecution(
         // `stop()` is already called. Let `finally` finish the cleanup.
       }
     } catch {
-      case _: InterruptedException if state.get == TERMINATED => // interrupted by stop()
+      case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
+        // interrupted by stop()
+        updateStatusMessage("Stopped")
+      case e: IOException if e.getMessage != null
+        && e.getMessage.startsWith(classOf[InterruptedException].getName)
+        && state.get == TERMINATED =>
+        // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
+        // to `new IOException(ie.toString())` before Hadoop 2.8.
         updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
@@ -721,10 +735,6 @@ class StreamExecution(
     }
   }
 
-  trait State
-  case object INITIALIZING extends State
-  case object ACTIVE extends State
-  case object TERMINATED extends State
 }
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f44cfada29e2b9e6a1570ed6088da898105748ba..6dfcd8baba20e28ba7c0bba2a91625dc0bd740ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.{InterruptedIOException, IOException}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
+
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
@@ -350,13 +353,45 @@ class StreamSuite extends StreamTest {
       }
     }
   }
-}
 
-/**
- * A fake StreamSourceProvider thats creates a fake Source that cannot be reused.
- */
-class FakeDefaultSource extends StreamSourceProvider {
+  test("handle IOException when the streaming thread is interrupted (pre Hadoop 2.8)") {
+    // This test uses a fake source to throw the same IOException as pre Hadoop 2.8 when the
+    // streaming thread is interrupted. We should handle it properly by not failing the query.
+    ThrowingIOExceptionLikeHadoop12074.createSourceLatch = new CountDownLatch(1)
+    val query = spark
+      .readStream
+      .format(classOf[ThrowingIOExceptionLikeHadoop12074].getName)
+      .load()
+      .writeStream
+      .format("console")
+      .start()
+    assert(ThrowingIOExceptionLikeHadoop12074.createSourceLatch
+      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+      "ThrowingIOExceptionLikeHadoop12074.createSource wasn't called before timeout")
+    query.stop()
+    assert(query.exception.isEmpty)
+  }
 
+  test("handle InterruptedIOException when the streaming thread is interrupted (Hadoop 2.8+)") {
+    // This test uses a fake source to throw the same InterruptedIOException as Hadoop 2.8+ when the
+    // streaming thread is interrupted. We should handle it properly by not failing the query.
+    ThrowingInterruptedIOException.createSourceLatch = new CountDownLatch(1)
+    val query = spark
+      .readStream
+      .format(classOf[ThrowingInterruptedIOException].getName)
+      .load()
+      .writeStream
+      .format("console")
+      .start()
+    assert(ThrowingInterruptedIOException.createSourceLatch
+      .await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
+      "ThrowingInterruptedIOException.createSource wasn't called before timeout")
+    query.stop()
+    assert(query.exception.isEmpty)
+  }
+}
+
+abstract class FakeSource extends StreamSourceProvider {
   private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
 
   override def sourceSchema(
@@ -364,6 +399,10 @@ class FakeDefaultSource extends StreamSourceProvider {
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+}
+
+/** A fake StreamSourceProvider that creates a fake Source that cannot be reused. */
+class FakeDefaultSource extends FakeSource {
 
   override def createSource(
       spark: SQLContext,
@@ -395,3 +434,63 @@ class FakeDefaultSource extends StreamSourceProvider {
     }
   }
 }
+
+/** A fake source that throws the same IOException like pre Hadoop 2.8 when it's interrupted. */
+class ThrowingIOExceptionLikeHadoop12074 extends FakeSource {
+  import ThrowingIOExceptionLikeHadoop12074._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        throw new IOException(ie.toString)
+    }
+  }
+}
+
+object ThrowingIOExceptionLikeHadoop12074 {
+  /**
+   * A latch to allow the user to wait until [[ThrowingIOExceptionLikeHadoop12074.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}
+
+/** A fake source that throws InterruptedIOException like Hadoop 2.8+ when it's interrupted. */
+class ThrowingInterruptedIOException extends FakeSource {
+  import ThrowingInterruptedIOException._
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    createSourceLatch.countDown()
+    try {
+      Thread.sleep(30000)
+      throw new TimeoutException("sleep was not interrupted in 30 seconds")
+    } catch {
+      case ie: InterruptedException =>
+        val iie = new InterruptedIOException(ie.toString)
+        iie.initCause(ie)
+        throw iie
+    }
+  }
+}
+
+object ThrowingInterruptedIOException {
+  /**
+   * A latch to allow the user to wait until [[ThrowingInterruptedIOException.createSource]] is
+   * called.
+   */
+  @volatile var createSourceLatch: CountDownLatch = null
+}