diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dd768d18e8588aa9a059723d97801a5ccebebfd7..f2b9764b0f08890b7c7169885c2a53bae9aea549 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       g.copy(child = prunedChild(g.child, g.references))
 
     // Turn off `join` for Generate if no column from its child is used
-    case p @ Project(_, g: Generate)
-        if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
+    case p @ Project(_, g: Generate) if g.join && p.references.subsetOf(g.generatedSet) =>
       p.copy(child = g.copy(join = false))
 
     // Eliminate unneeded attributes from right side of a Left Existence Join.
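
With the `!g.outer` guard removed, ColumnPruning can turn off `join` for outer generators as
well. A minimal sketch of the plan shape this rule targets (assumes a SparkSession with
`spark.implicits._` in scope; whether the rule actually fires depends on how the analyzer
planned the Generate):

    import org.apache.spark.sql.functions.explode_outer

    val df = Seq((1, Seq(1, 2)), (2, Seq.empty[Int])).toDF("a", "intList")
    // The top-level Project references only the generated column, so
    // p.references is a subset of g.generatedSet and the rule can rewrite
    // Generate(join = true, outer = true) into Generate(join = false, outer = true).
    df.select($"a", explode_outer($"intList").as("col")).select("col").explain(true)
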
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 3ad757ebba8515a682bf1d7693b33eb3ccce75e2..f663d7b8a8f7b399476809579cb5589fe3370667 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
  * @param join  when true, each output row is implicitly joined with the input tuple that produced
  *              it.
  * @param outer when true, each input row will be output at least once, even if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` is false.
+ *              given `generator` is empty.
  * @param qualifier Qualifier for the attributes of the generator (UDTF)
  * @param generatorOutput The output schema of the Generator.
  * @param child the child logical plan node
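
After this change `outer` is honored whether or not `join` is set. A hypothetical, simplified
model of the semantics for a single input row (illustrative names only, not the actual
implementation):

    // join: prepend the input row to every generated row.
    // outer: if the generator yields nothing, emit one all-null generated row
    // so the input row is not dropped.
    def generateRow(
        input: Seq[Any],
        generated: Seq[Seq[Any]],
        join: Boolean,
        outer: Boolean,
        generatorWidth: Int): Seq[Seq[Any]] = {
      val rows =
        if (outer && generated.isEmpty) Seq(Seq.fill[Any](generatorWidth)(null)) else generated
      if (join) rows.map(input ++ _) else rows
    }
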
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index f87d05884b27666207b06db6ba8fdc5fda9e1821..1812a1152cb48e514286bbeee5d9f3be1cc42223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow])
   extends Iterator[InternalRow] {
 
-  lazy val results = func().toIterator
+  lazy val results: Iterator[InternalRow] = func().toIterator
   override def hasNext: Boolean = results.hasNext
   override def next(): InternalRow = results.next()
 }
@@ -50,7 +50,7 @@ private[execution] sealed case class LazyIterator(func: () => TraversableOnce[In
  * @param join  when true, each output row is implicitly joined with the input tuple that produced
  *              it.
  * @param outer when true, each input row will be output at least once, even if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` is false.
+ *              given `generator` is empty.
  * @param generatorOutput the qualified output attributes of the generator of this node, which
  *                        are constructed during the analysis phase and cannot be changed, as the
  *                        parent node is already bound to them.
@@ -78,15 +78,15 @@ case class GenerateExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
-  val boundGenerator = BindReferences.bindReference(generator, child.output)
+  val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all rows in the partition are consumed
-    val rows = if (join) {
-      child.execute().mapPartitionsInternal { iter =>
-        val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
+    val numOutputRows = longMetric("numOutputRows")
+    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
+      val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
+      val rows = if (join) {
         val joinedRow = new JoinedRow
-
         iter.flatMap { row =>
           // we should always set the left (child output)
           joinedRow.withLeft(row)
@@ -101,18 +101,21 @@ case class GenerateExec(
           // keep it the same as Hive does
           joinedRow.withRight(row)
         }
+      } else {
+        iter.flatMap { row =>
+          val outputRows = boundGenerator.eval(row)
+          if (outer && outputRows.isEmpty) {
+            Seq(generatorNullRow)
+          } else {
+            outputRows
+          }
+        } ++ LazyIterator(boundGenerator.terminate)
       }
-    } else {
-      child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(boundGenerator.eval) ++ LazyIterator(boundGenerator.terminate)
-      }
-    }
 
-    val numOutputRows = longMetric("numOutputRows")
-    rows.mapPartitionsWithIndexInternal { (index, iter) =>
+      // Convert the rows to unsafe rows.
       val proj = UnsafeProjection.create(output, output)
       proj.initialize(index)
-      iter.map { r =>
+      rows.map { r =>
         numOutputRows += 1
         proj(r)
       }
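
The non-join branch now mirrors the outer handling of the join branch. The same pattern on a
plain Scala iterator, as a self-contained sketch (the real code emits a GenericInternalRow and
appends the generator's terminate() output):

    // Sketch of the new non-join path: each input sequence is exploded;
    // when it is empty and outer applies, a single None stands in for the null row.
    def explodeOuter(rows: Iterator[Seq[Int]]): Iterator[Option[Int]] =
      rows.flatMap { xs =>
        if (xs.isEmpty) Iterator(None) // outer: keep the input row as null
        else xs.iterator.map(Some(_))
      }

    // explodeOuter(Iterator(Seq(1, 2, 3), Seq.empty)).toList
    // => List(Some(1), Some(2), Some(3), None)
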
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index cef5bbf0e85a743621c884dafb2cb81c2e19ec35..b9871afd59e4ffd5b13f012edd789034a9e7b98f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -91,7 +91,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(explode_outer('intList)),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
   }
 
   test("single posexplode") {
@@ -105,7 +105,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(posexplode_outer('intList)),
-      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil)
+      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil)
   }
 
   test("explode and other columns") {
@@ -161,7 +161,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('intList).as('int)).select('int),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
 
     checkAnswer(
       df.select(explode('intList).as('int)).select(sum('int)),
@@ -182,7 +182,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map)),
-      Row("a", "b") :: Row("c", "d") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil)
   }
 
   test("explode on map with aliases") {
@@ -198,7 +198,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map).as("key1" :: "value1" :: Nil)).select("key1", "value1"),
-      Row("a", "b") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Nil)
   }
 
   test("self join explode") {
@@ -279,7 +279,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSQLContext {
     )
     checkAnswer(
       df2.selectExpr("inline_outer(col1)"),
-      Row(3, "4") :: Row(5, "6") :: Nil
+      Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil
     )
   }
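
For reference, the user-facing behavior these expectations pin down (a minimal example,
assuming a SparkSession with `spark.implicits._` in scope):

    import org.apache.spark.sql.functions.explode_outer

    val df = Seq((1, Seq(1, 2, 3)), (2, Seq.empty[Int])).toDF("a", "intList")
    df.select($"a", explode_outer($"intList")).show()
    // +---+----+
    // |  a| col|
    // +---+----+
    // |  1|   1|
    // |  1|   2|
    // |  1|   3|
    // |  2|null|
    // +---+----+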