diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb0ac5b8e36ef3595ec7820b96c21d952ec..3155ce04a11095a849e5f1103259929773dd05a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         case ut: UninterruptibleThread =>
           // When using a local file system, "writeBatch" must be called on a
           // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-          // while writing the batch file. This is because there is a potential dead-lock in
-          // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-          // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-          // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-          // the file permission if using the local file system, and can get deadlocked if the
-          // stream execution thread is stopped by interrupt. Hence, we make sure that
-          // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-          // interrupts here. Also see SPARK-14131.
-          ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+          // while writing the batch file.
+          //
+          // This is because Hadoop "Shell.runCommand" swallows InterruptedException
+          // (HADOOP-14084). If the user tries to stop a query, and the thread running
+          // "Shell.runCommand" is interrupted, then InterruptedException is dropped and the query
+          // keeps running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+          // "Shell.runCommand" to set the file permission if using the local file system.)
+          //
+          // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]],
+          // which allows us to disable interrupts here and propagate the interrupt state
+          // correctly. Also see SPARK-19599.
+          ut.runUninterruptibly { writeBatch(batchId, metadata) }
         case _ =>
           throw new IllegalStateException(
             "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       // For a distributed file system, such as HDFS or S3, if the network is broken, write
       // operations may just hang until timeout. We should enable interrupts to allow stopping
       // the query fast.
-      writeBatch(batchId, metadata, serialize)
+      writeBatch(batchId, metadata)
     }
     true
   }
 }
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
         }
@@ -164,7 +166,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           // big problem because it requires the attacker must have the permission to write the
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
-          nextId += 1
       }
     }
     None
@@ -176,8 +177,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
       throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +196,6 @@
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3149ef04f7d1d02988709a2c4de8c53373cb16f6..239d49b08a2e50b08667472367b58e24782d00ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -179,8 +179,8 @@ class StreamExecution(
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
-   * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
-   * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+   * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptedException`
+   * when using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
    */
   val microBatchThread =
     new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
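Reviewer note: the key idea in this patch is that interrupts around the batch write are deferred and re-delivered rather than dropped. Below is a minimal standalone sketch of the deferred-interrupt pattern that `org.apache.spark.util.UninterruptibleThread.runUninterruptibly` provides; the class and field names here are illustrative, not Spark's actual implementation.

```scala
// Minimal sketch only; not Spark's actual UninterruptibleThread implementation.
class DeferredInterruptThread(name: String) extends Thread(name) {
  private val lock = new Object
  private var uninterruptible = false   // guarded by `lock`
  private var pendingInterrupt = false  // guarded by `lock`

  /** Runs `body` with interrupts deferred, then re-delivers any deferred interrupt. */
  def runUninterruptibly[T](body: => T): T = {
    lock.synchronized {
      uninterruptible = true
      // Capture an interrupt that arrived before entering the critical section.
      if (Thread.interrupted()) pendingInterrupt = true
    }
    try body finally {
      lock.synchronized {
        uninterruptible = false
        if (pendingInterrupt) {
          pendingInterrupt = false
          super.interrupt()  // propagate the interrupt state instead of swallowing it
        }
      }
    }
  }

  override def interrupt(): Unit = lock.synchronized {
    // While in the critical section, only record the interrupt; it is re-delivered
    // when runUninterruptibly finishes, so a stop request is never lost.
    if (uninterruptible) pendingInterrupt = true else super.interrupt()
  }
}
```

With this pattern, `HDFSMetadataLog.add` can wrap `writeBatch` in `runUninterruptibly`: a `stop()` arriving mid-write is delivered right after the batch file is committed, instead of being swallowed inside Hadoop's `Shell.runCommand`.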
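Removing the `FileNotFoundException` case is safe because each writer serializes into its own UUID-named temp file (the unchanged `tempPath` line above), so two writers can no longer race on `create`/`rename` of the same temp file; that handler, like `nextId`, had become dead code. The remaining commit protocol is sketched below using `java.nio.file` in place of Spark's `FileManager`; `commitBatch` and the path layout are illustrative, not the patch's API.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.util.{ConcurrentModificationException, UUID}

object MetadataCommitSketch {
  def commitBatch(metadataDir: Path, batchId: Long, bytes: Array[Byte]): Unit = {
    // 1. Write to a uniquely named temp file; UUIDs keep concurrent writers from
    //    ever touching each other's temp files.
    val tempPath = metadataDir.resolve(s".${UUID.randomUUID()}.tmp")
    Files.write(tempPath, bytes)
    try {
      // 2. Rename to the final batch file. Without REPLACE_EXISTING the move fails
      //    if another log instance committed this batch first, and that is reported
      //    as concurrent modification rather than silently overwriting the file.
      Files.move(tempPath, metadataDir.resolve(batchId.toString))
    } catch {
      case _: FileAlreadyExistsException =>
        throw new ConcurrentModificationException(
          s"Multiple metadata logs are using $metadataDir")
    } finally {
      Files.deleteIfExists(tempPath)  // no-op if the rename succeeded
    }
  }
}
```

On a file system with atomic rename (for example HDFS), exactly one writer wins the rename, which is why a failed rename is the only cross-writer signal `writeBatch` still needs to translate into `ConcurrentModificationException`.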