diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3ddc0d5eeefb8ed2b1768fd17575734589bc682f..ee5637371fdca7a5b3025b490d21034da0fd8bb8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -427,9 +427,9 @@ class SparkContext(config: SparkConf) extends Logging {
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
+  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
-      minSplits).map(pair => pair._2.toString)
+      minPartitions).map(pair => pair._2.toString)
   }
 
   /**
@@ -457,9 +457,10 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    *
-   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, String)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -469,7 +470,7 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[String],
       classOf[String],
       updateConf,
-      minSplits)
+      minPartitions)
   }
 
   /**
@@ -481,7 +482,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
-   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param minPartitions Minimum number of Hadoop Splits to generate.
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
@@ -493,11 +494,11 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -524,7 +525,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minPartitions)
   }
 
   /**
@@ -532,7 +533,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * values and the InputFormat so that users don't need to pass them directly. Instead, callers
    * can just write, for example,
    * {{{
-   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
+   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)
    * }}}
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
@@ -541,13 +542,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int)
+      (path: String, minPartitions: Int)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits)
+      minPartitions)
   }
 
   /**
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -626,10 +627,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minPartitions: Int
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types.
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
       ): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minSplits: Int = defaultMinSplits)
+      (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -674,7 +675,7 @@ class SparkContext(config: SparkConf) extends Logging {
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
     writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
@@ -688,9 +689,9 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def objectFile[T: ClassTag](
       path: String,
-      minSplits: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[T] = {
-    sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
+    sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
   }
 
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
+  @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
   private val nextShuffleId = new AtomicInteger(0)
 
   private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e6a3f06b0ea42ecb463900d9ade66fd049e8b17c..cf30523ab523e7da3f4c8755e902b06476f2b3b0 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: java.lang.Integer = sc.defaultParallelism
 
-  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user.
+   * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+   *             {@link #defaultMinPartitions()} instead
+   */
+  @Deprecated
   def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
@@ -153,7 +161,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+  def textFile(path: String, minPartitions: Int): JavaRDD[String] =
+    sc.textFile(path, minPartitions)
 
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
@@ -180,17 +189,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
    *
-   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
-    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+  def wholeTextFiles(path: String, minPartitions: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minPartitions))
 
   /**
    * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI. Each file is read as a single record and returned in a
    * key-value pair, where the key is the path of each file, the value is the content of each file.
    *
-   * @see `wholeTextFiles(path: String, minSplits: Int)`.
+   * @see `wholeTextFiles(path: String, minPartitions: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))
@@ -205,11 +214,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def sequenceFile[K, V](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minPartitions: Int
       ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
   }
 
   /** Get an RDD for a Hadoop SequenceFile.
@@ -233,9 +242,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * slow if you use the default serializer (Java serialization), though the nice thing about it is
    * that there's very little effort required to save arbitrary objects.
    */
-  def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
+  def objectFile[T](path: String, minPartitions: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
-    sc.objectFile(path, minSplits)(ctag)
+    sc.objectFile(path, minPartitions)(ctag)
   }
 
   /**
@@ -265,11 +274,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass: Class[F],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minPartitions: Int
       ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
+    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
   }
 
   /**
@@ -304,11 +313,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       inputFormatClass: Class[F],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minPartitions: Int
       ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
+    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 80d055a89573be6906e588c4613f7322ac58f955..4cb450577796a67de9ecd3efaa935c5eb1f61d6b 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -48,14 +48,15 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
   }
 
   /**
-   * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
    */
-  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
     val files = listStatus(context)
     val totalLen = files.map { file =>
       if (file.isDir) 0L else file.getLen
     }.sum
-    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    val maxSplitSize = Math.ceil(totalLen * 1.0 /
+      (if (minPartitions == 0) 1 else minPartitions)).toLong
     super.setMaxSplitSize(maxSplitSize)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6811e1abb8b70c33df1d8711ce82f57f2b1d35f1..6547755764dcfb94a2ef6534a9759f52221a80ce 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -87,7 +87,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param inputFormatClass Storage format of the data to be read.
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
- * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate.
  */
 @DeveloperApi
 class HadoopRDD[K, V](
@@ -97,7 +97,7 @@ class HadoopRDD[K, V](
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int)
+    minPartitions: Int)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -106,7 +106,7 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minPartitions: Int) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -115,7 +115,7 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minPartitions)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +169,7 @@ class HadoopRDD[K, V](
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(jobConf, minSplits)
+    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 8684b645bc361bccc1e5866374386c36c1690674..ac1ccc06f238ab5b5dfd5f7db6a94e4eb040ffad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -158,7 +158,7 @@ private[spark] class WholeTextFileRDD(
     keyClass: Class[String],
     valueClass: Class[String],
     @transient conf: Configuration,
-    minSplits: Int)
+    minPartitions: Int)
   extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {
@@ -169,7 +169,7 @@ private[spark] class WholeTextFileRDD(
       case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
-    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
     for (i <- 0 until rawSplits.size) {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 2f3ac1039751515bba07d48ea15994f51515c828..3d6e7e0d5c95389c9ad5207df8fba0560039e12e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -57,7 +57,7 @@ object MLUtils {
    * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
    * @param numFeatures number of features, which will be determined from the input data if a
    *                    negative value is given. The default value is -1.
-   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @param minPartitions min number of partitions, default: sc.defaultMinPartitions
    * @return labeled data stored as an RDD[LabeledPoint]
    */
   def loadLibSVMData(
@@ -65,8 +65,8 @@ object MLUtils {
       path: String,
       labelParser: LabelParser,
       numFeatures: Int,
-      minSplits: Int): RDD[LabeledPoint] = {
-    val parsed = sc.textFile(path, minSplits)
+      minPartitions: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minPartitions)
       .map(_.trim)
       .filter(!_.isEmpty)
       .map(_.split(' '))
@@ -101,7 +101,7 @@ object MLUtils {
    * with number of features determined automatically and the default number of partitions.
    */
   def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -112,7 +112,7 @@ object MLUtils {
       sc: SparkContext,
       path: String,
       labelParser: LabelParser): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -124,7 +124,7 @@ object MLUtils {
       path: String,
       labelParser: LabelParser,
       numFeatures: Int): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions)
 
   /**
    * :: Experimental ::
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index d8667e84fedff3dff95ea20bd8399d8fecc0ad66..f63cc4a55fb98b994d16e8aed1cd33a16d7bc2c4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -248,14 +248,14 @@ class SparkContext(object):
         jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
         return RDD(jrdd, self, serializer)
 
-    def textFile(self, name, minSplits=None):
+    def textFile(self, name, minPartitions=None):
         """
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
         """
-        minSplits = minSplits or min(self.defaultParallelism, 2)
-        return RDD(self._jsc.textFile(name, minSplits), self,
+        minPartitions = minPartitions or min(self.defaultParallelism, 2)
+        return RDD(self._jsc.textFile(name, minPartitions), self,
                    UTF8Deserializer())
 
     def wholeTextFiles(self, path):
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 0da5eb754cb3fe1b5f7f36dbded6fc7d08a4a827..8cfde46186ca4ecff05d4c3c6ad2785de7465234 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -52,7 +52,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
   // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
   // it is smaller than what Spark suggests.
   private val _minSplitsPerRDD = math.max(
-    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
+    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)

   // TODO: set aws s3 credentials.
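For reviewers who want to see the rename from the caller's side, here is a small usage sketch (not part of the patch). The master URL, app name, and input path are made-up placeholders; the comments only restate what the diff above does.

import org.apache.spark.{SparkConf, SparkContext}

// Usage sketch only -- "local[2]", the app name, and the HDFS path are hypothetical.
object MinPartitionsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("minPartitionsExample")
    val sc = new SparkContext(conf)

    // The named parameter is now minPartitions; it is a lower-bound hint handed to
    // InputFormat.getSplits(jobConf, minPartitions), not an exact partition count.
    val lines = sc.textFile("hdfs:///tmp/example-input.txt", minPartitions = 8)
    println(s"textFile produced ${lines.partitions.length} partitions")

    // defaultMinSplits still compiles but is @deprecated; defaultMinPartitions is the
    // replacement and returns the same value, math.min(defaultParallelism, 2).
    println(s"defaultMinPartitions = ${sc.defaultMinPartitions}")

    sc.stop()
  }
}

Positional call sites such as sc.textFile(path, 8) are unaffected by the rename; only callers using the named argument (minSplits = ...) need to switch to minPartitions, while defaultMinSplits keeps working with a deprecation warning.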