Commit e02dc83a authored by Matei Zaharia

IO optimizations

parent c40e7663
@@ -65,11 +65,13 @@ object ZigZag {
 class KryoSerializationStream(kryo: Kryo, buf: ByteBuffer, out: OutputStream)
 extends SerializationStream {
+  val channel = Channels.newChannel(out)
+
   def writeObject[T](t: T) {
     kryo.writeClassAndObject(buf, t)
     ZigZag.writeInt(buf.position(), out)
     buf.flip()
-    Channels.newChannel(out).write(buf)
+    channel.write(buf)
     buf.clear()
   }
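The hunk above caches the NIO channel wrapping `out` in a field, so `writeObject` no longer allocates a fresh `WritableByteChannel` for every record. A minimal sketch of the same pattern, using hypothetical names (`BufferedRecordWriter`, `writeRecord`) that are not part of the actual Spark code:

import java.io.OutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

// Hypothetical writer showing the pattern: wrap `out` in a channel once,
// then reuse that channel for every record instead of calling
// Channels.newChannel(out) inside the hot write path.
class BufferedRecordWriter(out: OutputStream, bufSize: Int = 64 * 1024) {
  private val buf: ByteBuffer = ByteBuffer.allocate(bufSize)
  private val channel: WritableByteChannel = Channels.newChannel(out)

  def writeRecord(bytes: Array[Byte]): Unit = {
    buf.clear()
    buf.put(bytes)       // serialize into the reusable buffer
    buf.flip()
    channel.write(buf)   // cached channel, no per-record allocation
  }
}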
@@ -12,6 +12,8 @@ import java.util.concurrent.atomic.AtomicReference
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 class ParallelShuffleFetcher extends ShuffleFetcher with Logging {
   val parallelFetches = System.getProperty("spark.parallel.fetches", "3").toInt
@@ -60,7 +62,7 @@ class ParallelShuffleFetcher extends ShuffleFetcher with Logging {
       if (len == -1)
         throw new SparkException("Content length was not specified by server")
       val buf = new Array[Byte](len)
-      val in = conn.getInputStream()
+      val in = new FastBufferedInputStream(conn.getInputStream())
       var pos = 0
       while (pos < len) {
         val n = in.read(buf, pos, len-pos)
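Here the raw HTTP stream is wrapped in fastutil's FastBufferedInputStream, an unsynchronized replacement for java.io.BufferedInputStream, so each read() in the loop is served from an in-memory buffer instead of hitting the connection directly. A self-contained sketch of the same read loop, with a hypothetical readFully helper standing in for the fetcher code:

import java.io.EOFException
import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream

// Hypothetical helper mirroring the fetch loop: every read() is served from
// fastutil's in-memory buffer rather than going straight to the socket.
object FetchExample {
  def readFully(url: String, len: Int): Array[Byte] = {
    val in = new FastBufferedInputStream(new URL(url).openStream())
    try {
      val buf = new Array[Byte](len)
      var pos = 0
      while (pos < len) {
        val n = in.read(buf, pos, len - pos)
        if (n == -1)
          throw new EOFException("Stream ended after " + pos + " of " + len + " bytes")
        pos += n
      }
      buf
    } finally {
      in.close()
    }
  }
}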
@@ -5,6 +5,8 @@ import java.io.FileOutputStream
 import java.io.ObjectOutputStream
 import java.util.{HashMap => JHashMap}
 
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
 class ShuffleMapTask(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_], val partition: Int, locs: Seq[String])
 extends DAGTask[String](stageId) with Logging {
@@ -29,7 +31,7 @@ extends DAGTask[String](stageId) with Logging {
     val ser = SparkEnv.get.serializer.newInstance()
     for (i <- 0 until numOutputSplits) {
       val file = LocalFileShuffle.getOutputFile(dep.shuffleId, partition, i)
-      val out = ser.outputStream(new BufferedOutputStream(new FileOutputStream(file)))
+      val out = ser.outputStream(new FastBufferedOutputStream(new FileOutputStream(file)))
       val iter = buckets(i).entrySet().iterator()
       while (iter.hasNext()) {
         val entry = iter.next()
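Same idea on the write side: each shuffle output file now goes through fastutil's FastBufferedOutputStream rather than java.io.BufferedOutputStream, avoiding the synchronized write path while the map task streams many small serialized records per bucket. A rough sketch under stated assumptions (a plain ObjectOutputStream stands in for Spark's ser.outputStream, and writeRecords is a hypothetical helper):

import java.io.{File, FileOutputStream, ObjectOutputStream}
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

// Hypothetical sketch of the write path: many small serialized records are
// buffered in memory by fastutil before reaching the file.
object ShuffleWriteExample {
  def writeRecords(file: File, records: Iterator[(String, Int)]): Unit = {
    val out = new ObjectOutputStream(
      new FastBufferedOutputStream(new FileOutputStream(file)))
    try {
      for (pair <- records)
        out.writeObject(pair)   // one small write per record, absorbed by the buffer
    } finally {
      out.close()
    }
  }
}

The SimpleShuffleFetcher hunk below applies the matching read-side change when pulling these files back over HTTP.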
@@ -6,6 +6,8 @@ import java.net.URL
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
   def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
@@ -22,7 +24,8 @@ class SimpleShuffleFetcher extends ShuffleFetcher with Logging {
         val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, reduceId)
         // TODO: multithreaded fetch
         // TODO: would be nice to retry multiple times
-        val inputStream = ser.inputStream(new URL(url).openStream())
+        val inputStream = ser.inputStream(
+          new FastBufferedInputStream(new URL(url).openStream()))
         try {
           while (true) {
             val pair = inputStream.readObject().asInstanceOf[(K, V)]