diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 58c8560a3d0490afaa1c591ae7106590ae08caec..86bbaa20f6cf227ec9d4e401eb92896c4e6919b5 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.logging
 
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,29 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
   protected def appendStreamToFile() {
     try {
       logDebug("Started appending thread")
-      openFile()
-      val buf = new Array[Byte](bufferSize)
-      var n = 0
-      while (!markedForStop && n != -1) {
-        n = inputStream.read(buf)
-        if (n != -1) {
-          appendToFile(buf, n)
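+      // Utils.tryWithSafeFinally ensures closeFile() always runs, even if the read loop throws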
+      Utils.tryWithSafeFinally {
+        openFile()
+        val buf = new Array[Byte](bufferSize)
+        var n = 0
+        while (!markedForStop && n != -1) {
+          try {
+            n = inputStream.read(buf)
+          } catch {
+            // An InputStream can throw an IOException during read() if the stream is closed
+            // asynchronously; once the appender is flagged to stop, these exceptions are ignored
+            case _: IOException if markedForStop =>  // do nothing and proceed to stop appending
+          }
+          if (n > 0) {
+            appendToFile(buf, n)
+          }
         }
+      } {
+        closeFile()
       }
     } catch {
       case e: Exception =>
         logError(s"Error writing stream to file $file", e)
-    } finally {
-      closeFile()
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 98d1b28d5a167158e8ac6405c9b4c060c7cae3a1..b367cc835834294c10448bf93a5e11e5a86e5582 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,12 +18,17 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable.HashSet
 import scala.reflect._
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
   }
 
+  test("file appender async close stream abruptly") {
+    // Test FileAppender's reaction to the InputStream being closed, using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream)
+
+    // Closing the streams before the appender tries to read will cause an IOException
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    appender.awaitTermination()
+
+    // If the InputStream was closed without first stopping the appender, an exception will be logged
+    verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+    val loggingEvent = loggingEventCaptor.getValue
+    assert(loggingEvent.getThrowableInformation !== null)
+    assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+  }
+
+  test("file appender async close stream gracefully") {
+    // Test FileAppender's reaction to the InputStream being closed, using a mock logging appender
+    val mockAppender = mock(classOf[Appender])
+    val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+    // Make sure only errors are logged
+    val logger = Logger.getRootLogger
+    logger.setLevel(Level.ERROR)
+    logger.addAppender(mockAppender)
+
+    val testOutputStream = new PipedOutputStream()
+    val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+    // Closing the streams will cause an IOException once the appender tries to read
+    testInputStream.close()
+    testOutputStream.close()
+    val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+    // Stop the appender before the read throws an IOException
+    testInputStream.latchReadStarted.await()
+    appender.stop()
+    testInputStream.latchReadProceed.countDown()
+
+    appender.awaitTermination()
+
+    // Make sure no IOException errors were logged since the appender was stopped gracefully
+    verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+    import scala.collection.JavaConverters._
+    loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+      assert(loggingEvent.getThrowableInformation === null
+        || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+    }
+  }
+
   /**
    * Run the rolling file appender with data and see whether all the data was written correctly
    * across rolled over files.
@@ -228,4 +294,16 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       file.getName.startsWith(testFile.getName)
     }.foreach { _.delete() }
   }
+
+  /** A PipedInputStream that lets a test synchronize with calls to read() */
+  private trait LatchedInputStream extends PipedInputStream {
+    val latchReadStarted = new CountDownLatch(1)
+    val latchReadProceed = new CountDownLatch(1)
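+    // Signal that read() has started, then block until the test allows it to proceed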
+    abstract override def read(): Int = {
+      latchReadStarted.countDown()
+      latchReadProceed.await()
+      super.read()
+    }
+  }
 }