diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..491f1175576e64a59b44a9ffeaacf3bd301c8564
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+private[streaming] object HdfsUtils {
+
+  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
+    val dfsPath = new Path(path)
+    val dfs = getFileSystemForPath(dfsPath, conf)
+    // If the file exists and we have append support, append instead of creating a new file
+    val stream: FSDataOutputStream = {
+      if (dfs.isFile(dfsPath)) {
+        if (conf.getBoolean("hdfs.append.support", false)) {
+          dfs.append(dfsPath)
+        } else {
+          throw new IllegalStateException("File exists and there is no append support!")
+        }
+      } else {
+        dfs.create(dfsPath)
+      }
+    }
+    stream
+  }
+
+  def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
+    val dfsPath = new Path(path)
+    val dfs = getFileSystemForPath(dfsPath, conf)
+    val instream = dfs.open(dfsPath)
+    instream
+  }
+
+  def checkState(state: Boolean, errorMsg: => String) {
+    if (!state) {
+      throw new IllegalStateException(errorMsg)
+    }
+  }
+
+  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+    val dfsPath = new Path(path)
+    val dfs = getFileSystemForPath(dfsPath, conf)
+    val fileStatus = dfs.getFileStatus(dfsPath)
+    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
+    blockLocs.map(_.flatMap(_.getHosts))
+  }
+
+  def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
+    // For local file systems, return the raw loca file system, such calls to flush()
+    // actually flushes the stream.
+    val fs = path.getFileSystem(conf)
+    fs match {
+      case localFs: LocalFileSystem => localFs.getRawFileSystem
+      case _ => fs
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1005a2c8ec3030562430e75754076bfbac0b956f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
new file mode 100644
index 0000000000000000000000000000000000000000..70d234320be7cd07a55c9a757f95d4835704e598
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and the reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory when rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ *                            Default is one minute.
+ * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
+ *                    Default is three.
+ * @param callerName Optional name of the class who is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+    logDirectory: String,
+    hadoopConf: Configuration,
+    rollingIntervalSecs: Int = 60,
+    maxFailures: Int = 3,
+    callerName: String = "",
+    clock: Clock = new SystemClock
+  ) extends Logging {
+
+  private val pastLogs = new ArrayBuffer[LogInfo]
+  private val callerNameTag =
+    if (callerName.nonEmpty) s" for $callerName" else ""
+  private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+  implicit private val executionContext = ExecutionContext.fromExecutorService(
+    Utils.newDaemonFixedThreadPool(1, threadpoolName))
+  override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+  private var currentLogPath: Option[String] = None
+  private var currentLogWriter: WriteAheadLogWriter = null
+  private var currentLogWriterStartTime: Long = -1L
+  private var currentLogWriterStopTime: Long = -1L
+
+  initializeOrRecover()
+
+  /**
+   * Write a byte buffer to the log file. This method synchronously writes the data in the
+   * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+   * to HDFS, and will be available for readers to read.
+   */
+  def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+    var fileSegment: WriteAheadLogFileSegment = null
+    var failures = 0
+    var lastException: Exception = null
+    var succeeded = false
+    while (!succeeded && failures < maxFailures) {
+      try {
+        fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+        succeeded = true
+      } catch {
+        case ex: Exception =>
+          lastException = ex
+          logWarning("Failed to write to write ahead log")
+          resetWriter()
+          failures += 1
+      }
+    }
+    if (fileSegment == null) {
+      logError(s"Failed to write to write ahead log after $failures failures")
+      throw lastException
+    }
+    fileSegment
+  }
+
+  /**
+   * Read all the existing logs from the log directory.
+   *
+   * Note that this is typically called when the caller is initializing and wants
+   * to recover past state from the write ahead logs (that is, before making any writes).
+   * If this is called after writes have been made using this manager, then it may not return
+   * the latest the records. This does not deal with currently active log files, and
+   * hence the implementation is kept simple.
+   */
+  def readFromLog(): Iterator[ByteBuffer] = synchronized {
+    val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
+    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+    logFilesToRead.iterator.map { file =>
+      logDebug(s"Creating log reader with $file")
+      new WriteAheadLogReader(file, hadoopConf)
+    } flatMap { x => x }
+  }
+
+  /**
+   * Delete the log files that are older than the threshold time.
+   *
+   * Its important to note that the threshold time is based on the time stamps used in the log
+   * files, which is usually based on the local system time. So if there is coordination necessary
+   * between the node calculating the threshTime (say, driver node), and the local system time
+   * (say, worker node), the caller has to take account of possible time skew.
+   */
+  def cleanupOldLogs(threshTime: Long): Unit = {
+    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+    logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+      s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+    def deleteFiles() {
+      oldLogFiles.foreach { logInfo =>
+        try {
+          val path = new Path(logInfo.path)
+          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+          fs.delete(path, true)
+          synchronized { pastLogs -= logInfo }
+          logDebug(s"Cleared log file $logInfo")
+        } catch {
+          case ex: Exception =>
+            logWarning(s"Error clearing write ahead log file $logInfo", ex)
+        }
+      }
+      logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+    }
+    if (!executionContext.isShutdown) {
+      Future { deleteFiles() }
+    }
+  }
+
+  /** Stop the manager, close any open log writer */
+  def stop(): Unit = synchronized {
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+    }
+    executionContext.shutdown()
+    logInfo("Stopped write ahead log manager")
+  }
+
+  /** Get the current log writer while taking care of rotation */
+  private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
+    if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+      resetWriter()
+      currentLogPath.foreach {
+        pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
+      }
+      currentLogWriterStartTime = currentTime
+      currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+      val newLogPath = new Path(logDirectory,
+        timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
+      currentLogPath = Some(newLogPath.toString)
+      currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
+    }
+    currentLogWriter
+  }
+
+  /** Initialize the log directory or recover existing logs inside the directory */
+  private def initializeOrRecover(): Unit = synchronized {
+    val logDirectoryPath = new Path(logDirectory)
+    val fileSystem =  HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+      pastLogs.clear()
+      pastLogs ++= logFileInfo
+      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+    }
+  }
+
+  private def resetWriter(): Unit = synchronized {
+    if (currentLogWriter != null) {
+      currentLogWriter.close()
+      currentLogWriter = null
+    }
+  }
+}
+
+private[util] object WriteAheadLogManager {
+
+  case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+  val logFileRegex = """log-(\d+)-(\d+)""".r
+
+  def timeToLogFile(startTime: Long, stopTime: Long): String = {
+    s"log-$startTime-$stopTime"
+  }
+
+  /** Convert a sequence of files to a sequence of sorted LogInfo objects */
+  def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+    files.flatMap { file =>
+      logFileRegex.findFirstIn(file.getName()) match {
+        case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
+          val startTime = startTimeStr.toLong
+          val stopTime = stopTimeStr.toLong
+          Some(LogInfo(startTime, stopTime, file.toString))
+        case None =>
+          None
+      }
+    }.sortBy { _.startTime }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
new file mode 100644
index 0000000000000000000000000000000000000000..92bad7a882a65e39ed9fe9dcee97e248cc516ea9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.Closeable
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (bytebuffer) from the log file.
+ */
+private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
+  extends Closeable {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+
+  def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
+    assertOpen()
+    instream.seek(segment.offset)
+    val nextLength = instream.readInt()
+    HdfsUtils.checkState(nextLength == segment.length,
+      s"Expected message length to be ${segment.length}, but was $nextLength")
+    val buffer = new Array[Byte](nextLength)
+    instream.readFully(buffer)
+    ByteBuffer.wrap(buffer)
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    instream.close()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+  }
+}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2afc0d1551acf665cbc4c3415395361a8af8a285
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and return them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+  extends Iterator[ByteBuffer] with Closeable with Logging {
+
+  private val instream = HdfsUtils.getInputStream(path, conf)
+  private var closed = false
+  private var nextItem: Option[ByteBuffer] = None
+
+  override def hasNext: Boolean = synchronized {
+    if (closed) {
+      return false
+    }
+
+    if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+      true
+    } else {
+      try {
+        val length = instream.readInt()
+        val buffer = new Array[Byte](length)
+        instream.readFully(buffer)
+        nextItem = Some(ByteBuffer.wrap(buffer))
+        logTrace("Read next item " + nextItem.get)
+        true
+      } catch {
+        case e: EOFException =>
+          logDebug("Error reading next item, EOF reached", e)
+          close()
+          false
+        case e: Exception =>
+          logWarning("Error while trying to read data from HDFS.", e)
+          close()
+          throw e
+      }
+    }
+  }
+
+  override def next(): ByteBuffer = synchronized {
+    val data = nextItem.getOrElse {
+      close()
+      throw new IllegalStateException(
+        "next called without calling hasNext or after hasNext returned false")
+    }
+    nextItem = None // Ensure the next hasNext call loads new data.
+    data
+  }
+
+  override def close(): Unit = synchronized {
+    if (!closed) {
+      instream.close()
+    }
+    closed = true
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
new file mode 100644
index 0000000000000000000000000000000000000000..679f6a6dfd7c17e6fcf61d93cacbce42e1004eaa
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+  extends Closeable {
+
+  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
+
+  private lazy val hadoopFlushMethod = {
+    // Use reflection to get the right flush operation
+    val cls = classOf[FSDataOutputStream]
+    Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+  }
+
+  private var nextOffset = stream.getPos()
+  private var closed = false
+
+  /** Write the bytebuffer to the log file */
+  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+    assertOpen()
+    data.rewind() // Rewind to ensure all data in the buffer is retrieved
+    val lengthToWrite = data.remaining()
+    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
+    stream.writeInt(lengthToWrite)
+    if (data.hasArray) {
+      stream.write(data.array())
+    } else {
+      // If the buffer is not backed by an array, we transfer using temp array
+      // Note that despite the extra array copy, this should be faster than byte-by-byte copy
+      while (data.hasRemaining) {
+        val array = new Array[Byte](data.remaining)
+        data.get(array)
+        stream.write(array)
+      }
+    }
+    flush()
+    nextOffset = stream.getPos()
+    segment
+  }
+
+  override def close(): Unit = synchronized {
+    closed = true
+    stream.close()
+  }
+
+  private def flush() {
+    hadoopFlushMethod.foreach { _.invoke(stream) }
+    // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+    stream.getWrappedStream.flush()
+  }
+
+  private def assertOpen() {
+    HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+  }
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5eba93c208c502c2a22d74c612157b4e047e4b95
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+  val hadoopConf = new Configuration()
+  var tempDir: File = null
+  var testDir: String = null
+  var testFile: String = null
+  var manager: WriteAheadLogManager = null
+
+  before {
+    tempDir = Files.createTempDir()
+    testDir = tempDir.toString
+    testFile = new File(tempDir, Random.nextString(10)).toString
+    if (manager != null) {
+      manager.stop()
+      manager = null
+    }
+  }
+
+  after {
+    FileUtils.deleteQuietly(tempDir)
+  }
+
+  test("WriteAheadLogWriter - writing data") {
+    val dataToWrite = generateRandomData()
+    val segments = writeDataUsingWriter(testFile, dataToWrite)
+    val writtenData = readDataManually(testFile, segments)
+    assert(writtenData === dataToWrite)
+  }
+
+  test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+    val dataToWrite = generateRandomData()
+    val writer = new WriteAheadLogWriter(testFile, hadoopConf)
+    dataToWrite.foreach { data =>
+      val segment = writer.write(stringToByteBuffer(data))
+      val dataRead = readDataManually(testFile, Seq(segment)).head
+      assert(data === dataRead)
+    }
+    writer.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data") {
+    val writtenData = generateRandomData()
+    writeDataManually(writtenData, testFile)
+    val reader = new WriteAheadLogReader(testFile, hadoopConf)
+    val readData = reader.toSeq.map(byteBufferToString)
+    assert(readData === writtenData)
+    assert(reader.hasNext === false)
+    intercept[Exception] {
+      reader.next()
+    }
+    reader.close()
+  }
+
+  test("WriteAheadLogReader - sequentially reading data written with writer") {
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val readData = readDataUsingReader(testFile)
+    assert(readData === dataToWrite)
+  }
+
+  test("WriteAheadLogReader - reading data written with writer after corrupted write") {
+    // Write data manually for testing the sequential reader
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val fileLength = new File(testFile).length()
+
+    // Append some garbage data to get the effect of a corrupted write
+    val fw = new FileWriter(testFile, true)
+    fw.append("This line appended to file!")
+    fw.close()
+
+    // Verify the data can be read and is same as the one correctly written
+    assert(readDataUsingReader(testFile) === dataToWrite)
+
+    // Corrupt the last correctly written file
+    val raf = new FileOutputStream(testFile, true).getChannel()
+    raf.truncate(fileLength - 1)
+    raf.close()
+
+    // Verify all the data except the last can be read
+    assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader") {
+    // Write data manually for testing the random reader
+    val writtenData = generateRandomData()
+    val segments = writeDataManually(writtenData, testFile)
+
+    // Get a random order of these segments and read them back
+    val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
+    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+    writtenDataAndSegments.foreach { case (data, segment) =>
+      assert(data === byteBufferToString(reader.read(segment)))
+    }
+    reader.close()
+  }
+
+  test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+    // Write data using writer for testing the random reader
+    val data = generateRandomData()
+    val segments = writeDataUsingWriter(testFile, data)
+
+    // Read a random sequence of segments and verify read data
+    val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
+    val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+    dataAndSegments.foreach { case (data, segment) =>
+      assert(data === byteBufferToString(reader.read(segment)))
+    }
+    reader.close()
+  }
+
+  test("WriteAheadLogManager - write rotating logs") {
+    // Write data using manager
+    val dataToWrite = generateRandomData()
+    writeDataUsingManager(testDir, dataToWrite)
+
+    // Read data manually to verify the written data
+    val logFiles = getLogFilesInDirectory(testDir)
+    assert(logFiles.size > 1)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData === dataToWrite)
+  }
+
+  test("WriteAheadLogManager - read rotating logs") {
+    // Write data manually for testing reading through manager
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData()
+      val file = testDir + s"/log-$i-$i"
+      writeDataManually(data, file)
+      data
+    }.flatten
+
+    val logDirectoryPath = new Path(testDir)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+    assert(fileSystem.exists(logDirectoryPath) === true)
+
+    // Read data using manager and verify
+    val readData = readDataUsingManager(testDir)
+    assert(readData === writtenData)
+  }
+
+  test("WriteAheadLogManager - recover past logs when creating new manager") {
+    // Write data with manager, recover with new manager and verify
+    val dataToWrite = generateRandomData()
+    writeDataUsingManager(testDir, dataToWrite)
+    val logFiles = getLogFilesInDirectory(testDir)
+    assert(logFiles.size > 1)
+    val readData = readDataUsingManager(testDir)
+    assert(dataToWrite === readData)
+  }
+
+  test("WriteAheadLogManager - cleanup old logs") {
+    // Write data with manager, recover with new manager and verify
+    val manualClock = new ManualClock
+    val dataToWrite = generateRandomData()
+    manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+    val logFiles = getLogFilesInDirectory(testDir)
+    assert(logFiles.size > 1)
+    manager.cleanupOldLogs(manualClock.currentTime() / 2)
+    eventually(timeout(1 second), interval(10 milliseconds)) {
+      assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+    }
+  }
+
+  test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+    // Generate a set of log files
+    val manualClock = new ManualClock
+    val dataToWrite1 = generateRandomData()
+    writeDataUsingManager(testDir, dataToWrite1, manualClock)
+    val logFiles1 = getLogFilesInDirectory(testDir)
+    assert(logFiles1.size > 1)
+
+
+    // Recover old files and generate a second set of log files
+    val dataToWrite2 = generateRandomData()
+    manualClock.addToTime(100000)
+    writeDataUsingManager(testDir, dataToWrite2, manualClock)
+    val logFiles2 = getLogFilesInDirectory(testDir)
+    assert(logFiles2.size > logFiles1.size)
+
+    // Read the files and verify that all the written data can be read
+    val readData1 = readDataUsingManager(testDir)
+    assert(readData1 === (dataToWrite1 ++ dataToWrite2))
+
+    // Corrupt the first set of files so that they are basically unreadable
+    logFiles1.foreach { f =>
+      val raf = new FileOutputStream(f, true).getChannel()
+      raf.truncate(1)
+      raf.close()
+    }
+
+    // Verify that the corrupted files do not prevent reading of the second set of data
+    val readData = readDataUsingManager(testDir)
+    assert(readData === dataToWrite2)
+  }
+}
+
+object WriteAheadLogSuite {
+
+  private val hadoopConf = new Configuration()
+
+  /** Write data to a file directly and return an array of the file segments written. */
+  def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
+    val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+    val writer = HdfsUtils.getOutputStream(file, hadoopConf)
+    data.foreach { item =>
+      val offset = writer.getPos
+      val bytes = Utils.serialize(item)
+      writer.writeInt(bytes.size)
+      writer.write(bytes)
+      segments += WriteAheadLogFileSegment(file, offset, bytes.size)
+    }
+    writer.close()
+    segments
+  }
+
+  /**
+   * Write data to a file using the writer class and return an array of the file segments written.
+   */
+  def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
+    val writer = new WriteAheadLogWriter(filePath, hadoopConf)
+    val segments = data.map {
+      item => writer.write(item)
+    }
+    writer.close()
+    segments
+  }
+
+  /** Write data to rotating files in log directory using the manager class. */
+  def writeDataUsingManager(
+      logDirectory: String,
+      data: Seq[String],
+      manualClock: ManualClock = new ManualClock,
+      stopManager: Boolean = true
+    ): WriteAheadLogManager = {
+    if (manualClock.currentTime < 100000) manualClock.setTime(10000)
+    val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+      rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+    // Ensure that 500 does not get sorted after 2000, so put a high base value.
+    data.foreach { item =>
+      manualClock.addToTime(500)
+      manager.writeToLog(item)
+    }
+    if (stopManager) manager.stop()
+    manager
+  }
+
+  /** Read data from a segments of a log file directly and return the list of byte buffers.*/
+  def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+    val reader = HdfsUtils.getInputStream(file, hadoopConf)
+    segments.map { x =>
+      reader.seek(x.offset)
+      val data = new Array[Byte](x.length)
+      reader.readInt()
+      reader.readFully(data)
+      Utils.deserialize[String](data)
+    }
+  }
+
+  /** Read all the data from a log file directly and return the list of byte buffers. */
+  def readDataManually(file: String): Seq[String] = {
+    val reader = HdfsUtils.getInputStream(file, hadoopConf)
+    val buffer = new ArrayBuffer[String]
+    try {
+      while (true) {
+        // Read till EOF is thrown
+        val length = reader.readInt()
+        val bytes = new Array[Byte](length)
+        reader.read(bytes)
+        buffer += Utils.deserialize[String](bytes)
+      }
+    } catch {
+      case ex: EOFException =>
+    } finally {
+      reader.close()
+    }
+    buffer
+  }
+
+  /** Read all the data from a log file using reader class and return the list of byte buffers. */
+  def readDataUsingReader(file: String): Seq[String] = {
+    val reader = new WriteAheadLogReader(file, hadoopConf)
+    val readData = reader.toList.map(byteBufferToString)
+    reader.close()
+    readData
+  }
+
+  /** Read all the data in the log file in a directory using the manager class. */
+  def readDataUsingManager(logDirectory: String): Seq[String] = {
+    val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+      callerName = "WriteAheadLogSuite")
+    val data = manager.readFromLog().map(byteBufferToString).toSeq
+    manager.stop()
+    data
+  }
+
+  /** Get the log files in a direction */
+  def getLogFilesInDirectory(directory: String): Seq[String] = {
+    val logDirectoryPath = new Path(directory)
+    val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+    if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+      fileSystem.listStatus(logDirectoryPath).map {
+        _.getPath.toString.stripPrefix("file:")
+      }.sorted
+    } else {
+      Seq.empty
+    }
+  }
+
+  def generateRandomData(): Seq[String] = {
+    (1 to 100).map { _.toString }
+  }
+
+  implicit def stringToByteBuffer(str: String): ByteBuffer = {
+    ByteBuffer.wrap(Utils.serialize(str))
+  }
+
+  implicit def byteBufferToString(byteBuffer: ByteBuffer): String = {
+    Utils.deserialize[String](byteBuffer.array)
+  }
+}