Skip to content
Snippets Groups Projects
Commit 2089e0e7 authored by zsxwing's avatar zsxwing Committed by Matei Zaharia
Browse files

SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and save...

...AsNewAPIHadoopDataset

`writer.close` should be put in the `finally` block to avoid potential resource leaks.

JIRA: https://issues.apache.org/jira/browse/SPARK-1482

Author: zsxwing <zsxwing@gmail.com>

Closes #400 from zsxwing/SPARK-1482 and squashes the following commits:

06b197a [zsxwing] SPARK-1482: Fix potential resource leaks in saveAsHadoopDataset and saveAsNewAPIHadoopDataset
parent c399baa0
No related branches found
No related tags found
No related merge requests found
......@@ -693,11 +693,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
try {
while (iter.hasNext) {
val (k, v) = iter.next()
writer.write(k, v)
}
}
finally {
writer.close(hadoopContext)
}
writer.close(hadoopContext)
committer.commitTask(hadoopContext)
return 1
}
......@@ -750,15 +754,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
try {
var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
}
finally {
writer.close()
}
writer.close()
writer.commit()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment