Skip to content
Snippets Groups Projects
Commit 7683982f authored by Evan Yu's avatar Evan Yu Committed by Sean Owen
Browse files

[SPARK-5860][CORE] JdbcRDD: overflow on large range with high number of partitions

Fix an overflow bug in JdbcRDD when calculating partitions for large BIGINT ids

Author: Evan Yu <ehotou@gmail.com>

Closes #4701 from hotou/SPARK-5860 and squashes the following commits:

9e038d1 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
7883ad9 [Evan Yu] [SPARK-5860][CORE] Prevent overflowing at the length level
c88755a [Evan Yu] [SPARK-5860][CORE] switch to BigInt instead of BigDecimal
4e9ff4f [Evan Yu] [SPARK-5860][CORE] JdbcRDD overflow on large range with high number of partitions
parent 7138816a
No related branches found
No related tags found
No related merge requests found
...@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag]( ...@@ -62,11 +62,11 @@ class JdbcRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
  // Bounds are inclusive, hence the + 1 here and - 1 on end.
  //
  // Do the partition-boundary arithmetic in BigInt: for a large BIGINT id
  // range combined with a high partition count, `i * length` can exceed
  // Long.MaxValue and silently wrap (SPARK-5860). Promoting `length` to
  // BigInt makes every intermediate product/sum exact; only the final
  // per-partition bounds are narrowed back to Long.
  val length = BigInt(1) + upperBound - lowerBound
  (0 until numPartitions).map { i =>
    // Integer division distributes any remainder across the partitions,
    // so consecutive partitions tile [lowerBound, upperBound] exactly.
    val start = lowerBound + ((i * length) / numPartitions)
    val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
    // start/end are guaranteed to fit in a Long again because they lie
    // within [lowerBound, upperBound].
    new JdbcPartition(i, start.toLong, end.toLong)
  }.toArray
}
......
...@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { ...@@ -29,22 +29,42 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver") Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true") val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
try { try {
val create = conn.createStatement
create.execute(""" try {
CREATE TABLE FOO( val create = conn.createStatement
ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), create.execute("""
DATA INTEGER CREATE TABLE FOO(
)""") ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
create.close() DATA INTEGER
val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)") )""")
(1 to 100).foreach { i => create.close()
insert.setInt(1, i * 2) val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
insert.executeUpdate (1 to 100).foreach { i =>
insert.setInt(1, i * 2)
insert.executeUpdate
}
insert.close()
} catch {
case e: SQLException if e.getSQLState == "X0Y32" =>
// table exists
} }
insert.close()
} catch { try {
case e: SQLException if e.getSQLState == "X0Y32" => val create = conn.createStatement
create.execute("CREATE TABLE BIGINT_TEST(ID BIGINT NOT NULL, DATA INTEGER)")
create.close()
val insert = conn.prepareStatement("INSERT INTO BIGINT_TEST VALUES(?,?)")
(1 to 100).foreach { i =>
insert.setLong(1, 100000000000000000L + 4000000000000000L * i)
insert.setInt(2, i)
insert.executeUpdate
}
insert.close()
} catch {
case e: SQLException if e.getSQLState == "X0Y32" =>
// table exists // table exists
}
} finally { } finally {
conn.close() conn.close()
} }
...@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { ...@@ -62,6 +82,18 @@ class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
assert(rdd.count === 100) assert(rdd.count === 100)
assert(rdd.reduce(_+_) === 10100) assert(rdd.reduce(_+_) === 10100)
} }
// Regression test for SPARK-5860: partition-boundary arithmetic must not
// overflow Long when the id range is near Long.MaxValue and the partition
// count is high.
test("large id overflow") {
sc = new SparkContext("local", "test")
val rdd = new JdbcRDD(
sc,
() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
"SELECT DATA FROM BIGINT_TEST WHERE ? <= ID AND ID <= ?",
// lowerBound/upperBound span all 100 BIGINT_TEST ids
// (1.04e17 .. 5.0e17); with 20 partitions the old Long math for
// `i * length` would overflow. NOTE(review): assumes the `before`
// hook seeded ids 100000000000000000 + 4000000000000000 * i for
// i in 1..100 — confirm against the suite setup.
1131544775L, 567279358897692673L, 20,
(r: ResultSet) => { r.getInt(1) } ).cache()
// All 100 rows must be returned exactly once (no dropped or duplicated
// partitions); DATA holds 1..100, whose sum is 5050.
assert(rdd.count === 100)
assert(rdd.reduce(_+_) === 5050)
}
after { after {
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment