diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f91392b3510e227aca04b6a1bd42898ba0e7d6e6..66c226e4913621e11fd11a63898d026d35fa2f5e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -244,6 +244,10 @@ class SparkContext( localProperties.set(new Properties()) } + /** + * Set a local property that affects jobs submitted from this thread, such as the + * Spark fair scheduler pool. + */ def setLocalProperty(key: String, value: String) { if (localProperties.get() == null) { localProperties.set(new Properties()) @@ -255,6 +259,10 @@ class SparkContext( } } + /** + * Get a local property set in this thread, or null if it is missing. See + * [[org.apache.spark.SparkContext.setLocalProperty]]. + */ def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) @@ -265,7 +273,7 @@ class SparkContext( } /** - * Assigns a group id to all the jobs started by this thread until the group id is set to a + * Assigns a group ID to all the jobs started by this thread until the group ID is set to a * different value or cleared. * * Often, a unit of execution in an application consists of multiple Spark actions or jobs. @@ -288,7 +296,7 @@ class SparkContext( setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId) } - /** Clear the job group id and its description. */ + /** Clear the current thread's job group ID and its description. */ def clearJobGroup() { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index da30cf619a1d0ecfabf501faecc0e2b0f0a64738..b0dedc6f4eb135f5751414921d21eb4fd2110080 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -207,13 +207,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav * e.g. for the array * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] * e.g 1<=x<10 , 10<=x<20, 20<=x<50 - * And on the input of 1 and 50 we would have a histogram of 1,0,0 - * + * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets * to true. * buckets must be sorted and not contain any duplicates. - * buckets array must be at least two elements + * buckets array must be at least two elements * All NaN entries are treated the same. If you have a NaN bucket it must be * the maximum value of the last position and all NaN entries will be counted * in that bucket. @@ -225,6 +225,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = { srdd.histogram(buckets.map(_.toDouble), evenBuckets) } + + /** Assign a name to this RDD */ + def setName(name: String): JavaDoubleRDD = { + srdd.setName(name) + this + } } object JavaDoubleRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 55c87450ac65ae5256cba054cf4ce7684773c592..0fb7e195b34c4a9b530e2f718fa98f86d391303d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -647,6 +647,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { rdd.countApproxDistinctByKey(relativeSD, numPartitions) } + + /** Assign a name to this RDD */ + def setName(name: String): JavaPairRDD[K, V] = { + rdd.setName(name) + this + } } object JavaPairRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 037cd1c774691e6cd2122d03dea488d1a57f0517..7d48ce01cf2cc9606c786e14018ad8bacf2f7876 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -127,6 +127,12 @@ JavaRDDLike[T, JavaRDD[T]] { wrapRDD(rdd.subtract(other, p)) override def toString = rdd.toString + + /** Assign a name to this RDD */ + def setName(name: String): JavaRDD[T] = { + rdd.setName(name) + this + } } object JavaRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 924d8af0602f4a7fa8e33b637ebd5af23059ad81..ebbbbd88061a1df7c205638f0872d7b9f724e107 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -244,6 +244,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { new java.util.ArrayList(arr) } + /** + * Return an array that contains all of the elements in this RDD. + */ + def toArray(): JList[T] = collect() + /** * Return an array that contains all of the elements in a specific partition of this RDD. */ @@ -455,4 +460,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + def name(): String = rdd.name } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index e93b10fd7eecb71b59908a54a765d0e23b7da7b1..7a6f044965027d26bd7af468eb225784c76ccc31 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -425,6 +425,51 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def clearCallSite() { sc.clearCallSite() } + + /** + * Set a local property that affects jobs submitted from this thread, such as the + * Spark fair scheduler pool. + */ + def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value) + + /** + * Get a local property set in this thread, or null if it is missing. See + * [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]]. + */ + def getLocalProperty(key: String): String = sc.getLocalProperty(key) + + /** + * Assigns a group ID to all the jobs started by this thread until the group ID is set to a + * different value or cleared. + * + * Often, a unit of execution in an application consists of multiple Spark actions or jobs. + * Application programmers can use this method to group all those jobs together and give a + * group description. Once set, the Spark web UI will associate such jobs with this group. + * + * The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] + * to cancel all running jobs in this group. For example, + * {{{ + * // In the main thread: + * sc.setJobGroup("some_job_to_cancel", "some job description"); + * rdd.map(...).count(); + * + * // In a separate thread: + * sc.cancelJobGroup("some_job_to_cancel"); + * }}} + */ + def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description) + + /** Clear the current thread's job group ID and its description. */ + def clearJobGroup(): Unit = sc.clearJobGroup() + + /** + * Cancel active jobs for the specified group. See + * [[org.apache.spark.api.java.JavaSparkContext.setJobGroup]] for more information. + */ + def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId) + + /** Cancel all jobs that have been scheduled or are running. */ + def cancelAllJobs(): Unit = sc.cancelAllJobs() } object JavaSparkContext { @@ -436,5 +481,12 @@ object JavaSparkContext { * Find the JAR from which a given class was loaded, to make it easy for users to pass * their JARs to SparkContext. */ - def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray + def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray + + /** + * Find the JAR that contains the class of a particular object, to make it easy for users + * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in + * your driver program. + */ + def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray }