diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e2c301603b4a5e59a422e4c54817a3d3fb68bf51..8c43a559409f2c9d1b49899fa214b9cd0a5cb1c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
 private[spark]
 class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
 
-  override def getPartitions: Array[Partition] = {
+  /** The start index of each partition. */
+  @transient private val startIndices: Array[Long] = {
     val n = prev.partitions.size
-    val startIndices: Array[Long] =
-      if (n == 0) {
-        Array[Long]()
-      } else if (n == 1) {
-        Array(0L)
-      } else {
-        prev.context.runJob(
-          prev,
-          Utils.getIteratorSize _,
-          0 until n - 1, // do not need to count the last partition
-          false
-        ).scanLeft(0L)(_ + _)
-      }
+    if (n == 0) {
+      Array[Long]()
+    } else if (n == 1) {
+      Array(0L)
+    } else {
+      prev.context.runJob(
+        prev,
+        Utils.getIteratorSize _,
+        0 until n - 1, // do not need to count the last partition
+        allowLocal = false
+      ).scanLeft(0L)(_ + _)
+    }
+  }
+
+  override def getPartitions: Array[Partition] = {
     firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d2e696dc2fc44d1b1198850dea2c25ae5d6e06a..e079ca3b1e896152aa94813c86bdc8dd72a67669 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -739,6 +739,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("zipWithIndex chained with other RDDs (SPARK-4433)") {
+    val count = sc.parallelize(0 until 10, 2).zipWithIndex().repartition(4).count()
+    assert(count === 10)
+  }
+
   test("zipWithUniqueId") {
     val n = 10
     val data = sc.parallelize(0 until n, 3)