From 95daff6459fc749949c2d71a0b7ab1c5be854f70 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@databricks.com>
Date: Wed, 11 Nov 2015 10:17:54 -0800
Subject: [PATCH] [SPARK-11646] WholeTextFileRDD should return Text rather than
 String

If it returns Text, we can reuse this in Spark SQL to provide a WholeTextFile data source and directly convert the Text into UTF8String without extra string decoding and encoding.

Author: Reynold Xin <rxin@databricks.com>

Closes #9622 from rxin/SPARK-11646.
---
 .../scala/org/apache/spark/SparkContext.scala |  6 +-
 .../input/WholeTextFileInputFormat.scala      |  6 +-
 .../input/WholeTextFileRecordReader.scala     | 12 ++--
 .../org/apache/spark/rdd/NewHadoopRDD.scala   | 33 +----------
 .../apache/spark/rdd/WholeTextFileRDD.scala   | 56 +++++++++++++++++++
 5 files changed, 69 insertions(+), 44 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 67270c38fa..43a241686f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -863,10 +863,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     new WholeTextFileRDD(
       this,
       classOf[WholeTextFileInputFormat],
-      classOf[String],
-      classOf[String],
+      classOf[Text],
+      classOf[Text],
       updateConf,
-      minPartitions).setName(path)
+      minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 1ba34a1141..413408723b 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -20,6 +20,7 @@ package org.apache.spark.input
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
@@ -33,14 +34,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
  */
 
 private[spark] class WholeTextFileInputFormat
-  extends CombineFileInputFormat[String, String] with Configurable {
+  extends CombineFileInputFormat[Text, Text] with Configurable {
 
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
   override def createRecordReader(
       split: InputSplit,
-      context: TaskAttemptContext): RecordReader[String, String] = {
-
+      context: TaskAttemptContext): RecordReader[Text, Text] = {
     val reader =
       new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
     reader.setConf(getConf)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 31bde8a78f..b56b2aa88a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -49,7 +49,7 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] with Configurable {
+  extends RecordReader[Text, Text] with Configurable {
 
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(
@@ -58,8 +58,8 @@ private[spark] class WholeTextFileRecordReader(
   // True means the current file has been processed, then skip it.
   private[this] var processed = false
 
-  private[this] val key = path.toString
-  private[this] var value: String = null
+  private[this] val key: Text = new Text(path.toString)
+  private[this] var value: Text = null
 
   override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
 
@@ -67,9 +67,9 @@ private[spark] class WholeTextFileRecordReader(
 
   override def getProgress: Float = if (processed) 1.0f else 0.0f
 
-  override def getCurrentKey: String = key
+  override def getCurrentKey: Text = key
 
-  override def getCurrentValue: String = value
+  override def getCurrentValue: Text = value
 
   override def nextKeyValue(): Boolean = {
     if (!processed) {
@@ -83,7 +83,7 @@ private[spark] class WholeTextFileRecordReader(
         ByteStreams.toByteArray(fileIn)
       }
 
-      value = new Text(innerBuffer).toString
+      value = new Text(innerBuffer)
       Closeables.close(fileIn, false)
       processed = true
       true
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 9c4b70844b..d1960990da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.input.WholeTextFileInputFormat
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel
 
@@ -59,7 +58,6 @@ private[spark] class NewHadoopPartition(
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
- * @param conf The Hadoop configuration.
  */
 @DeveloperApi
 class NewHadoopRDD[K, V](
@@ -282,32 +280,3 @@ private[spark] object NewHadoopRDD {
     }
   }
 }
-
-private[spark] class WholeTextFileRDD(
-    sc : SparkContext,
-    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
-    keyClass: Class[String],
-    valueClass: Class[String],
-    conf: Configuration,
-    minPartitions: Int)
-  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
-
-  override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
-    val conf = getConf
-    inputFormat match {
-      case configurable: Configurable =>
-        configurable.setConf(conf)
-      case _ =>
-    }
-    val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMinPartitions(jobContext, minPartitions)
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val result = new Array[Partition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
-    }
-    result
-  }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
new file mode 100644
index 0000000000..e3f14fe7ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{Text, Writable}
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.input.WholeTextFileInputFormat
+
+/**
+ * An RDD that reads a bunch of text files in, and each text file becomes one record.
+ */
+private[spark] class WholeTextFileRDD(
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[Text],
+    valueClass: Class[Text],
+    conf: Configuration,
+    minPartitions: Int)
+  extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMinPartitions(jobContext, minPartitions)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
-- 
GitLab