diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 65d910a0c3ffca5ddb435c4a59f77e59ba569b5c..bba29b2bdca4dbe9e63e3a1cf1c6be8f381ac055 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -267,4 +267,27 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
     }
   }
+
+  test("SPARK-4292 regression: result set iterator issue") {
+    withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_4292",
+        "CREATE TABLE test_4292(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
+
+      queries.foreach(statement.execute)
+
+      val resultSet = statement.executeQuery("SELECT key FROM test_4292")
+
+      Seq(238, 86, 311, 27, 165).foreach { key =>
+        resultSet.next()
+        assert(resultSet.getInt(1) == key)
+      }
+
+      statement.executeQuery("DROP TABLE IF EXISTS test_4292")
+    }
+  }
 }
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 8077d0ec46fd7c05715307a6aa34137f5b931251..e3ba9914c6cc0e96d2f7ede0a846e72f1f7578c1 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -202,13 +202,12 @@ private[hive] class SparkExecuteStatementOperation(
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
       iter = {
-        val resultRdd = result.queryExecution.toRdd
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
         if (useIncrementalCollect) {
-          resultRdd.toLocalIterator
+          result.toLocalIterator
         } else {
-          resultRdd.collect().iterator
+          result.collect().iterator
         }
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 2c1983de1d0d57bd3acccd0075adb0482519efa3..f2ceba828296bfa12db0004eb281415c0c4fe4bf 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -87,13 +87,12 @@ private[hive] class SparkExecuteStatementOperation(
       val groupId = round(random * 1000000).toString
       hiveContext.sparkContext.setJobGroup(groupId, statement)
       iter = {
-        val resultRdd = result.queryExecution.toRdd
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
         if (useIncrementalCollect) {
-          resultRdd.toLocalIterator
+          result.toLocalIterator
         } else {
-          resultRdd.collect().iterator
+          result.collect().iterator
         }
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray