diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 252721192904f871819808aa67d095fb62311bb9..c3c8336a437a692dec65ec4d2e5fd67a95959dc0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
+
               // Check whether the worker is ready to be re-used.
-              if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-                if (reuse_worker) {
-                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                  released = true
+              if (reuse_worker) {
+                // It is very likely that the ending mark is already available,
+                // and the current task should not be blocked by checking for it
+
+                if (stream.available() >= 4) {
+                  val ending = stream.readInt()
+                  if (ending == SpecialLengths.END_OF_STREAM) {
+                    env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                    released = true
+                    logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
+                  } else {
+                    logInfo(s"Communication with worker did not end cleanly (ending with $ending), " +
+                      s"close it: $worker")
+                  }
+                } else {
+                  logInfo(s"The ending mark from worker is not available, close it: $worker")
                 }
               }
               null
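
The Scala change above makes the END_OF_STREAM check non-blocking: the marker is only consumed when the worker is eligible for reuse and at least four bytes (one int) are already buffered, so a task no longer stalls waiting on a worker that never sends the mark. Below is a minimal sketch of that peek-then-read pattern, using plain JDK streams and a stand-in constant instead of Spark's SpecialLengths.END_OF_STREAM and worker-release plumbing:

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

object EndMarkPeekSketch {
  // Stand-in for SpecialLengths.END_OF_STREAM; the real constant lives in PythonRDD.scala.
  val EndOfStream = -4

  /** Read the ending mark only if it is already buffered, so we never block here. */
  def endedCleanly(stream: DataInputStream): Boolean =
    stream.available() >= 4 && stream.readInt() == EndOfStream

  def main(args: Array[String]): Unit = {
    // A stream that already holds the 4-byte ending mark: the worker could be reused.
    val withMark = new DataInputStream(
      new ByteArrayInputStream(Array[Byte](-1, -1, -1, -4)))
    println(endedCleanly(withMark))  // true

    // An empty stream: available() is 0, so readInt() is never attempted.
    val empty = new DataInputStream(new ByteArrayInputStream(new Array[Byte](0)))
    println(endedCleanly(empty))     // false
  }
}
```
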
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8a93c320ec5d342be490f2ba73fc5850bfd85c8f..180bdbb4c2c4fa7987e0e21aa1c29675606cd6b2 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,6 +121,7 @@ def main(infile, outfile):
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
+    outfile.flush()
 
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
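
The one-line flush in worker.py is what makes the non-blocking check on the JVM side reliable: available() only reports bytes that have actually reached the reader, so anything still sitting in the worker's output buffer is invisible when the JVM peeks. A small sketch of that effect with an in-memory pipe (plain JDK streams, not Spark's actual socket plumbing; -4 is just a stand-in for the ending mark):

```scala
import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream,
  PipedInputStream, PipedOutputStream}

object FlushVisibilitySketch {
  def main(args: Array[String]): Unit = {
    // In-memory pipe standing in for the worker <-> executor connection.
    val readerEnd = new PipedInputStream()
    val writerEnd = new DataOutputStream(
      new BufferedOutputStream(new PipedOutputStream(readerEnd)))
    val reader = new DataInputStream(readerEnd)

    // The "worker" writes its ending mark but has not flushed yet:
    writerEnd.writeInt(-4)
    println(reader.available())  // 0 -- still sitting in the writer's buffer

    // After flush() the 4 bytes are visible and can be read without blocking:
    writerEnd.flush()
    println(reader.available())  // 4
    println(reader.readInt())    // -4
  }
}
```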