diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index b828532aba7a37d8eed752666cda5c4a05c2fe49..7d31ac54a7177c0a6df8324ccc25616deec2d453 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -60,6 +60,8 @@ private[spark] class SparkUI private ( var appId: String = _ + private var streamingJobProgressListener: Option[SparkListener] = None + /** Initialize all components of the server. */ def initialize() { val jobsTab = new JobsTab(this) @@ -124,6 +126,12 @@ private[spark] class SparkUI private ( def getApplicationInfo(appId: String): Option[ApplicationInfo] = { getApplicationInfoList.find(_.id == appId) } + + def getStreamingJobProgressListener: Option[SparkListener] = streamingJobProgressListener + + def setStreamingJobProgressListener(sparkListener: SparkListener): Unit = { + streamingJobProgressListener = Option(sparkListener) + } } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 20f5c2789af2930cd17c9faee8d3ccdb72af739f..2314d7f45cb21c31013e0d99fc18ee21a5c97fbf 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -38,8 +38,12 @@ object MimaExcludes { lazy val v22excludes = v21excludes ++ Seq( // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"), + // [SPARK-18949] [SQL] Add repairTable API to Catalog - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"), + + // [SPARK-18537] Add a REST api to spark streaming + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted") ) // Exclude rules for 2.1.x diff --git a/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java new file mode 100644 index 0000000000000000000000000000000000000000..1bbca5a2259ddfc6d3207f70c0a633bcdc992577 --- /dev/null +++ b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.streaming; + +import org.apache.spark.util.EnumUtil; + +public enum BatchStatus { + COMPLETED, + QUEUED, + PROCESSING; + + public static BatchStatus fromString(String str) { + return EnumUtil.parseIgnoreCase(BatchStatus.class, str); + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..3a51ae609303aa3ed44a6b19094a869e713b7db8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList} +import javax.ws.rs.{GET, Produces, QueryParam} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.streaming.AllBatchesResource._ +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) { + + @GET + def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = { + batchInfoList(listener, statusParams).sortBy(- _.batchId) + } +} + +private[v1] object AllBatchesResource { + + def batchInfoList( + listener: StreamingJobProgressListener, + statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = { + + listener.synchronized { + val statuses = + if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams + val statusToBatches = Seq( + BatchStatus.COMPLETED -> listener.retainedCompletedBatches, + BatchStatus.QUEUED -> listener.waitingBatches, + BatchStatus.PROCESSING -> listener.runningBatches + ) + + val batchInfos = for { + (status, batches) <- statusToBatches + batch <- batches if statuses.contains(status) + } yield { + val batchId = batch.batchTime.milliseconds + val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption + + new BatchInfo( + batchId = batchId, + batchTime = new Date(batchId), + status = status.toString, + batchDuration = listener.batchDuration, + inputSize = batch.numRecords, + schedulingDelay = batch.schedulingDelay, + processingTime = batch.processingDelay, + totalDelay = batch.totalDelay, + numActiveOutputOps = batch.numActiveOutputOp, + numCompletedOutputOps = batch.numCompletedOutputOp, + numFailedOutputOps = batch.numFailedOutputOp, + numTotalOutputOps = batch.outputOperations.size, + firstFailureReason = firstFailureReason + ) + } + + batchInfos + } + } +} diff 
--git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..0eb649f0e1b7229aff65fc95b3007e25b5cee4e2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import java.util.Date +import javax.ws.rs.{GET, PathParam, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._ +import org.apache.spark.streaming.Time +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) { + + @GET + def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = { + outputOperationInfoList(listener, batchId).sortBy(_.outputOpId) + } +} + +private[v1] object AllOutputOperationsResource { + + def outputOperationInfoList( + listener: StreamingJobProgressListener, + batchId: Long): Seq[OutputOperationInfo] = { + + listener.synchronized { + listener.getBatchUIData(Time(batchId)) match { + case Some(batch) => + for ((opId, op) <- batch.outputOperations) yield { + val jobIds = batch.outputOpIdSparkJobIdPairs + .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted + + new OutputOperationInfo( + outputOpId = opId, + name = op.name, + description = op.description, + startTime = op.startTime.map(new Date(_)), + endTime = op.endTime.map(new Date(_)), + duration = op.duration, + failureReason = op.failureReason, + jobIds = jobIds + ) + } + case None => throw new NotFoundException("unknown batch: " + batchId) + } + }.toSeq + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..5a276a9236a0f861079f2b5b0fba3937089ee933 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import java.util.Date +import javax.ws.rs.{GET, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.streaming.AllReceiversResource._ +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) { + + @GET + def receiversList(): Seq[ReceiverInfo] = { + receiverInfoList(listener).sortBy(_.streamId) + } +} + +private[v1] object AllReceiversResource { + + def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = { + listener.synchronized { + listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) => + + val receiverInfo = listener.receiverInfo(streamId) + val streamName = receiverInfo.map(_.name) + .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId") + val avgEventRate = + if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size) + + val (errorTime, errorMessage, error) = receiverInfo match { + case None => (None, None, None) + case Some(info) => + val someTime = + if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None + val someMessage = + if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None + val someError = + if (info.lastError.length > 0) Some(info.lastError) else None + + (someTime, someMessage, someError) + } + + new ReceiverInfo( + streamId = streamId, + streamName = streamName, + isActive = receiverInfo.map(_.active), + executorId = receiverInfo.map(_.executorId), + executorHost = receiverInfo.map(_.location), + lastErrorTime = errorTime, + lastErrorMessage = errorMessage, + lastError = error, + avgEventRate = avgEventRate, + eventRates = eventRates + ) + }.toSeq + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala new file mode 100644 index 0000000000000000000000000000000000000000..e64830a9459b12a09c47c08dc4a5bb93013d11d1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import javax.ws.rs.{Path, PathParam} + +import org.apache.spark.status.api.v1.UIRootFromServletContext + +@Path("/v1") +private[v1] class ApiStreamingApp extends UIRootFromServletContext { + + @Path("applications/{appId}/streaming") + def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = { + uiRoot.withSparkUI(appId, None) { ui => + new ApiStreamingRootResource(ui) + } + } + + @Path("applications/{appId}/{attemptId}/streaming") + def getStreamingRoot( + @PathParam("appId") appId: String, + @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = { + uiRoot.withSparkUI(appId, Some(attemptId)) { ui => + new ApiStreamingRootResource(ui) + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..1ccd586c848bdbe92e516c27de7a472335f587b0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.streaming + +import javax.ws.rs.Path + +import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.ui.StreamingJobProgressListener +import org.apache.spark.ui.SparkUI + +private[v1] class ApiStreamingRootResource(ui: SparkUI) { + + import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._ + + @Path("statistics") + def getStreamingStatistics(): StreamingStatisticsResource = { + new StreamingStatisticsResource(getListener(ui)) + } + + @Path("receivers") + def getReceivers(): AllReceiversResource = { + new AllReceiversResource(getListener(ui)) + } + + @Path("receivers/{streamId: \\d+}") + def getReceiver(): OneReceiverResource = { + new OneReceiverResource(getListener(ui)) + } + + @Path("batches") + def getBatches(): AllBatchesResource = { + new AllBatchesResource(getListener(ui)) + } + + @Path("batches/{batchId: \\d+}") + def getBatch(): OneBatchResource = { + new OneBatchResource(getListener(ui)) + } + + @Path("batches/{batchId: \\d+}/operations") + def getOutputOperations(): AllOutputOperationsResource = { + new AllOutputOperationsResource(getListener(ui)) + } + + @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}") + def getOutputOperation(): OneOutputOperationResource = { + new OneOutputOperationResource(getListener(ui)) + } + +} + +private[v1] object ApiStreamingRootResource { + def getListener(ui: SparkUI): StreamingJobProgressListener = { + ui.getStreamingJobProgressListener match { + case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener] + case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName) + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..d3c689c790cfcb618ebbd6e1a091778b67add420 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.streaming + +import javax.ws.rs.{GET, PathParam, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class OneBatchResource(listener: StreamingJobProgressListener) { + + @GET + def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = { + val someBatch = AllBatchesResource.batchInfoList(listener) + .find { _.batchId == batchId } + someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..aabcdb29b0d4c2517f175ee2e3df50d24c941920 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import javax.ws.rs.{GET, PathParam, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.ui.StreamingJobProgressListener +import org.apache.spark.streaming.ui.StreamingJobProgressListener._ + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) { + + @GET + def oneOperation( + @PathParam("batchId") batchId: Long, + @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = { + + val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId) + .find { _.outputOpId == opId } + someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..c0cc99da3a9c7e14bddc0dbbd1e0d934974a6da4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.api.v1.streaming + +import javax.ws.rs.{GET, PathParam, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.status.api.v1.NotFoundException +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) { + + @GET + def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = { + val someReceiver = AllReceiversResource.receiverInfoList(listener) + .find { _.streamId == streamId } + someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala new file mode 100644 index 0000000000000000000000000000000000000000..6cff87be59ca8a8499fcdb134213ffe61a87abe4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.streaming + +import java.util.Date +import javax.ws.rs.{GET, Produces} +import javax.ws.rs.core.MediaType + +import org.apache.spark.streaming.ui.StreamingJobProgressListener + +@Produces(Array(MediaType.APPLICATION_JSON)) +private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) { + + @GET + def streamingStatistics(): StreamingStatistics = { + listener.synchronized { + val batches = listener.retainedBatches + val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration)) + val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay)) + val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay)) + val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay)) + + new StreamingStatistics( + startTime = new Date(listener.startTime), + batchDuration = listener.batchDuration, + numReceivers = listener.numReceivers, + numActiveReceivers = listener.numActiveReceivers, + numInactiveReceivers = listener.numInactiveReceivers, + numTotalCompletedBatches = listener.numTotalCompletedBatches, + numRetainedCompletedBatches = listener.retainedCompletedBatches.size, + numActiveBatches = listener.numUnprocessedBatches, + numProcessedRecords = listener.numTotalProcessedRecords, + numReceivedRecords = listener.numTotalReceivedRecords, + avgInputRate = avgInputRate, + avgSchedulingDelay = avgSchedulingDelay, + avgProcessingTime = avgProcessingTime, + avgTotalDelay = avgTotalDelay + ) + } + } + + private def avgRate(data: Seq[Double]): Option[Double] = { + if (data.isEmpty) None else Some(data.sum / data.size) + } + + private def avgTime(data: Seq[Long]): Option[Long] = { + if (data.isEmpty) None else Some(data.sum / data.size) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala new file mode 100644 index 0000000000000000000000000000000000000000..403b0eb0b5d60c89a2107719a3e0e8d0f893810b --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.api.v1.streaming + +import java.util.Date + +import org.apache.spark.streaming.ui.StreamingJobProgressListener._ + +class StreamingStatistics private[spark]( + val startTime: Date, + val batchDuration: Long, + val numReceivers: Int, + val numActiveReceivers: Int, + val numInactiveReceivers: Int, + val numTotalCompletedBatches: Long, + val numRetainedCompletedBatches: Long, + val numActiveBatches: Long, + val numProcessedRecords: Long, + val numReceivedRecords: Long, + val avgInputRate: Option[Double], + val avgSchedulingDelay: Option[Long], + val avgProcessingTime: Option[Long], + val avgTotalDelay: Option[Long]) + +class ReceiverInfo private[spark]( + val streamId: Int, + val streamName: String, + val isActive: Option[Boolean], + val executorId: Option[String], + val executorHost: Option[String], + val lastErrorTime: Option[Date], + val lastErrorMessage: Option[String], + val lastError: Option[String], + val avgEventRate: Option[Double], + val eventRates: Seq[(Long, Double)]) + +class BatchInfo private[spark]( + val batchId: Long, + val batchTime: Date, + val status: String, + val batchDuration: Long, + val inputSize: Long, + val schedulingDelay: Option[Long], + val processingTime: Option[Long], + val totalDelay: Option[Long], + val numActiveOutputOps: Int, + val numCompletedOutputOps: Int, + val numFailedOutputOps: Int, + val numTotalOutputOps: Int, + val firstFailureReason: Option[String]) + +class OutputOperationInfo private[spark]( + val outputOpId: OutputOpId, + val name: String, + val description: String, + val startTime: Option[Date], + val endTime: Option[Date], + val duration: Option[Long], + val failureReason: Option[String], + val jobIds: Seq[SparkJobId]) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 444261da8de6a31e24431a607ab826cac96fad40..0a4c141e5be38abe0b55ddcab0ddaa22d8c7023d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,7 +45,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener} +import org.apache.spark.streaming.scheduler. 
+ {ExecutorAllocationManager, JobScheduler, StreamingListener, StreamingListenerStreamingStarted} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -583,6 +584,8 @@ class StreamingContext private[streaming] ( scheduler.start() } state = StreamingContextState.ACTIVE + scheduler.listenerBus.post( + StreamingListenerStreamingStarted(System.currentTimeMillis())) } catch { case NonFatal(e) => logError("Error starting the context, marking it as stopped", e) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala index db0bae9958d6176c032175d75f41112f26917571..28cb86c9f31fdb914fc715df387997c980010e05 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -21,6 +21,9 @@ import org.apache.spark.streaming.Time private[streaming] trait PythonStreamingListener{ + /** Called when the streaming has been started */ + def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted) { } + /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { } @@ -51,6 +54,11 @@ private[streaming] trait PythonStreamingListener{ private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener) extends JavaStreamingListener { + /** Called when the streaming has been started */ + override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { + listener.onStreamingStarted(streamingStarted) + } + /** Called when a receiver has been started */ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { listener.onReceiverStarted(receiverStarted) @@ -99,6 +107,9 @@ private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamin */ private[streaming] class JavaStreamingListener { + /** Called when the streaming has been started */ + def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { } + /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { } @@ -131,6 +142,9 @@ private[streaming] class JavaStreamingListener { */ private[streaming] sealed trait JavaStreamingListenerEvent +private[streaming] class JavaStreamingListenerStreamingStarted(val time: Long) + extends JavaStreamingListenerEvent + private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo) extends JavaStreamingListenerEvent diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala index b109b9f1cbeae57d599543a32b68356947a22463..ee8370d262609ce6ee77648dff51ccdfa55a3819 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala @@ -77,6 +77,11 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav ) } + override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = { + 
javaStreamingListener.onStreamingStarted( + new JavaStreamingListenerStreamingStarted(streamingStarted.time)) + } + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { javaStreamingListener.onReceiverStarted( new JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 58fc78d552106abcf8d07172b2419b458abf274c..b57f9b772f8c61997f38c32904666d86598480e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -29,6 +29,9 @@ import org.apache.spark.util.Distribution @DeveloperApi sealed trait StreamingListenerEvent +@DeveloperApi +case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent + @DeveloperApi case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent @@ -66,6 +69,9 @@ case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) @DeveloperApi trait StreamingListener { + /** Called when the streaming has been started */ + def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { } + /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 39f6e711a67ad6df2fc4fc302f6447890c5ddd45..5fb0bd057d0f1dc6f9e3b5dff8bc96155791aecd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -65,6 +65,8 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) listener.onOutputOperationStarted(outputOperationStarted) case outputOperationCompleted: StreamingListenerOutputOperationCompleted => listener.onOutputOperationCompleted(outputOperationCompleted) + case streamingStarted: StreamingListenerStreamingStarted => + listener.onStreamingStarted(streamingStarted) case _ => } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 61f852a0d31a766235a9f47bf07c40b6080286c3..95f582106c71f9d5863154b7fda99c18bab9cad4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -27,7 +27,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.scheduler._ -private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) +private[spark] class StreamingJobProgressListener(ssc: StreamingContext) extends SparkListener with StreamingListener { private val waitingBatchUIData = new HashMap[Time, BatchUIData] @@ -39,6 +39,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) private var totalProcessedRecords = 0L private val receiverInfos = new HashMap[Int, ReceiverInfo] + private var _startTime = -1L + 
// Because onJobStart and onBatchXXX messages are processed in different threads, // we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we // cannot use a map of (Time, BatchUIData). @@ -66,6 +68,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) val batchDuration = ssc.graph.batchDuration.milliseconds + override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { + _startTime = streamingStarted.time + } + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { synchronized { receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo @@ -152,6 +158,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } + def startTime: Long = _startTime + def numReceivers: Int = synchronized { receiverInfos.size } @@ -267,7 +275,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } } -private[streaming] object StreamingJobProgressListener { +private[spark] object StreamingJobProgressListener { type SparkJobId = Int type OutputOpId = Int } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 46cd3092e9061d615c55dcbd7aff852eba9bab18..7abafd6ba79084a425afcb4658775ff401ba9f28 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -143,7 +143,8 @@ private[ui] class StreamingPage(parent: StreamingTab) import StreamingPage._ private val listener = parent.listener - private val startTime = System.currentTimeMillis() + + private def startTime: Long = listener.startTime /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index c5f8aada3fc4af4bc15ecb26c87bcbaf2be3b197..9d1b82a6341b1c41d1f6523d2f9acb7f2662b9c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -38,6 +38,7 @@ private[spark] class StreamingTab(val ssc: StreamingContext) ssc.addStreamingListener(listener) ssc.sc.addSparkListener(listener) + parent.setStreamingJobProgressListener(listener) attachPage(new StreamingPage(this)) attachPage(new BatchPage(this)) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java index ff0be820e0a9a17258959076a5591ef839ef4255..63fd6c4422444b3887067fa8839c2ac946cb544c 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java @@ -22,6 +22,11 @@ import org.apache.spark.streaming.api.java.*; public class JavaStreamingListenerAPISuite extends JavaStreamingListener { + @Override + public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) { + super.onStreamingStarted(streamingStarted); + } + @Override public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) { JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo(); diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 0295e059f7bc259262e998dce00d52cb0ef47c78..cfd4323531bdb93b334f28e84d51cab0dba519f9 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { val listener = new TestJavaStreamingListener() val listenerWrapper = new JavaStreamingListenerWrapper(listener) + val streamingStarted = StreamingListenerStreamingStarted(1000L) + listenerWrapper.onStreamingStarted(streamingStarted) + assert(listener.streamingStarted.time === streamingStarted.time) + val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo( streamId = 2, name = "test", @@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { class TestJavaStreamingListener extends JavaStreamingListener { + var streamingStarted: JavaStreamingListenerStreamingStarted = null var receiverStarted: JavaStreamingListenerReceiverStarted = null var receiverError: JavaStreamingListenerReceiverError = null var receiverStopped: JavaStreamingListenerReceiverStopped = null @@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener { var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null + override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { + this.streamingStarted = streamingStarted + } + override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { this.receiverStarted = receiverStarted } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 46ab3ac8de3d4ae99d0c5d1a843a64a6cff3d7ed..56b400850fdd46b1a2129acaec91f92eac2c7bf0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -62,6 +62,10 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { 0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))) + // onStreamingStarted + listener.onStreamingStarted(StreamingListenerStreamingStarted(100L)) + listener.startTime should be (100) + // onBatchSubmitted val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
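Note (reviewer sketch, not part of the patch): a minimal example of how application code might observe the new StreamingListenerStreamingStarted event introduced above. The SparkConf/StreamingContext setup, class names, and master URL below are illustrative assumptions; only addStreamingListener, StreamingListener, and the new onStreamingStarted callback come from the code in this diff.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerStreamingStarted}

// Records the wall-clock time at which the streaming context became ACTIVE.
class StartTimeListener extends StreamingListener {
  @volatile var startTime: Long = -1L

  // New callback added by this patch; the event is posted from StreamingContext.start()
  // right after the context's state is set to ACTIVE.
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
    startTime = streamingStarted.time
  }
}

object StartTimeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("start-time-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.addStreamingListener(new StartTimeListener)  // same registration path StreamingTab uses
    // ... define input streams and output operations, then call ssc.start() ...
    // After start(), the listener's startTime field holds the same value that
    // StreamingJobProgressListener.startTime exposes through the new
    // /streaming/statistics endpoint.
  }
}

The data collected by StreamingJobProgressListener is what the new resources serve under applications/{appId}/streaming (statistics, receivers, receivers/{streamId}, batches, batches/{batchId}, batches/{batchId}/operations, batches/{batchId}/operations/{outputOpId}), presumably below the same /api/v1 root as Spark's existing monitoring REST API.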