diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 26f61e25da4d3088672385fee048ecad11dab84d..b4caf68f0afaa8574cc32c681c29d4e769ff9143 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1002,6 +1002,15 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val current = f.listFiles
+    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
+  }
+
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0c381a2c02986a4d6645a32b90882d7a413c350b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+/**
+ * A special `RunnableCommand` which writes data out and updates metrics.
+ */
+trait DataWritingCommand extends RunnableCommand {
+
+  override lazy val metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numParts" -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+    )
+  }
+
+  /**
+   * Callback function that updates metrics collected from the writing operation.
+   */
+  protected def updateWritingMetrics(writeSummaries: Seq[ExecutedWriteSummary]): Unit = {
+    val sparkContext = SparkContext.getActive.get
+    var numPartitions = 0
+    var numFiles = 0
+    var totalNumBytes: Long = 0L
+    var totalNumOutput: Long = 0L
+    var totalWritingTime: Long = 0L
+
+    writeSummaries.foreach { summary =>
+      numPartitions += summary.updatedPartitions.size
+      numFiles += summary.numOutputFile
+      totalNumBytes += summary.numOutputBytes
+      totalNumOutput += summary.numOutputRows
+      totalWritingTime += summary.totalWritingTime
+    }
+
+    val avgWritingTime = if (numFiles > 0) {
+      (totalWritingTime / numFiles).toLong
+    } else {
+      0L
+    }
+
+    metrics("avgTime").add(avgWritingTime)
+    metrics("numFiles").add(numFiles)
+    metrics("numOutputBytes").add(totalNumBytes)
+    metrics("numOutputRows").add(totalNumOutput)
+    metrics("numParts").add(numPartitions)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 81bc93e7ebcf405894d77f2552ebc5cae36f0887..7cd4baef89e75e95e9fb4952ae2d13b7755e3969 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.debug._
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types._
@@ -37,6 +38,11 @@ import org.apache.spark.sql.types._
  * wrapped in `ExecutedCommand` during execution.
  */
 trait RunnableCommand extends logical.Command {
+
+  // The map used to record the metrics of running the command. This will be passed to
+  // `ExecutedCommand` during query planning.
+  lazy val metrics: Map[String, SQLMetric] = Map.empty
+
   def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
     throw new NotImplementedError
   }
@@ -49,8 +55,14 @@ trait RunnableCommand extends logical.Command {
 /**
  * A physical operator that executes the run method of a `RunnableCommand` and
  * saves the result to prevent multiple executions.
+ *
+ * @param cmd the `RunnableCommand` this operator will run.
+ * @param children the children physical plans run by the `RunnableCommand`.
  */
 case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) extends SparkPlan {
+
+  override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
   * command or any other computation that should be evaluated exactly once. The value of this field
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 0daffa93b4747a640a7e4ea3e3a739fc9ab0fc95..64866630623abedcf35ce1a5b27ab328b9499549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -22,7 +22,7 @@ import java.util.{Date, UUID}
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -82,7 +82,7 @@ object FileFormatWriter extends Logging {
   }
 
   /** The result of a successful write task. */
-  private case class WriteTaskResult(commitMsg: TaskCommitMessage, updatedPartitions: Set[String])
+  private case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
 
   /**
    * Basic work flow of this command is:
@@ -104,7 +104,7 @@ object FileFormatWriter extends Logging {
       hadoopConf: Configuration,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
-      refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+      refreshFunction: (Seq[ExecutedWriteSummary]) => Unit,
       options: Map[String, String]): Unit = {
 
     val job = Job.getInstance(hadoopConf)
@@ -196,12 +196,10 @@ object FileFormatWriter extends Logging {
         })
 
       val commitMsgs = ret.map(_.commitMsg)
-      val updatedPartitions = ret.flatMap(_.updatedPartitions)
-        .distinct.map(PartitioningUtils.parsePathFragment)
 
       committer.commitJob(job, commitMsgs)
       logInfo(s"Job ${job.getJobID} committed.")
-      refreshFunction(updatedPartitions)
+      refreshFunction(ret.map(_.summary))
     } catch { case cause: Throwable =>
       logError(s"Aborting job ${job.getJobID}.", cause)
       committer.abortJob(job)
@@ -247,9 +245,9 @@ object FileFormatWriter extends Logging {
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
-        val outputPartitions = writeTask.execute(iterator)
+        val summary = writeTask.execute(iterator)
         writeTask.releaseResources()
-        WriteTaskResult(committer.commitTask(taskAttemptContext), outputPartitions)
+        WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
       })(catchBlock = {
         // If there is an error, release resource and then abort the task
         try {
@@ -273,12 +271,36 @@ object FileFormatWriter extends Logging {
    * automatically trigger task aborts.
    */
   private trait ExecuteWriteTask {
+
     /**
-     * Writes data out to files, and then returns the list of partition strings written out.
-     * The list of partitions is sent back to the driver and used to update the catalog.
+     * The data structures used to measure metrics during writing.
      */
-    def execute(iterator: Iterator[InternalRow]): Set[String]
+    protected var totalWritingTime: Long = 0L
+    protected var timeOnCurrentFile: Long = 0L
+    protected var numOutputRows: Long = 0L
+    protected var numOutputBytes: Long = 0L
+
+    /**
+     * Writes data out to files, and then returns the summary of relevant information, which
+     * includes the list of partition strings written out. The list of partitions is sent back
+     * to the driver and used to update the catalog. Other information will be sent back to the
+     * driver too and used to update the metrics in the UI.
+     */
+    def execute(iterator: Iterator[InternalRow]): ExecutedWriteSummary
 
     def releaseResources(): Unit
+
+    /**
+     * A helper function used to determine the size in bytes of a written file.
+     */
+    protected def getFileSize(conf: Configuration, filePath: String): Long = {
+      if (filePath != null) {
+        val path = new Path(filePath)
+        val fs = path.getFileSystem(conf)
+        fs.getFileStatus(path).getLen()
+      } else {
+        0L
+      }
+    }
   }
 
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -288,24 +310,26 @@
       committer: FileCommitProtocol) extends ExecuteWriteTask {
 
     private[this] var currentWriter: OutputWriter = _
+    private[this] var currentPath: String = _
 
     private def newOutputWriter(fileCounter: Int): Unit = {
       val ext = description.outputWriterFactory.getFileExtension(taskAttemptContext)
-      val tmpFilePath = committer.newTaskTempFile(
+      currentPath = committer.newTaskTempFile(
         taskAttemptContext, None, f"-c$fileCounter%03d" + ext)
       currentWriter = description.outputWriterFactory.newInstance(
-        path = tmpFilePath,
+        path = currentPath,
         dataSchema = description.dataColumns.toStructType,
        context = taskAttemptContext)
     }
 
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
       var fileCounter = 0
       var recordsInFile: Long = 0L
       newOutputWriter(fileCounter)
+
       while (iter.hasNext) {
         if (description.maxRecordsPerFile > 0 && recordsInFile >= description.maxRecordsPerFile) {
           fileCounter += 1
@@ -314,21 +338,35 @@
 
           recordsInFile = 0
           releaseResources()
+          numOutputRows += recordsInFile
           newOutputWriter(fileCounter)
         }
 
         val internalRow = iter.next()
+        val startTime = System.nanoTime()
         currentWriter.write(internalRow)
+        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
       releaseResources()
-      Set.empty
+      numOutputRows += recordsInFile
+
+      ExecutedWriteSummary(
+        updatedPartitions = Set.empty,
+        numOutputFile = fileCounter + 1,
+        numOutputBytes = numOutputBytes,
+        numOutputRows = numOutputRows,
+        totalWritingTime = totalWritingTime)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
+          val startTime = System.nanoTime()
           currentWriter.close()
+          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+          timeOnCurrentFile = 0
+          numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
         }
@@ -348,6 +386,8 @@
     // currentWriter is initialized whenever we see a new key
     private var currentWriter: OutputWriter = _
 
+    private var currentPath: String = _
+
     /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
     private def partitionPathExpression: Seq[Expression] = {
       desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
@@ -403,19 +443,19 @@
         case _ => None
       }
-      val path = if (customPath.isDefined) {
+      currentPath = if (customPath.isDefined) {
         committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
       } else {
         committer.newTaskTempFile(taskAttemptContext, partDir, ext)
       }
       currentWriter = desc.outputWriterFactory.newInstance(
-        path = path,
+        path = currentPath,
         dataSchema = desc.dataColumns.toStructType,
         context = taskAttemptContext)
     }
 
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
       val getPartitionColsAndBucketId = UnsafeProjection.create(
         desc.partitionColumns ++ desc.bucketIdExpression, desc.allColumns)
@@ -429,15 +469,22 @@
       // If anything below fails, we should abort the task.
       var recordsInFile: Long = 0L
       var fileCounter = 0
+      var totalFileCounter = 0
       var currentPartColsAndBucketId: UnsafeRow = null
       val updatedPartitions = mutable.Set[String]()
+
       for (row <- iter) {
         val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
         if (currentPartColsAndBucketId != nextPartColsAndBucketId) {
+          if (currentPartColsAndBucketId != null) {
+            totalFileCounter += (fileCounter + 1)
+          }
+
           // See a new partition or bucket - write to a new partition dir (or a new bucket file).
           currentPartColsAndBucketId = nextPartColsAndBucketId.copy()
           logDebug(s"Writing partition: $currentPartColsAndBucketId")
+          numOutputRows += recordsInFile
           recordsInFile = 0
           fileCounter = 0
@@ -447,6 +494,8 @@
             recordsInFile >= desc.maxRecordsPerFile) {
           // Exceeded the threshold in terms of the number of records per file.
           // Create a new file by increasing the file counter.
+
+          numOutputRows += recordsInFile
           recordsInFile = 0
           fileCounter += 1
           assert(fileCounter < MAX_FILE_COUNTER,
@@ -455,18 +504,33 @@
           releaseResources()
           newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions)
         }
-
+        val startTime = System.nanoTime()
         currentWriter.write(getOutputRow(row))
+        timeOnCurrentFile += (System.nanoTime() - startTime)
         recordsInFile += 1
       }
+      if (currentPartColsAndBucketId != null) {
+        totalFileCounter += (fileCounter + 1)
+      }
       releaseResources()
-      updatedPartitions.toSet
+      numOutputRows += recordsInFile
+
+      ExecutedWriteSummary(
+        updatedPartitions = updatedPartitions.toSet,
+        numOutputFile = totalFileCounter,
+        numOutputBytes = numOutputBytes,
+        numOutputRows = numOutputRows,
+        totalWritingTime = totalWritingTime)
     }
 
     override def releaseResources(): Unit = {
       if (currentWriter != null) {
         try {
+          val startTime = System.nanoTime()
           currentWriter.close()
+          totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000
+          timeOnCurrentFile = 0
+          numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath)
         } finally {
           currentWriter = null
         }
@@ -474,3 +538,20 @@
     }
   }
 }
+
+/**
+ * Wrapper class for the metrics of writing data out.
+ *
+ * @param updatedPartitions the partitions updated during writing data out. Only valid
+ *                          for dynamic partition.
+ * @param numOutputFile the total number of files.
+ * @param numOutputRows the number of output rows.
+ * @param numOutputBytes the bytes of output data.
+ * @param totalWritingTime the total writing time in ms.
+ */
+case class ExecutedWriteSummary(
+    updatedPartitions: Set[String],
+    numOutputFile: Int,
+    numOutputRows: Long,
+    numOutputBytes: Long,
+    totalWritingTime: Long)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ab26f2affbce5d7523231d933848fa38b74fbfac..0031567d3d2884cfbd3a59a5b102b68c833a0f01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -53,7 +55,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode,
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex])
-  extends RunnableCommand {
+  extends DataWritingCommand {
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 
   override def children: Seq[LogicalPlan] = query :: Nil
@@ -123,8 +125,16 @@ case class InsertIntoHadoopFsRelationCommand(
 
     if (doInsertion) {
 
-      // Callback for updating metastore partition metadata after the insertion job completes.
-      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
+      // Callback for updating metrics and metastore partition metadata
+      // after the insertion job completes.
+      def refreshCallback(summary: Seq[ExecutedWriteSummary]): Unit = {
+        val updatedPartitions = summary.flatMap(_.updatedPartitions)
+          .distinct.map(PartitioningUtils.parsePathFragment)
+
+        // Updating metrics.
+        updateWritingMetrics(summary)
+
+        // Updating metastore partition metadata.
         if (partitionsTrackedByCatalog) {
           val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
           if (newPartitions.nonEmpty) {
@@ -154,7 +164,7 @@ case class InsertIntoHadoopFsRelationCommand(
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = bucketSpec,
-        refreshFunction = refreshPartitionsCallback,
+        refreshFunction = refreshCallback,
         options = options)
 
       // refresh cached files in FileIndex
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index a2f3afe3ce23696348e5b2300fefaf57120a726f..6f998aa60faf50d3e0a947850a79e6d0f8a5f396 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -91,15 +91,15 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempDir { f =>
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", 2).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 2)
 
       spark.range(start = 0, end = 4, step = 1, numPartitions = 1)
         .write.option("maxRecordsPerFile", -1).mode("overwrite").parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 1)
     }
   }
 
@@ -111,7 +111,7 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
         .option("maxRecordsPerFile", 1)
         .mode("overwrite")
         .parquet(f.getAbsolutePath)
-      assert(recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
+      assert(Utils.recursiveList(f).count(_.getAbsolutePath.endsWith("parquet")) == 4)
     }
   }
 
@@ -138,14 +138,14 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     val df = Seq((1, ts)).toDF("i", "ts")
     withTempPath { f =>
       df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       checkPartitionValues(files.head, "2016-12-01 00:00:00")
     }
     withTempPath { f =>
       df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
         .partitionBy("ts").parquet(f.getAbsolutePath)
-      val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+      val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
       assert(files.length == 1)
       // use timeZone option "GMT" to format partition value.
       checkPartitionValues(files.head, "2016-12-01 08:00:00")
@@ -153,18 +153,11 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     withTempPath { f =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "GMT") {
         df.write.partitionBy("ts").parquet(f.getAbsolutePath)
-        val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
+        val files = Utils.recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet"))
         assert(files.length == 1)
         // if there isn't timeZone option, then use session local timezone.
         checkPartitionValues(files.head, "2016-12-01 08:00:00")
       }
     }
   }
-
-  /** Lists files recursively. */
-  private def recursiveList(f: File): Array[File] = {
-    require(f.isDirectory)
-    val current = f.listFiles
-    current ++ current.filter(_.isDirectory).flatMap(recursiveList)
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 223d37523239358f88c3f61ea7a2da722d21c3a8..cd263e8b6df8e299d4d56942d88d979469570393 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -31,14 +31,16 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand}
+import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
@@ -80,7 +82,7 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends DataWritingCommand {
 
   override def children: Seq[LogicalPlan] = query :: Nil
 
@@ -354,7 +356,7 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
-      refreshFunction = _ => (),
+      refreshFunction = updateWritingMetrics,
       options = Map.empty)
 
     if (partition.nonEmpty) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1ef1988d4c605c1840751a1e52f95c409f38a274
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton {
+  import spark.implicits._
+
+  /**
+   * Get execution metrics for the SQL execution and verify metrics values.
+   *
+   * @param metricsValues the expected metric values (numFiles, numPartitions, numOutputRows).
+   * @param func the function that triggers the query execution whose metrics are verified.
+   */
+  private def verifyWriteDataMetrics(metricsValues: Seq[Int])(func: => Unit): Unit = {
+    val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
+    // Run the given function to trigger query execution.
+    func
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds =
+      spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    assert(executionIds.size == 1)
+    val executionId = executionIds.head
+
+    val executionData = spark.sharedState.listener.getExecution(executionId).get
+    val executedNode = executionData.physicalPlanGraph.nodes.head
+
+    val metricsNames = Seq(
+      "number of written files",
+      "number of dynamic part",
+      "number of output rows")
+
+    val metrics = spark.sharedState.listener.getExecutionMetrics(executionId)
+
+    metricsNames.zip(metricsValues).foreach { case (metricsName, expected) =>
+      val sqlMetric = executedNode.metrics.find(_.name == metricsName)
+      assert(sqlMetric.isDefined)
+      val accumulatorId = sqlMetric.get.accumulatorId
+      val metricValue = metrics(accumulatorId).replaceAll(",", "").toInt
+      assert(metricValue == expected)
+    }
+
+    val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
+    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
+    assert(totalNumBytes > 0)
+    val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get
+    val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt
+    assert(writingTime >= 0)
+  }
+
+  private def testMetricsNonDynamicPartition(
+      dataFormat: String,
+      tableName: String): Unit = {
+    withTable(tableName) {
+      Seq((1, 2)).toDF("i", "j")
+        .write.format(dataFormat).mode("overwrite").saveAsTable(tableName)
+
+      val tableLocation =
+        new File(spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
+
+      // 2 files, 100 rows, 0 dynamic partition.
+      verifyWriteDataMetrics(Seq(2, 0, 100)) {
+        (0 until 100).map(i => (i, i + 1)).toDF("i", "j").repartition(2)
+          .write.format(dataFormat).mode("overwrite").insertInto(tableName)
+      }
+      assert(Utils.recursiveList(tableLocation).count(_.getName.startsWith("part-")) == 2)
+    }
+  }
+
+  private def testMetricsDynamicPartition(
+      provider: String,
+      dataFormat: String,
+      tableName: String): Unit = {
+    withTempPath { dir =>
+      spark.sql(
+        s"""
+          |CREATE TABLE $tableName(a int, b int)
+          |USING $provider
+          |PARTITIONED BY(a)
+          |LOCATION '${dir.toURI}'
+        """.stripMargin)
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+      val df = spark.range(start = 0, end = 40, step = 1, numPartitions = 1)
+        .selectExpr("id a", "id b")
+
+      // 40 files, 80 rows, 40 dynamic partitions.
+      verifyWriteDataMetrics(Seq(40, 40, 80)) {
+        df.union(df).repartition(2, $"a")
+          .write
+          .format(dataFormat)
+          .mode("overwrite")
+          .insertInto(tableName)
+      }
+      assert(Utils.recursiveList(dir).count(_.getName.startsWith("part-")) == 40)
+    }
+  }
+
+  test("writing data out metrics: parquet") {
+    testMetricsNonDynamicPartition("parquet", "t1")
+  }
+
+  test("writing data out metrics with dynamic partition: parquet") {
+    testMetricsDynamicPartition("parquet", "parquet", "t1")
+  }
+
+  test("writing data out metrics: hive") {
+    testMetricsNonDynamicPartition("hive", "t1")
+  }
+
+  test("writing data out metrics dynamic partition: hive") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      testMetricsDynamicPartition("hive", "hive", "t1")
+    }
+  }
+}
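To make the driver-side bookkeeping in `DataWritingCommand.updateWritingMetrics` easier to follow, here is a minimal standalone sketch of the same aggregation. It uses plain case classes and a `main` method instead of `SQLMetric` accumulators and `SQLMetrics.postDriverMetricUpdates`; the `WriteSummary`/`WriteMetrics` names are illustrative stand-ins for this sketch, not Spark API.

```scala
// Standalone sketch (not Spark code): mirrors how updateWritingMetrics folds the
// per-task write summaries into the five driver-side metrics added by this patch.
object WriteMetricsSketch {

  // Simplified stand-in for the ExecutedWriteSummary case class in the patch.
  case class WriteSummary(
      updatedPartitions: Set[String],
      numOutputFile: Int,
      numOutputRows: Long,
      numOutputBytes: Long,
      totalWritingTime: Long) // milliseconds

  // Aggregated values that the real code pushes into SQLMetric accumulators.
  case class WriteMetrics(
      avgTimeMs: Long,
      numFiles: Int,
      numOutputBytes: Long,
      numOutputRows: Long,
      numParts: Int)

  def aggregate(summaries: Seq[WriteSummary]): WriteMetrics = {
    val numFiles = summaries.map(_.numOutputFile).sum
    val totalWritingTime = summaries.map(_.totalWritingTime).sum
    // Average writing time is total time divided by the number of written files,
    // guarded against a write that produced no files at all.
    val avgTime = if (numFiles > 0) totalWritingTime / numFiles else 0L
    WriteMetrics(
      avgTimeMs = avgTime,
      numFiles = numFiles,
      numOutputBytes = summaries.map(_.numOutputBytes).sum,
      numOutputRows = summaries.map(_.numOutputRows).sum,
      numParts = summaries.map(_.updatedPartitions.size).sum)
  }

  def main(args: Array[String]): Unit = {
    // Two write tasks: one wrote two files into two dynamic partitions, one wrote a single file.
    val summaries = Seq(
      WriteSummary(Set("p=1", "p=2"), numOutputFile = 2, numOutputRows = 40L,
        numOutputBytes = 2048L, totalWritingTime = 30L),
      WriteSummary(Set("p=3"), numOutputFile = 1, numOutputRows = 10L,
        numOutputBytes = 512L, totalWritingTime = 12L))
    println(aggregate(summaries))
    // Prints: WriteMetrics(14,3,2560,50,3)
  }
}
```

Like the patch, the sketch sums `updatedPartitions.size` per task for the "number of dynamic part" metric and divides the accumulated writing time by the file count for the average.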
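The per-task half of the accounting lives in `ExecuteWriteTask`: time spent in `OutputWriter.write` is accumulated in nanoseconds, the cost of closing the writer is folded in when a file is finished, and the finished file is then measured for its size. The sketch below reproduces that pattern with `java.io` only, so it runs on its own; the real code wraps an `OutputWriter` and reads the size through the Hadoop `FileSystem` in `getFileSize`.

```scala
import java.io.{File, FileWriter}

// Standalone sketch (not Spark code): the timing/size accounting pattern used by
// ExecuteWriteTask, reduced to java.io so it is runnable anywhere.
object WriteTimingSketch {

  var totalWritingTime: Long = 0L   // milliseconds, across all finished files
  var timeOnCurrentFile: Long = 0L  // nanoseconds spent writing the currently open file
  var numOutputBytes: Long = 0L

  def writeFile(file: File, rows: Seq[String]): Unit = {
    val writer = new FileWriter(file)
    try {
      rows.foreach { row =>
        // Time only the write call itself, as the patch does around OutputWriter.write.
        val start = System.nanoTime()
        writer.write(row + "\n")
        timeOnCurrentFile += System.nanoTime() - start
      }
    } finally {
      // Closing is also part of writing the file: fold it in, convert ns -> ms,
      // then record the finished file's size (the real code asks the Hadoop FileSystem).
      val start = System.nanoTime()
      writer.close()
      totalWritingTime += (timeOnCurrentFile + System.nanoTime() - start) / 1000 / 1000
      timeOnCurrentFile = 0
      numOutputBytes += file.length()
    }
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("write-timing-sketch", ".txt")
    writeFile(f, Seq("a", "b", "c"))
    println(s"bytes=$numOutputBytes, totalWritingTime=${totalWritingTime}ms")
    f.delete()
  }
}
```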