diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 87fe56315203ed2992a8c56c3b9a42c1bdebc6f2..6e12cd16a3a52b91a128ad737cc0ce53ff4546ef 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -185,7 +185,7 @@ import org.apache.spark.util.Utils
 
 private[spark] class SecurityManager(
     sparkConf: SparkConf,
-    ioEncryptionKey: Option[Array[Byte]] = None)
+    val ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {
 
   import SecurityManager._
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 686305e9335dc362aa210f3d41c2cbf62791ed55..dd98ea265ce4a6d6f01a4b965ef0bfe28dc40bfb 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -172,20 +172,26 @@ private[spark] class SerializerManager(
   }
 
   /** Serializes into a chunked byte buffer. */
-  def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
-    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+  def dataSerialize[T: ClassTag](
+      blockId: BlockId,
+      values: Iterator[T],
+      allowEncryption: Boolean = true): ChunkedByteBuffer = {
+    dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
+      allowEncryption = allowEncryption)
   }
 
   /** Serializes into a chunked byte buffer. */
   def dataSerializeWithExplicitClassTag(
       blockId: BlockId,
       values: Iterator[_],
-      classTag: ClassTag[_]): ChunkedByteBuffer = {
+      classTag: ClassTag[_],
+      allowEncryption: Boolean = true): ChunkedByteBuffer = {
     val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
     val byteStream = new BufferedOutputStream(bbos)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
     val ser = getSerializer(classTag, autoPick).newInstance()
-    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
+    val encrypted = if (allowEncryption) wrapForEncryption(byteStream) else byteStream
+    ser.serializeStream(wrapForCompression(blockId, encrypted)).writeAll(values).close()
     bbos.toChunkedByteBuffer
   }
 
@@ -195,13 +201,15 @@ private[spark] class SerializerManager(
    */
   def dataDeserializeStream[T](
       blockId: BlockId,
-      inputStream: InputStream)
+      inputStream: InputStream,
+      maybeEncrypted: Boolean = true)
       (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
+    val decrypted = if (maybeEncrypted) wrapForEncryption(inputStream) else inputStream
     getSerializer(classTag, autoPick)
       .newInstance()
-      .deserializeStream(wrapStream(blockId, stream))
+      .deserializeStream(wrapForCompression(blockId, decrypted))
       .asIterator.asInstanceOf[Iterator[T]]
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 04521c9159eacf97931367e20d1616959bf54e11..cdf48e430caf5ae91db3095292a69985c583093a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,6 +28,8 @@ import scala.reflect.ClassTag
 import scala.util.Random
 import scala.util.control.NonFatal
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.internal.Logging
@@ -38,6 +40,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
@@ -753,15 +756,43 @@ private[spark] class BlockManager(
   /**
    * Put a new block of serialized bytes to the block manager.
    *
+   * @param encrypt If true, asks the block manager to encrypt the data block before storing,
+   *                when I/O encryption is enabled. This is required for blocks that have been
+   *                read from unencrypted sources, since all the BlockManager read APIs
+   *                automatically do decryption.
    * @return true if the block was stored or false if an error occurred.
    */
   def putBytes[T: ClassTag](
       blockId: BlockId,
       bytes: ChunkedByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Boolean = {
+      tellMaster: Boolean = true,
+      encrypt: Boolean = false): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
+
+    val bytesToStore =
+      if (encrypt && securityManager.ioEncryptionKey.isDefined) {
+        try {
+          val data = bytes.toByteBuffer
+          val in = new ByteBufferInputStream(data, true)
+          val byteBufOut = new ByteBufferOutputStream(data.remaining())
+          val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+            securityManager.ioEncryptionKey.get)
+          try {
+            ByteStreams.copy(in, out)
+          } finally {
+            in.close()
+            out.close()
+          }
+          new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+        } finally {
+          bytes.dispose()
+        }
+      } else {
+        bytes
+      }
+
+    doPutBytes(blockId, bytesToStore, level, implicitly[ClassTag[T]], tellMaster)
   }
 
   /**
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 38b4f7817713daed4a1412366f2e586e196fe76e..a878971608b3541d69f2202c32c01a2b5f64ea94 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -2017,6 +2017,9 @@ To run a Spark Streaming applications, you need to have the following.
   `spark.streaming.driver.writeAheadLog.closeFileAfterWrite` and
   `spark.streaming.receiver.writeAheadLog.closeFileAfterWrite`. See
   [Spark Streaming Configuration](configuration.html#spark-streaming) for more details.
+  Note that Spark will not encrypt data written to the write ahead log when I/O encryption is
+  enabled. If encryption of the write ahead log data is desired, it should be stored in a file
+  system that supports encryption natively.
 
 - *Setting the max receiving rate* - If the cluster resources is not large enough for the streaming
   application to process data as fast as it is being received, the receivers can be rate limited
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 0b2ec298132ad09de1eb5490ff94edc161e0c69a..d0864fd3678b287e8c059bcbbd9b0c13aa47956f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -27,7 +27,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
@@ -158,13 +158,16 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       logInfo(s"Read partition data of $this from write ahead log, record handle " +
         partition.walRecordHandle)
       if (storeInBlockManager) {
-        blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
+        blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel,
+          encrypt = true)
         logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
         dataRead.rewind()
       }
       serializerManager
         .dataDeserializeStream(
-          blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
+          blockId,
+          new ChunkedByteBuffer(dataRead).toInputStream(),
+          maybeEncrypted = false)(elementClassTag)
         .asInstanceOf[Iterator[T]]
     }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 80c07958b41f24ab7fa2e17bfe81994bd71e1cb5..2b488038f06206fb997df67e815797969faaffc0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -87,7 +87,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
         putResult
       case ByteBufferBlock(byteBuffer) =>
         blockManager.putBytes(
-          blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)
+          blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true,
+          encrypt = true)
       case o =>
         throw new SparkException(
           s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
@@ -175,10 +176,11 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     val serializedBlock = block match {
       case ArrayBufferBlock(arrayBuffer) =>
         numRecords = Some(arrayBuffer.size.toLong)
-        serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
+        serializerManager.dataSerialize(blockId, arrayBuffer.iterator, allowEncryption = false)
       case IteratorBlock(iterator) =>
         val countIterator = new CountingIterator(iterator)
-        val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
+        val serializedBlock = serializerManager.dataSerialize(blockId, countIterator,
+          allowEncryption = false)
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>
@@ -193,7 +195,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
         blockId,
         serializedBlock,
         effectiveStorageLevel,
-        tellMaster = true)
+        tellMaster = true,
+        encrypt = true)
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index f2241936000a0761e1f3fa3681adbf2d9f2686a7..c2b0389b8c6f0bdc79f0f356681c7f3c721c9b77 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -32,10 +32,12 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.StaticMemoryManager
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
@@ -44,7 +46,7 @@ import org.apache.spark.streaming.util._
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-class ReceivedBlockHandlerSuite
+abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
@@ -57,14 +59,22 @@ class ReceivedBlockHandlerSuite
   val conf = new SparkConf()
     .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
     .set("spark.app.id", "streaming-test")
+    .set(IO_ENCRYPTION_ENABLED, enableEncryption)
+  val encryptionKey =
+    if (enableEncryption) {
+      Some(CryptoStreamUtils.createKey(conf))
+    } else {
+      None
+    }
+
   val hadoopConf = new Configuration()
   val streamId = 1
-  val securityMgr = new SecurityManager(conf)
+  val securityMgr = new SecurityManager(conf, encryptionKey)
   val broadcastManager = new BroadcastManager(true, conf, securityMgr)
   val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
   val shuffleManager = new SortShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
-  var serializerManager = new SerializerManager(serializer, conf)
+  var serializerManager = new SerializerManager(serializer, conf, encryptionKey)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
   val blockManagerBuffer = new ArrayBuffer[BlockManager]()
@@ -164,7 +174,9 @@ class ReceivedBlockHandlerSuite
           val bytes = reader.read(fileSegment)
           reader.close()
           serializerManager.dataDeserializeStream(
-            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList
+            generateBlockId(),
+            new ChunkedByteBuffer(bytes).toInputStream(),
+            maybeEncrypted = false)(ClassTag.Any).toList
         }
         loggedData shouldEqual data
       }
@@ -208,6 +220,8 @@ class ReceivedBlockHandlerSuite
     sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
     // spark.storage.unrollFraction set to 0.4 for BlockManager
     sparkConf.set("spark.storage.unrollFraction", "0.4")
+
+    sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption)
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
     blockManager = createBlockManager(12000, sparkConf)
 
@@ -343,7 +357,7 @@ class ReceivedBlockHandlerSuite
     }
 
     def dataToByteBuffer(b: Seq[String]) =
-      serializerManager.dataSerialize(generateBlockId, b.iterator)
+      serializerManager.dataSerialize(generateBlockId, b.iterator, allowEncryption = false)
 
     val blocks = data.grouped(10).toSeq
 
@@ -418,3 +432,6 @@ class ReceivedBlockHandlerSuite
   private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
 }
 
+class ReceivedBlockHandlerSuite extends BaseReceivedBlockHandlerSuite(false)
+
+class ReceivedBlockHandlerWithEncryptionSuite extends BaseReceivedBlockHandlerSuite(true)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index a37fac87300b738cd8139102b2943e12f86495b6..ee69bf87d4efe6ad2d8e499c6ad854b009892a5c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config._
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
@@ -45,6 +46,7 @@ class WriteAheadLogBackedBlockRDDSuite
 
   override def beforeEach(): Unit = {
     super.beforeEach()
+    initSparkContext()
     dir = Utils.createTempDir()
   }
 
@@ -56,22 +58,33 @@ class WriteAheadLogBackedBlockRDDSuite
     }
   }
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    sparkContext = new SparkContext(conf)
-    blockManager = sparkContext.env.blockManager
-    serializerManager = sparkContext.env.serializerManager
+  override def afterAll(): Unit = {
+    try {
+      stopSparkContext()
+    } finally {
+      super.afterAll()
+    }
   }
 
-  override def afterAll(): Unit = {
+  private def initSparkContext(_conf: Option[SparkConf] = None): Unit = {
+    if (sparkContext == null) {
+      sparkContext = new SparkContext(_conf.getOrElse(conf))
+      blockManager = sparkContext.env.blockManager
+      serializerManager = sparkContext.env.serializerManager
+    }
+  }
+
+  private def stopSparkContext(): Unit = {
     // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
     try {
-      sparkContext.stop()
+      if (sparkContext != null) {
+        sparkContext.stop()
+      }
       System.clearProperty("spark.driver.port")
       blockManager = null
       serializerManager = null
     } finally {
-      super.afterAll()
+      sparkContext = null
     }
   }
 
@@ -106,6 +119,17 @@ class WriteAheadLogBackedBlockRDDSuite
       numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
   }
 
+  test("read data in block manager and WAL with encryption on") {
+    stopSparkContext()
+    try {
+      val testConf = conf.clone().set(IO_ENCRYPTION_ENABLED, true)
+      initSparkContext(Some(testConf))
+      testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
+    } finally {
+      stopSparkContext()
+    }
+  }
+
   /**
    * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
    * and the rest to a write ahead log, and then reading reading it all back using the RDD.
@@ -226,7 +250,8 @@ class WriteAheadLogBackedBlockRDDSuite
     require(blockData.size === blockIds.size)
     val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
-      writer.write(serializerManager.dataSerialize(id, data.iterator).toByteBuffer)
+      writer.write(serializerManager.dataSerialize(id, data.iterator, allowEncryption = false)
+        .toByteBuffer)
     }
     writer.close()
     segments