diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index aeff4d7a98e7acc113768947cc8e41b62a5a7683..46bfc60856453f4b507deac74aa8248e9c90f839 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import scala.collection.JavaConverters._
 import scala.language.existentials
 
+import py4j.Py4JException
+
 import org.apache.spark.SparkException
 import org.apache.spark.api.java._
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time}
+import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.util.Utils
@@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
 /**
  * Helper functions, which are called from Python via Py4J.
  */
-private[python] object PythonDStream {
+private[streaming] object PythonDStream {
 
   /**
    * can not access PythonTransformFunctionSerializer.register() via Py4j
@@ -184,6 +187,32 @@ private[python] object PythonDStream {
     rdds.asScala.foreach(queue.add)
     queue
   }
+
+  /**
+   * Stop [[StreamingContext]] if the Python process crashes (e.g., an OOM), since in that case
+   * the user cannot stop it from the Python side.
+   */
+  def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
+    // These two special messages are from:
+    // scalastyle:off
+    // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
+    // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
+    // scalastyle:on
+    if (e.isInstanceOf[Py4JException] &&
+      ("Cannot obtain a new communication channel" == e.getMessage ||
+        "Error while obtaining a new communication channel" == e.getMessage)) {
+      // Start a new thread to stop StreamingContext to avoid deadlock.
+      new Thread("Stop-StreamingContext") with Logging {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          logError(
+            "Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
+          StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
+        }
+      }.start()
+    }
+  }
 }
 
 /**
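
A note on the visibility change above: widening `PythonDStream` from `private[python]` to `private[streaming]` is what lets `JobGenerator` and `JobScheduler` (both in `org.apache.spark.streaming.scheduler`) call the new helper. It also means any quick sanity check has to live under `org.apache.spark.streaming`. A minimal sketch of exercising the helper (not part of this patch; `StopOnDeadPythonSketch` is a made-up name, while `Py4JException(String)` is an existing py4j constructor):

```scala
package org.apache.spark.streaming

import py4j.Py4JException

import org.apache.spark.streaming.api.python.PythonDStream

// Hypothetical demo object, only to show the two code paths.
object StopOnDeadPythonSketch {
  def demo(): Unit = {
    // Message matches one of the two special Py4J strings: the active
    // StreamingContext, if any, is stopped on a daemon thread.
    PythonDStream.stopStreamingContextIfPythonProcessIsDead(
      new Py4JException("Cannot obtain a new communication channel"))

    // Any other throwable is ignored and the context keeps running.
    PythonDStream.stopStreamingContextIfPythonProcessIsDead(
      new RuntimeException("unrelated failure"))
  }
}
```

Note the helper matches on the exact message text, so it is tied to the pinned py4j revision linked in the scalastyle-off comments; if py4j ever rewords those messages, this check silently stops firing.
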
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 10d64f98ac71b322c5120e44a9ee7b734a145ffa..8d83dc8a8fc0429a5f6bb193ebc58db2c0f1a56f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
 
@@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
+        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
     }
     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
   }
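
This `Failure(e)` branch is where a dead Python driver typically first surfaces: generating jobs for a Python DStream calls back into the Python process over Py4J, so the resulting `Py4JException` lands inside the `Try` around job generation. A rough reconstruction of that path (a reviewer's sketch of the chain, not something the patch itself asserts):

```scala
// Reviewer's sketch of the failure path, in comment form:
//
//   JobGenerator.generateJobs(time)
//     graph.generateJobs(time)                  // DStreamGraph walks the output streams
//       PythonTransformedDStream.compute(time)  // defined in PythonDStream.scala
//         pfunc.call(...)                       // Py4J callback into the Python driver
//
// With the Python process gone, the callback throws
//   py4j.Py4JException("Cannot obtain a new communication channel")
// which is caught as Failure(e) above; reportError still notifies listeners,
// and the new hook additionally stops the context so later batches do not
// keep failing the same way.
```
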
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index dbc50da21c701c161789652a7a2f0637e201d36f..98e099354a7dbdfed821d09a402c78645bae0421 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{EventLoop, ThreadUtils}
 
@@ -217,6 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private def handleError(msg: String, e: Throwable) {
     logError(msg, e)
     ssc.waiter.notifyError(e)
+    PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
   }
 
   private class JobHandler(job: Job) extends Runnable with Logging {
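
One design note on the stop itself, applying to both hook sites: `handleError` and the job-generation failure path run on scheduler threads, and the in-code comment gives the reason for the daemon thread, namely that stopping inline could deadlock because `stop()` waits on the very scheduler machinery these callbacks run on. Spelled out, the stop amounts to the following (a sketch; `getActive()` and `stop(stopSparkContext: Boolean)` are existing public `StreamingContext` APIs):

```scala
import org.apache.spark.streaming.StreamingContext

// What the "Stop-StreamingContext" daemon thread boils down to.
// getActive() returns Option[StreamingContext]; stopSparkContext = false
// shuts down the streaming machinery but leaves the underlying
// SparkContext usable for non-streaming work afterwards.
StreamingContext.getActive() match {
  case Some(ssc) => ssc.stop(stopSparkContext = false)
  case None      => // no active context; nothing to stop
}
```
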