diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 4fe7622bda00fe3e7c3e6dec496ab0e455dba898..e2267861e79df01a89b204df884417b2c58b9622 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     // bounds are inclusive, hence the + 1 here and - 1 on end
-    val length = 1 + upperBound - lowerBound
+    val length = BigInt(1) + upperBound - lowerBound
     (0 until numPartitions).map(i => {
-      val start = lowerBound + ((i * length) / numPartitions).toLong
-      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
-      new JdbcPartition(i, start, end)
+      val start = lowerBound + ((i * length) / numPartitions)
+      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
+      new JdbcPartition(i, start.toLong, end.toLong)
     }).toArray
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
index 6138d0bbd57f6a4ec3fa30b4648647cb8d4e0f0a..0dc59888f73041145aa7a0b9584e19a877c9582c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala
@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
     val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
     try {
-      val create = conn.createStatement
-      create.execute("""
-        CREATE TABLE FOO(
-          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
-          DATA INTEGER
-        )""")
-      create.close()
-      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
-      (1 to 100).foreach { i =>
-        insert.setInt(1, i * 2)
-        insert.executeUpdate
+
+      try {
+        val create = conn.createStatement
+        create.execute("""
+          CREATE TABLE FOO(
+            ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+            DATA INTEGER
+          )""")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
+        (1 to 100).foreach { i =>
+          insert.setInt(1, i * 2)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
+        // table exists
       }
-      insert.close()
-    } catch {
-      case e: SQLException if e.getSQLState == "X0Y32" =>
+
+      try {
+        val create = conn.createStatement
+        create.execute("CREATE TABLE BIGINT_TEST(ID BIGINT NOT NULL, DATA INTEGER)")
+        create.close()
+        val insert = conn.prepareStatement("INSERT INTO BIGINT_TEST VALUES(?,?)")
+        (1 to 100).foreach { i =>
+          insert.setLong(1, 100000000000000000L +  4000000000000000L * i)
+          insert.setInt(2, i)
+          insert.executeUpdate
+        }
+        insert.close()
+      } catch {
+        case e: SQLException if e.getSQLState == "X0Y32" =>
         // table exists
+      }
+
     } finally {
       conn.close()
     }
@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     assert(rdd.count === 100)
     assert(rdd.reduce(_+_) === 10100)
   }
+  
+  test("large id overflow") {
+    sc = new SparkContext("local", "test")
+    val rdd = new JdbcRDD(
+      sc,
+      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
+      "SELECT DATA FROM BIGINT_TEST WHERE ? <= ID AND ID <= ?",
+      1131544775L, 567279358897692673L, 20,
+      (r: ResultSet) => { r.getInt(1) } ).cache()
+    assert(rdd.count === 100)
+    assert(rdd.reduce(_+_) === 5050)
+  }
 
   after {
     try {