diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8d4a53b4ca9b05d73d5b42f53d6c4e23696b5dce..4c71b69069eb33e116479cb1b0da8e80f47cd3aa 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(
 
     context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()
-      writerThread.join()
       if (!reuse_worker || !released) {
         try {
           worker.close()
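
The join() removed above is the direct cause of the SPARK-6294 hang: shutdownOnTaskCompletion() interrupts the writer thread, but Thread.interrupt() does not reliably unblock a thread stuck in a socket write, so the task-completion listener could wait on join() forever. Below is a minimal standalone sketch of that shape (plain Python, not Spark code; Python has no Thread.interrupt(), so it only shows the join-on-a-blocked-writer half of the problem):

    import socket
    import threading

    # A writer blocked on a full socket buffer that nobody drains; joining
    # it from another thread waits until the data is consumed, i.e. forever.
    a, b = socket.socketpair()

    def writer():
        try:
            a.sendall(b"x" * (64 << 20))  # blocks once kernel buffers fill
        except OSError:
            pass

    t = threading.Thread(target=writer, daemon=True)
    t.start()
    t.join(timeout=1.0)  # a bare join() here would never return
    print("writer still alive after 1s:", t.is_alive())
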
@@ -248,13 +247,17 @@ private[spark] class PythonRDD(
       } catch {
         case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
 
         case e: Exception =>
           // We must avoid throwing exceptions here, because the thread uncaught exception handler
           // will kill the whole executor (see org.apache.spark.executor.Executor).
           _exception = e
-          Utils.tryLog(worker.shutdownOutput())
+          if (!worker.isClosed) {
+            Utils.tryLog(worker.shutdownOutput())
+          }
       } finally {
         // Release memory used by this thread for shuffles
         env.shuffleMemoryManager.releaseMemoryForThisThread()
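
With the join() gone, the writer thread's cleanup can now race with the completion listener that closes the worker socket, hence the worker.isClosed guards above: on the JVM, Socket.shutdownOutput() throws SocketException("Socket is closed") once the socket has been closed. A rough Python analogue of the guarded call (standalone, not Spark code):

    import socket

    s = socket.socket()
    s.close()
    try:
        s.shutdown(socket.SHUT_WR)  # analogue of worker.shutdownOutput()
    except OSError as e:
        print("shutdown on a closed socket raises:", e)
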
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index f09587f2117089f76e53e722741cee398bcc4547..93885985fe3774672e4a846b7f8a8bc82d70e77d 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -61,7 +61,10 @@ def worker(sock):
     except SystemExit as exc:
         exit_code = compute_real_exit_code(exc.code)
     finally:
-        outfile.flush()
+        try:
+            outfile.flush()
+        except Exception:
+            pass
     return exit_code
 
 
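The guarded flush matters for the same reason: by the time the worker's finally block runs, the JVM side may already have hung up (exactly what take() does once it has enough rows), and flushing buffered output toward a closed peer raises. A standalone sketch (not Spark code; the exact exception is platform-dependent, BrokenPipeError on Linux under Python 3):

    import socket

    jvm_end, worker_end = socket.socketpair()
    outfile = worker_end.makefile("wb")
    outfile.write(b"leftover output")  # small write, stays in the buffer
    jvm_end.close()                    # the JVM side hangs up early
    try:
        outfile.flush()
    except Exception as e:
        print("flush failed:", type(e).__name__, e)
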
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 06ba2b461d53e4f91a9eb45f6165f39d6edcdb0d..dd8d3b1c53733cd96711b33e7caffb43e35d3e48 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -782,6 +782,11 @@ class RDDTests(ReusedPySparkTestCase):
         jobId = tracker.getJobIdsForGroup("test4")[0]
         self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
 
+    # Regression test for SPARK-6294
+    def test_take_on_jrdd(self):
+        rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
+        rdd._jrdd.first()
+
 
 class ProfilerTests(PySparkTestCase):
 
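
A note on the regression test (not part of the patch itself): rdd._jrdd.first() drives take(1) through the JVM side, which completes the task after reading a single row while the Python worker is still producing the remaining ~1M strings; before this change, the completion listener's join() on the still-blocked writer thread turned that into a hang. The test should be runnable on its own with something along these lines (the exact invocation depends on how your checkout sets up the PySpark test environment):

    python -m unittest pyspark.tests.RDDTests.test_take_on_jrdd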