diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index affc2018c43cbcf9e4ad48f707b2f8cc833ac1c4..b6ddf7437ea13d9a28e7a7067c64edeb384cdbb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{Map => MutableMap}
 import scala.util.control.NonFatal
 
@@ -148,15 +149,20 @@ class StreamExecution(
       "logicalPlan must be initialized in StreamExecutionThread " +
         s"but the current thread was ${Thread.currentThread}")
     var nextSourceId = 0L
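+    // Reuse one StreamingExecutionRelation per StreamingRelation, so that a relation that
+    // appears multiple times in the plan (e.g. a self-union) is materialized into one source.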
+    val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val _logicalPlan = analyzedPlan.transform {
-      case StreamingRelation(dataSource, _, output) =>
-        // Materialize source to avoid creating it in every batch
-        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
-        val source = dataSource.createSource(metadataPath)
-        nextSourceId += 1
-        // We still need to use the previous `output` instead of `source.schema` as attributes in
-        // "df.logicalPlan" has already used attributes of the previous `output`.
-        StreamingExecutionRelation(source, output)
+      case streamingRelation @ StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)
+        })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
     uniqueSources = sources.distinct
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 01ea62a9de4d5cc5745ea6ccf647c67b01cda924..1fc062974e185a9518d53ec896abd27a151ee34f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -71,6 +71,29 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("SPARK-20432: union one stream with itself") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
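+    // Union the stream with itself, so both legs reference the same StreamingRelation.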
+    val unioned = df.union(df)
+    withTempDir { outputDir =>
+      withTempDir { checkpointDir =>
+        val query =
+          unioned
+            .writeStream.format("parquet")
+            .option("checkpointLocation", checkpointDir.getAbsolutePath)
+            .start(outputDir.getAbsolutePath)
+        try {
+          query.processAllAvailable()
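+          // FakeDefaultSource emits the values 0 through 10, so the self-union yields each twice.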
+          val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
+          checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*)
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
   test("union two streams") {
     val inputData1 = MemoryStream[Int]
     val inputData2 = MemoryStream[Int]
@@ -122,6 +145,35 @@ class StreamSuite extends StreamTest {
     assertDF(df)
   }
 
+  test("Within the same streaming query, one StreamingRelation should only be transformed to one " +
+    "StreamingExecutionRelation") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    var query: StreamExecution = null
+    try {
+      query =
+        df.union(df)
+          .writeStream
+          .format("memory")
+          .queryName("memory")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+      query.awaitInitialization(streamingTimeout.toMillis)
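+      // The plan still contains two StreamingExecutionRelation nodes (one per union leg), but
+      // both must be the same instance, i.e. backed by a single materialized source.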
+      val executionRelations =
+        query
+          .logicalPlan
+          .collect { case ser: StreamingExecutionRelation => ser }
+      assert(executionRelations.size === 2)
+      assert(executionRelations.distinct.size === 1)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
   test("unsupported queries") {
     val streamInput = MemoryStream[Int]
     val batchInput = Seq(1, 2, 3).toDS()