diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93274a1133c2fdb46bf9b4934ebc6ea8411..89544add74430c65e0df1485ce50de43ce34a944 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
+
+  override lazy val resolved: Boolean = {
+    val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+        case agg: AggregateExpression => agg
+        case generator: Generator => generator
+      }.nonEmpty
+    )
+
+    !expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651e1486e7be39a1ae561a91d3a45f12aac1..aec7847356cd495140791db6754a79b978b06087 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
     assert(caseInsensitiveAnalyze(plan).resolved)
   }
 
+  test("check project's resolved") {
+    assert(Project(testRelation.output, testRelation).resolved)
+
+    assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved)
+
+    val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = true)())
+    assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved)
+
+    assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation).resolved)
+  }
+
   test("analyze project") {
     assert(
       caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec3d88ff2f71c69096719a6623cd0be5e4e..ae03bc5e9953f6a574338784666ccbe7690766ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -316,4 +316,34 @@ class SQLQuerySuite extends QueryTest {
 
     dropTempTable("data")
   }
+
+  test("logical.Project should not be resolved if it contains aggregates or generators") {
+    // This test is used to test the fix of SPARK-5875.
+    // The original issue was that Project's resolved will be true when it contains
+    // AggregateExpressions or Generators. However, in this case, the Project
+    // is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
+    // PreInsertionCasts will actually start to work before ImplicitGenerate and then
+    // generates an invalid query plan.
+    val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i+1}]}"""))
+    jsonRDD(rdd).registerTempTable("data")
+    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+    setConf("spark.sql.hive.convertCTAS", "false")
+
+    sql("CREATE TABLE explodeTest (key bigInt)")
+    table("explodeTest").queryExecution.analyzed match {
+      case metastoreRelation: MetastoreRelation => // OK
+      case _ =>
+        fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
+    }
+
+    sql(s"INSERT OVERWRITE TABLE explodeTest SELECT explode(a) AS val FROM data")
+    checkAnswer(
+      sql("SELECT key from explodeTest"),
+      (1 to 5).flatMap(i => Row(i) :: Row(i + 1) :: Nil)
+    )
+
+    sql("DROP TABLE explodeTest")
+    dropTempTable("data")
+    setConf("spark.sql.hive.convertCTAS", originalConf)
+  }
 }