diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 80bfda9dddb39cb2a48f0b8dcd4432bf02b97d39..24aa386c7212ba5d58b7fd8a49eb464c67972c8e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val NOT_STARTED = "<Not Started>"
 
+  // Interval between safe mode checks.
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+    "spark.history.fs.safemodeCheck.interval", "5s")
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
@@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  initialize()
+  // Conf option used for testing the initialization code.
+  val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
+    initialize(None)
+  } else {
+    null
+  }
+
+  private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+    if (!isFsInSafeMode()) {
+      startPolling()
+      return null
+    }
+
+    // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
+    // for the FS to leave safe mode before enabling polling. This allows the main history server
+    // UI to be shown (so that the user can see the HDFS status).
+    //
+    // The synchronization in the run() method is needed because of the tests; mockito can
+    // misbehave if the test is modifying the mocked methods while the thread is calling
+    // them.
+    val initThread = new Thread(new Runnable() {
+      override def run(): Unit = {
+        try {
+          clock.synchronized {
+            while (isFsInSafeMode()) {
+              logInfo("HDFS is still in safe mode. Waiting...")
+              val deadline = clock.getTimeMillis() +
+                TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+              clock.waitTillTime(deadline)
+            }
+          }
+          startPolling()
+        } catch {
+          case _: InterruptedException =>
+        }
+      }
+    })
+    initThread.setDaemon(true)
+    initThread.setName(s"${getClass().getSimpleName()}-init")
+    initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
+      new Thread.UncaughtExceptionHandler() {
+        override def uncaughtException(t: Thread, e: Throwable): Unit = {
+          logError("Error initializing FsHistoryProvider.", e)
+          System.exit(1)
+        }
+      }))
+    initThread.start()
+    initThread
+  }
 
-  private def initialize(): Unit = {
+  private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
     if (!fs.exists(path)) {
@@ -170,7 +223,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
+  override def getConfig(): Map[String, String] = {
+    val safeMode = if (isFsInSafeMode()) {
+      Map("HDFS State" -> "In safe mode, application logs not available.")
+    } else {
+      Map()
+    }
+    Map("Event log directory" -> logDir.toString) ++ safeMode
+  }
+
+  override def stop(): Unit = {
+    if (initThread != null && initThread.isAlive()) {
+      initThread.interrupt()
+      initThread.join()
+    }
+  }
 
   /**
    * Builds the application list based on the current contents of the log directory.
@@ -585,6 +652,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
+   * so we have to resort to ugly reflection (as usual...).
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical
+   * purposes makes it more public than not.
+   */
+  private[history] def isFsInSafeMode(): Boolean = fs match {
+    case dfs: DistributedFileSystem =>
+      isFsInSafeMode(dfs)
+    case _ =>
+      false
+  }
+
+  // For testing.
+  private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+    val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+    val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+    val actionClass: Class[_] =
+      try {
+        getClass().getClassLoader().loadClass(hadoop2Class)
+      } catch {
+        case _: ClassNotFoundException =>
+          getClass().getClassLoader().loadClass(hadoop1Class)
+      }
+
+    val action = actionClass.getField("SAFEMODE_GET").get(null)
+    val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+    method.invoke(dfs, action).asInstanceOf[Boolean]
+  }
+
 }
 
 private[history] object FsHistoryProvider {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 73cff89544dc350b44b15b65abf0ae66e8139774..833aab14ca2da7390a29f566797fbfe476b98819 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -24,13 +24,19 @@ import java.util.concurrent.TimeUnit
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.io.Source
+import scala.concurrent.duration._
+import scala.language.postfixOps
 
 import com.google.common.base.Charsets
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.json4s.jackson.JsonMethods._
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doReturn, mock, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.io._
@@ -407,6 +413,65 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
+  test("provider correctly checks whether fs is in safe mode") {
+    val provider = spy(new FsHistoryProvider(createTestConf()))
+    val dfs = mock(classOf[DistributedFileSystem])
+    // Asserts that safe mode is false because we can't really control the return value of the
+    // mock, since the API is different between hadoop 1 and 2.
+    assert(!provider.isFsInSafeMode(dfs))
+  }
+
+  test("provider waits for safe mode to finish before initializing") {
+    val clock = new ManualClock()
+    val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+    val provider = spy(new FsHistoryProvider(conf, clock))
+    doReturn(true).when(provider).isFsInSafeMode()
+
+    val initThread = provider.initialize(None)
+    try {
+      provider.getConfig().keys should contain ("HDFS State")
+
+      clock.setTime(5000)
+      provider.getConfig().keys should contain ("HDFS State")
+
+      // Synchronization needed because of mockito.
+      clock.synchronized {
+        doReturn(false).when(provider).isFsInSafeMode()
+        clock.setTime(10000)
+      }
+
+      eventually(timeout(1 second), interval(10 millis)) {
+        provider.getConfig().keys should not contain ("HDFS State")
+      }
+    } finally {
+      provider.stop()
+    }
+  }
+
+  test("provider reports error after FS leaves safe mode") {
+    testDir.delete()
+    val clock = new ManualClock()
+    val conf = createTestConf().set("spark.history.testing.skipInitialize", "true")
+    val provider = spy(new FsHistoryProvider(conf, clock))
+    doReturn(true).when(provider).isFsInSafeMode()
+
+    val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
+    val initThread = provider.initialize(Some(errorHandler))
+    try {
+      // Synchronization needed because of mockito.
+      clock.synchronized {
+        doReturn(false).when(provider).isFsInSafeMode()
+        clock.setTime(10000)
+      }
+
+      eventually(timeout(1 second), interval(10 millis)) {
+        verify(errorHandler).uncaughtException(any(), any())
+      }
+    } finally {
+      provider.stop()
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
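
Side note on the reflection in isFsInSafeMode(dfs): it exists only because Spark builds against
both Hadoop 1 and Hadoop 2, whose SafeModeAction enums live under different class names. Against
a Hadoop 2 client alone, the probe reduces to one direct call. A minimal standalone sketch, not
part of the patch (SafeModeProbe is an illustrative name, and it assumes fs.defaultFS in the
loaded Hadoop configuration points at an HDFS namenode):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.hdfs.DistributedFileSystem
    import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction

    object SafeModeProbe {
      def main(args: Array[String]): Unit = {
        FileSystem.get(new Configuration()) match {
          case dfs: DistributedFileSystem =>
            // SAFEMODE_GET reports the namenode's current state without changing it; this is
            // the same call the patch reaches through reflection.
            println(s"NameNode in safe mode: ${dfs.setSafeMode(SafeModeAction.SAFEMODE_GET)}")
          case other =>
            // Non-HDFS filesystems have no safe mode; the patch likewise returns false here.
            println(s"${other.getClass.getSimpleName} is not HDFS; safe mode does not apply")
        }
      }
    }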
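
Both polling intervals go through SparkConf.getTimeAsSeconds, so the properties accept Spark's
suffixed time strings such as "5s" or "1min". A hypothetical tuning sketch; the values below are
illustrative, not the defaults (the patch defaults the safe mode check to "5s" and keeps "10s"
for event log update checks):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // How often the init thread re-checks whether HDFS has left safe mode.
      .set("spark.history.fs.safemodeCheck.interval", "1s")
      // How often the provider scans the event log directory for new or updated applications.
      .set("spark.history.fs.update.interval", "30s")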