diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 50b4184e48916166fd193364b64579a2cb8c5c1b..dd8e46ba0f122407985d233a64971d22ab347e16 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -184,7 +184,8 @@ private[spark] class PipedRDD[T: ClassTag]( val exitStatus = proc.waitFor() cleanup() if (exitStatus != 0) { - throw new IllegalStateException(s"Subprocess exited with status $exitStatus") + throw new IllegalStateException(s"Subprocess exited with status $exitStatus. " + + s"Command ran: " + command.mkString(" ")) } false } @@ -205,6 +206,9 @@ private[spark] class PipedRDD[T: ClassTag]( private def propagateChildException(): Unit = { val t = childThreadException.get() if (t != null) { + val commandRan = command.mkString(" ") + logError(s"Caught exception while running pipe() operator. Command ran: $commandRan. " + + s"Exception: ${t.getMessage}") proc.destroy() cleanup() throw t diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index d13da38ed0c4696c2c2d65f58fa7abc22971db18..e9cc8195240f0a5a2c99af775b73afbd072aa204 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -134,15 +134,27 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } } - test("pipe with non-zero exit status") { + test("pipe with process which cannot be launched due to bad command") { + if (!testCommandAvailable("some_nonexistent_command")) { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val command = Seq("some_nonexistent_command") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { + piped.collect() + } + assert(exception.getMessage.contains(command.mkString(" "))) + } + } + + test("pipe with process which is launched but fails with non-zero exit status") { if (testCommandAvailable("cat")) { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null")) - intercept[SparkException] { + val command = Seq("cat", "nonexistent_file") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { piped.collect() } - } else { - assert(true) + assert(exception.getMessage.contains(command.mkString(" "))) } }