From 5df99bd364561c6f4c02308149ba5eb71f89247e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Fri, 7 Jul 2017 13:12:20 +0800
Subject: [PATCH] [SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data
 writes onto DataFrameWriter operations

## What changes were proposed in this pull request?

Remove the time metrics, since there seems to be no way to measure them without per-row tracking.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18558 from viirya/SPARK-20703-followup.
---
 .../command/DataWritingCommand.scala          | 10 ----------
 .../datasources/FileFormatWriter.scala        | 22 +++-------------------
 .../sql/hive/execution/SQLMetricsSuite.scala  |  3 ---
 3 files changed, 3 insertions(+), 32 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0c381a2c02..700f7f81dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand {
   override lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
-      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
       "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand {
     var numFiles = 0
     var totalNumBytes: Long = 0L
     var totalNumOutput: Long = 0L
-    var totalWritingTime: Long = 0L

     writeSummaries.foreach { summary =>
       numPartitions += summary.updatedPartitions.size
       numFiles += summary.numOutputFile
       totalNumBytes += summary.numOutputBytes
       totalNumOutput += summary.numOutputRows
-      totalWritingTime += summary.totalWritingTime
     }

-    val avgWritingTime = if (numFiles > 0) {
-      (totalWritingTime / numFiles).toLong
-    } else {
-      0L
-    }
-
-    metrics("avgTime").add(avgWritingTime)
     metrics("numFiles").add(numFiles)
     metrics("numOutputBytes").add(totalNumBytes)
     metrics("numOutputRows").add(totalNumOutput)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6486663062..9eb9eae699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -275,8 +275,6 @@ object FileFormatWriter extends Logging {
     /**
      * The data structures used to measure metrics during writing.
      */
-    protected var totalWritingTime: Long = 0L
-    protected var timeOnCurrentFile: Long = 0L
     protected var numOutputRows: Long = 0L
     protected var numOutputBytes: Long = 0L

@@ -343,9 +341,7 @@ object FileFormatWriter extends Logging {
         }

         val internalRow = iter.next()
-        val startTime = System.nanoTime()
         currentWriter.write(internalRow)
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       releaseResources()
@@ -355,17 +351,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = Set.empty,
         numOutputFile = fileCounter + 1,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }

     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -504,9 +496,7 @@ object FileFormatWriter extends Logging {
           releaseResources()
           newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
         }
-        val startTime = System.nanoTime()
         currentWriter.write(getOutputRow(row))
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       if (currentPartColsAndBucketId != null) {
@@ -519,17 +509,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = updatedPartitions.toSet,
         numOutputFile = totalFileCounter,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }

     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -547,11 +533,9 @@ object FileFormatWriter extends Logging {
  * @param numOutputFile the total number of files.
  * @param numOutputRows the number of output rows.
  * @param numOutputBytes the bytes of output data.
- * @param totalWritingTime the total writing time in ms.
  */
 case class ExecutedWriteSummary(
     updatedPartitions: Set[String],
     numOutputFile: Int,
     numOutputRows: Long,
-    numOutputBytes: Long,
-    totalWritingTime: Long)
+    numOutputBytes: Long)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 1ef1988d4c..24c038587d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -65,9 +65,6 @@ class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
     val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
     assert(totalNumBytes > 0)
-    val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
-    val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
-    assert(writingTime >= 0)
   }

   private def testMetricsNonDynamicPartition(
--
GitLab
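
For reference, below is a minimal, self-contained sketch of the per-task summary aggregation that survives this change. `WriteSummarySketch` and `MetricsAggregationSketch` are illustrative stand-ins rather than Spark identifiers; the loop body mirrors the aggregation kept in `DataWritingCommand` in the diff above, with all `totalWritingTime` bookkeeping removed.

```scala
// Illustrative stand-ins for ExecutedWriteSummary and the aggregation loop in
// DataWritingCommand; not Spark source code.
case class WriteSummarySketch(
    updatedPartitions: Set[String],
    numOutputFile: Int,
    numOutputRows: Long,
    numOutputBytes: Long)

object MetricsAggregationSketch {
  def main(args: Array[String]): Unit = {
    // Two hypothetical per-task write summaries, e.g. one per written partition.
    val writeSummaries = Seq(
      WriteSummarySketch(Set("p=1"), numOutputFile = 2, numOutputRows = 100L, numOutputBytes = 4096L),
      WriteSummarySketch(Set("p=2"), numOutputFile = 1, numOutputRows = 50L, numOutputBytes = 2048L))

    var numPartitions = 0
    var numFiles = 0
    var totalNumBytes = 0L
    var totalNumOutput = 0L

    // Same shape as the loop kept in DataWritingCommand: only partition, file,
    // byte, and row counts are accumulated; no per-file timing remains.
    writeSummaries.foreach { summary =>
      numPartitions += summary.updatedPartitions.size
      numFiles += summary.numOutputFile
      totalNumBytes += summary.numOutputBytes
      totalNumOutput += summary.numOutputRows
    }

    println(s"numPartitions=$numPartitions, numFiles=$numFiles, " +
      s"numOutputBytes=$totalNumBytes, numOutputRows=$totalNumOutput")
  }
}
```

Only counts that can be derived without timing individual writes (partitions, files, bytes, rows) are aggregated, which is why the `avgTime` metric, the `totalWritingTime` field, and the corresponding assertions in `SQLMetricsSuite` are dropped together.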