diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 19157af5b6f4dced4769fe6881e70fbab9b28f61..a7fc749a2b0c6d94b0cbb9fc1ef2f3a3ecdcb6c8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
+ */
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](sizeInBytes)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.length
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
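
A minimal usage sketch of the new class (the object name, buffer size, and message below are illustrative, not part of the patch; note that CircularBuffer is private[spark], so callers must live under the org.apache.spark package):

    package org.apache.spark.demo

    import java.io.PrintStream

    import org.apache.spark.util.CircularBuffer

    object CircularBufferDemo {
      def main(args: Array[String]): Unit = {
        // Retain only the most recent 32 bytes written to the stream.
        val buffer = new CircularBuffer(32)
        val out = new PrintStream(buffer, true, "UTF-8")

        out.println("this long line will be truncated to its last 32 bytes")

        // toString replays the surviving bytes in write order, oldest first.
        print(buffer.toString)
      }
    }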
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a61ea3918f46ad2e98daf25254f9f596dfb96594..baa4c661cc21ef2b800b4828086637a451622e71 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(!Utils.isInDirectory(nullFile, parentDir))
     assert(!Utils.isInDirectory(nullFile, childFile3))
   }
+
+  test("circular buffer") {
+    val buffer = new CircularBuffer(25)
+    val stream = new java.io.PrintStream(buffer, true, "UTF-8")
+
+    stream.println("test circular test circular test circular test circular test circular")
+    assert(buffer.toString === "t circular test circular\n")
+  }
 }
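
Why that exact expected value: println writes the 69-character message plus a newline, 70 bytes in all, and the 25-byte buffer keeps only the most recent 25 of them (the assertion also assumes the platform line separator is a single "\n"). A quick sanity check of the arithmetic, outside the suite:

    val msg = "test circular test circular test circular test circular test circular"
    assert(msg.length == 69)
    assert((msg + "\n").takeRight(25) == "t circular test circular\n")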
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4c708cec572ae0b5941521e1a9fbda1f0ab9a7d4..cbd2bf6b5eede20f39c8f1833b6477353db66762 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
+import org.apache.spark.util.CircularBuffer
+
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
   with Logging {
 
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
-  private val outputBuffer = new java.io.OutputStream {
-    var pos: Int = 0
-    var buffer = new Array[Int](10240)
-    def write(i: Int): Unit = {
-      buffer(pos) = i
-      pos = (pos + 1) % buffer.size
-    }
-
-    override def toString: String = {
-      val (end, start) = buffer.splitAt(pos)
-      val input = new java.io.InputStream {
-        val iterator = (start ++ end).iterator
-
-        def read(): Int = if (iterator.hasNext) iterator.next() else -1
-      }
-      val reader = new BufferedReader(new InputStreamReader(input))
-      val stringBuilder = new StringBuilder
-      var line = reader.readLine()
-      while(line != null) {
-        stringBuilder.append(line)
-        stringBuilder.append("\n")
-        line = reader.readLine()
-      }
-      stringBuilder.toString()
-    }
-  }
+  private val outputBuffer = new CircularBuffer()
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
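
For context, the buffer only pays off because ClientWrapper points Hive's console streams at it elsewhere in the file (outside this hunk). A rough sketch of that wiring, assuming Hive's SessionState with its public out/err PrintStream fields (the helper name is hypothetical):

    import java.io.PrintStream

    import org.apache.hadoop.hive.ql.session.SessionState
    import org.apache.spark.util.CircularBuffer

    // Route Hive's console output into the circular buffer so that, when a Hive call
    // fails, the most recent output can be attached to the error message.
    def redirectHiveConsole(state: SessionState, buffer: CircularBuffer): Unit = {
      state.out = new PrintStream(buffer, true, "UTF-8")
      state.err = new PrintStream(buffer, true, "UTF-8")
    }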
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 611888055d6cf102df08c075ee1bd3b7517488b4..b967e191c5855d9af9dc375d56c6193575293e01 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
-      // redirectError(Redirect.INHERIT) would consume the error output from buffer and
-      // then print it to stderr (inherit the target from the current Scala process).
-      // If without this there would be 2 issues:
+      // We need to start threads connected to the process pipeline; otherwise:
       // 1) the error messages generated by the script process would be hidden, and
       // 2) error output big enough to fill up the pipe buffer would hang the input logic.
-      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
+      val errorStream = proc.getErrorStream
       val reader = new BufferedReader(new InputStreamReader(inputStream))
 
       val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
+      // TODO make the 2048 configurable?
+      val stderrBuffer = new CircularBuffer(2048)
+      // Consume the error stream from the pipeline; otherwise the script process will block
+      // once the pipe buffer fills up.
+      new RedirectThread(errorStream, // input stream from the pipeline
+        stderrBuffer,                 // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
+
       // Put the writes (output to the pipeline) into a separate thread
       // and keep the collector in the main thread;
       // otherwise it will cause a deadlock if the data size is greater than
       // the pipeline / buffer capacity.
       new Thread(new Runnable() {
         override def run(): Unit = {
-          iter
-            .map(outputProjection)
-            .foreach { row =>
-            if (inputSerde == null) {
-              val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-                ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-              outputStream.write(data)
-            } else {
-              val writable = inputSerde.serialize(
-                row.asInstanceOf[GenericInternalRow].values, inputSoi)
-              prepareWritable(writable).write(dataOutputStream)
+          Utils.tryWithSafeFinally {
+            iter
+              .map(outputProjection)
+              .foreach { row =>
+                if (inputSerde == null) {
+                  val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                    ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+                  outputStream.write(data)
+                } else {
+                  val writable = inputSerde.serialize(
+                    row.asInstanceOf[GenericInternalRow].values, inputSoi)
+                  prepareWritable(writable).write(dataOutputStream)
+                }
+              }
+            outputStream.close()
+          } {
+            if (proc.waitFor() != 0) {
+              logError(stderrBuffer.toString) // log the stderr circular buffer
             }
           }
-          outputStream.close()
         }
-      }).start()
+      }, "Thread-ScriptTransformation-Feed").start()
 
       iterator
     }
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
     }
   }
 }
+
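
The shape of the new error handling, distilled into a standalone sketch (object name, command, and buffer size are illustrative; RedirectThread and CircularBuffer are private[spark], so this assumes code under the org.apache.spark package): drain the child's stderr into a bounded buffer so the child never blocks on a full pipe, and only surface that output when the child exits abnormally.

    package org.apache.spark.demo

    import org.apache.spark.util.{CircularBuffer, RedirectThread}

    object StderrCaptureDemo {
      def main(args: Array[String]): Unit = {
        val proc = new ProcessBuilder("/bin/bash", "-c", "echo oops >&2; exit 1").start()

        // Keep at most the last 2 KB of whatever the child writes to stderr.
        val stderrBuffer = new CircularBuffer(2048)
        new RedirectThread(proc.getErrorStream, stderrBuffer, "stderr-drainer").start()

        // Only log the captured stderr when the child actually failed.
        if (proc.waitFor() != 0) {
          System.err.println(s"child failed, recent stderr:\n$stderrBuffer")
        }
      }
    }

As in the patch itself, the drainer thread is not joined before the buffer is read; a stricter variant could wait for it to finish, but the sketch mirrors what ScriptTransformation now does.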
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f0aad8dbbe64d4e1a8848ea0efb01858fec9bc8f..9f7e58f89024164a8f5616e138f0847b0d199c1a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
         .queryExecution.toRdd.count())
   }
 
-  ignore("test script transform for stderr") {
+  test("test script transform for stderr") {
     val data = (1 to 100000).map { i => (i, i, i) }
     data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(0 ===