diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0c381a2c02986a4d6645a32b90882d7a413c350b..700f7f81dc8a9f6b2a34268cfd486aef8b42b76d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -30,7 +30,6 @@ trait DataWritingCommand extends RunnableCommand {
   override lazy val metrics: Map[String, SQLMetric] = {
     val sparkContext = SparkContext.getActive.get
     Map(
-      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
       "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
       "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand {
     var numFiles = 0
     var totalNumBytes: Long = 0L
     var totalNumOutput: Long = 0L
-    var totalWritingTime: Long = 0L
 
     writeSummaries.foreach { summary =>
       numPartitions += summary.updatedPartitions.size
       numFiles += summary.numOutputFile
       totalNumBytes += summary.numOutputBytes
       totalNumOutput += summary.numOutputRows
-      totalWritingTime += summary.totalWritingTime
     }
 
-    val avgWritingTime = if (numFiles > 0) {
-      (totalWritingTime / numFiles).toLong
-    } else {
-      0L
-    }
-
-    metrics("avgTime").add(avgWritingTime)
     metrics("numFiles").add(numFiles)
     metrics("numOutputBytes").add(totalNumBytes)
     metrics("numOutputRows").add(totalNumOutput)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 64866630623abedcf35ce1a5b27ab328b9499549..9eb9eae699e940bb6b7b8969968b083bbe9c2006 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -275,8 +275,6 @@ object FileFormatWriter extends Logging {
     /**
      * The data structures used to measure metrics during writing.
      */
-    protected var totalWritingTime: Long = 0L
-    protected var timeOnCurrentFile: Long = 0L
     protected var numOutputRows: Long = 0L
     protected var numOutputBytes: Long = 0L
 
@@ -343,9 +341,7 @@ object FileFormatWriter extends Logging {
         }
 
         val internalRow = iter.next()
-        val startTime = System.nanoTime()
         currentWriter.write(internalRow)
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       releaseResources()
@@ -355,17 +351,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = Set.empty,
         numOutputFile = fileCounter + 1,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -504,9 +496,7 @@ object FileFormatWriter extends Logging {
           releaseResources()
           newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
         }
-        val startTime = System.nanoTime()
         currentWriter.write(getOutputRow(row))
-        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       if (currentPartColsAndBucketId != null) {
@@ -519,17 +509,13 @@ object FileFormatWriter extends Logging {
         updatedPartitions = updatedPartitions.toSet,
         numOutputFile = totalFileCounter,
         numOutputBytes = numOutputBytes,
-        numOutputRows = numOutputRows,
-        totalWritingTime = totalWritingTime)
+        numOutputRows = numOutputRows)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
-          val startTime = System.nanoTime()
           currentWriter.close()
-          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
-          timeOnCurrentFile = 0
           numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
@@ -547,11 +533,9 @@ object FileFormatWriter extends Logging {
  * @param numOutputFile the total number of files.
  * @param numOutputRows the number of output rows.
  * @param numOutputBytes the bytes of output data.
- * @param totalWritingTime the total writing time in ms.
  */
 case class ExecutedWriteSummary(
     updatedPartitions: Set[String],
     numOutputFile: Int,
     numOutputRows: Long,
-    numOutputBytes: Long,
-    totalWritingTime: Long)
+    numOutputBytes: Long)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
index 1ef1988d4c605c1840751a1e52f95c409f38a274..24c038587d1d677a70ca94328b25976b72431df2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -65,9 +65,6 @@ class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
     val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
     assert(totalNumBytes > 0)
-    val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
-    val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
-    assert(writingTime >= 0)
   }
 
   private def testMetricsNonDynamicPartition(
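Note: with the timing fields gone, the driver-side aggregation in DataWritingCommand.processStats reduces to summing the per-task ExecutedWriteSummary values into the remaining metrics. Below is a minimal standalone sketch of that reduction; it uses a simplified summary type and a plain tuple result instead of SQLMetric accumulators (both simplifications are assumptions for illustration, not the actual Spark implementation).

// Sketch only: mirrors the post-patch aggregation in processStats, with the
// SQLMetric plumbing replaced by a plain tuple result for illustration.
case class WriteSummarySketch(  // simplified stand-in for ExecutedWriteSummary
    updatedPartitions: Set[String],
    numOutputFile: Int,
    numOutputRows: Long,
    numOutputBytes: Long)

object WriteMetricsSketch {
  // Returns (numParts, numFiles, numOutputBytes, numOutputRows), matching the
  // metrics that survive this change.
  def aggregate(writeSummaries: Seq[WriteSummarySketch]): (Int, Int, Long, Long) = {
    var numPartitions = 0
    var numFiles = 0
    var totalNumBytes = 0L
    var totalNumOutput = 0L
    writeSummaries.foreach { summary =>
      numPartitions += summary.updatedPartitions.size
      numFiles += summary.numOutputFile
      totalNumBytes += summary.numOutputBytes
      totalNumOutput += summary.numOutputRows
    }
    (numPartitions, numFiles, totalNumBytes, totalNumOutput)
  }

  def main(args: Array[String]): Unit = {
    val summaries = Seq(
      WriteSummarySketch(Set("p=1"), numOutputFile = 2, numOutputRows = 100L, numOutputBytes = 4096L),
      WriteSummarySketch(Set("p=2"), numOutputFile = 1, numOutputRows = 50L, numOutputBytes = 2048L))
    println(aggregate(summaries)) // prints (2,3,6144,150)
  }
}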