diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 19ca2bb6133122eaaf3c1e63f995bea4866aa2de..fb52a960e0765588a3775e9317ce812c4baa7a77 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -202,7 +202,10 @@ private[spark] object SerDeUtil extends Logging {
    * representation is serialized
    */
   def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
-    val (keyFailed, valueFailed) = checkPickle(rdd.first())
+    val (keyFailed, valueFailed) = rdd.take(1) match {
+      case Array() => (false, false)
+      case Array(first) => checkPickle(first)
+    }
 
     rdd.mapPartitions { iter =>
       val cleaned = iter.map { case (k, v) =>
@@ -229,10 +232,12 @@ private[spark] object SerDeUtil extends Logging {
     }
 
     val rdd = pythonToJava(pyRDD, batched).rdd
-    rdd.first match {
-      case obj if isPair(obj) =>
+    rdd.take(1) match {
+      case Array(obj) if isPair(obj) =>
         // we only accept (K, V)
-      case other => throw new SparkException(
+      case Array() =>
+        // we also accept empty collections
+      case Array(other) => throw new SparkException(
         s"RDD element of type ${other.getClass.getName} cannot be used")
     }
     rdd.map { obj =>
diff --git a/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala b/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f8c39326145e122b2ec624cd2aacbce46fa7d4e5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/api/python/SerDeUtilSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SharedSparkContext
+
+class SerDeUtilSuite extends FunSuite with SharedSparkContext {
+
+  test("Converting an empty pair RDD to python does not throw an exception (SPARK-5441)") {
+    val emptyRdd = sc.makeRDD(Seq[(Any, Any)]())
+    SerDeUtil.pairRDDToPython(emptyRdd, 10)
+  }
+
+  test("Converting an empty python RDD to pair RDD does not throw an exception (SPARK-5441)") {
+    val emptyRdd = sc.makeRDD(Seq[(Any, Any)]())
+    val javaRdd = emptyRdd.toJavaRDD()
+    val pythonRdd = SerDeUtil.javaToPython(javaRdd)
+    SerDeUtil.pythonToPairRDD(pythonRdd, false)
+  }
+}
+