diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 8577803743c8e06614566327017b109c4efe2f93..fff86686b550c0393a568b324e237a4f3d00d2ea 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -40,7 +40,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 
 private[libsvm] class LibSVMOutputWriter(
-    path: String,
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter {
@@ -50,11 +51,7 @@ private[libsvm] class LibSVMOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+        new Path(stagingDir, fileNamePrefix + extension)
       }
     }.getRecordWriter(context)
   }
@@ -132,12 +129,11 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
       dataSchema: StructType): OutputWriterFactory = {
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
-        new LibSVMOutputWriter(path, dataSchema, context)
+        new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
index d2eec7b1413f8ca08fea09842f5edcbb193128b0..f4cefdab077e95bb91c937e87093159922d16d15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
@@ -34,18 +34,23 @@ abstract class OutputWriterFactory extends Serializable {
    * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
    * to instantiate new [[OutputWriter]]s.
    *
-   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
-   *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
-   *        temporary directories and then merge written files back to the final destination.  In
-   *        this case, `path` points to a temporary output file under the temporary directory.
+   * @param stagingDir Base path (directory) of the file to which this [[OutputWriter]] is supposed
+   *                   to write.  Note that this may not point to the final output file.  For
+   *                   example, `FileOutputFormat` writes to temporary directories and then merge
+   *                   written files back to the final destination.  In this case, `path` points to
+   *                   a temporary output file under the temporary directory.
+   * @param fileNamePrefix Prefix of the file name. The returned OutputWriter must make sure this
+   *                       prefix is used in the actual file name. For example, if the prefix is
+   *                       "part-1-2-3", then the file name must start with "part_1_2_3" but can
+   *                       end in arbitrary extension.
    * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
    *        schema if the relation being written is partitioned.
    * @param context The Hadoop MapReduce task context.
    * @since 1.4.0
    */
   def newInstance(
-      path: String,
-      bucketId: Option[Int], // TODO: This doesn't belong here...
+      stagingDir: String,
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
index 54d0f3bd6291ad8f04511504e4ac1d5be07e2c70..bd56e511d0ccf9440e9a17f2829a0254dbeb051c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
@@ -46,6 +46,7 @@ object WriteOutput extends Logging {
 
   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
+      val uuid: String,  // prevent collision between different (appending) write jobs
       val serializableHadoopConf: SerializableConfiguration,
       val outputWriterFactory: OutputWriterFactory,
       val allColumns: Seq[Attribute],
@@ -102,6 +103,7 @@ object WriteOutput extends Logging {
       fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
 
     val description = new WriteJobDescription(
+      uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
       allColumns = plan.output,
@@ -213,6 +215,11 @@ object WriteOutput extends Logging {
   private trait ExecuteWriteTask {
     def execute(iterator: Iterator[InternalRow]): Unit
     def releaseResources(): Unit
+
+    final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
+      val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+      f"part-r-$split%05d-$uuid$bucketString"
+    }
   }
 
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -222,9 +229,11 @@ object WriteOutput extends Logging {
       stagingPath: String) extends ExecuteWriteTask {
 
     private[this] var outputWriter: OutputWriter = {
+      val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
+
       val outputWriter = description.outputWriterFactory.newInstance(
-        path = stagingPath,
-        bucketId = None,
+        stagingDir = stagingPath,
+        fileNamePrefix = filePrefix(split, description.uuid, None),
         dataSchema = description.nonPartitionColumns.toStructType,
         context = taskAttemptContext)
       outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
@@ -287,29 +296,31 @@ object WriteOutput extends Logging {
       }
     }
 
-    private def getBucketIdFromKey(key: InternalRow): Option[Int] =
-      description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length) }
-
     /**
      * Open and returns a new OutputWriter given a partition key and optional bucket id.
      * If bucket id is specified, we will append it to the end of the file name, but before the
      * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
      */
-    private def newOutputWriter(
-        key: InternalRow,
-        getPartitionString: UnsafeProjection): OutputWriter = {
+    private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
       val path =
         if (description.partitionColumns.nonEmpty) {
-          val partitionPath = getPartitionString(key).getString(0)
+          val partitionPath = partString(key).getString(0)
           new Path(stagingPath, partitionPath).toString
         } else {
           stagingPath
         }
-      val bucketId = getBucketIdFromKey(key)
 
+      // If the bucket spec is defined, the bucket column is right after the partition columns
+      val bucketId = if (description.bucketSpec.isDefined) {
+        Some(key.getInt(description.partitionColumns.length))
+      } else {
+        None
+      }
+
+      val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
       val newWriter = description.outputWriterFactory.newInstance(
-        path = path,
-        bucketId = bucketId,
+        stagingDir = path,
+        fileNamePrefix = filePrefix(split, description.uuid, bucketId),
         dataSchema = description.nonPartitionColumns.toStructType,
         context = taskAttemptContext)
       newWriter.initConverter(description.nonPartitionColumns.toStructType)
@@ -319,7 +330,7 @@ object WriteOutput extends Logging {
     override def execute(iter: Iterator[InternalRow]): Unit = {
       // We should first sort by partition columns, then bucket id, and finally sorting columns.
       val sortingExpressions: Seq[Expression] =
-      description.partitionColumns ++ bucketIdExpression ++ sortColumns
+        description.partitionColumns ++ bucketIdExpression ++ sortColumns
       val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
 
       val sortingKeySchema = StructType(sortingExpressions.map {
@@ -333,8 +344,8 @@ object WriteOutput extends Logging {
         description.nonPartitionColumns, description.allColumns)
 
       // Returns the partition path given a partition key.
-      val getPartitionString =
-      UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
+      val getPartitionString = UnsafeProjection.create(
+        Seq(Concat(partitionStringExpression)), description.partitionColumns)
 
       // Sorts the data before write, so that we only need one writer at the same time.
       val sorter = new UnsafeKVExternalSorter(
@@ -405,17 +416,6 @@ object WriteOutput extends Logging {
     job.getConfiguration.setBoolean("mapred.task.is.map", true)
     job.getConfiguration.setInt("mapred.task.partition", 0)
 
-    // This UUID is sent to executor side together with the serialized `Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
-    // unique task output files.
-    // This UUID is used to avoid output file name collision between different appending write jobs.
-    // These jobs may belong to different SparkContext instances. Concrete data source
-    // implementations may use this UUID to generate unique file names (e.g.,
-    // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used to identify a job
-    // rather than a single task output file is that, speculative tasks must generate the same
-    // output file name as the original task.
-    job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
-
     val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
     val outputCommitter = newOutputCommitter(
       job.getOutputFormatClass, taskAttemptContext, path, isAppend)
@@ -474,7 +474,3 @@ object WriteOutput extends Logging {
     }
   }
 }
-
-object WriterContainer {
-  val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 55cb26d6513af3ba386813bef32a64c374b4e498..eefacbf05ba0def2be74903f587a3297b7b95c7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.types._
 
 object CSVRelation extends Logging {
@@ -170,17 +170,17 @@ object CSVRelation extends Logging {
 
 private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
   override def newInstance(
-      path: String,
-      bucketId: Option[Int],
+      stagingDir: String,
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    if (bucketId.isDefined) sys.error("csv doesn't support bucketing")
-    new CsvOutputWriter(path, dataSchema, context, params)
+    new CsvOutputWriter(stagingDir, fileNamePrefix, dataSchema, context, params)
   }
 }
 
 private[csv] class CsvOutputWriter(
-    path: String,
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
@@ -199,11 +199,7 @@ private[csv] class CsvOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
+        new Path(stagingDir, s"$fileNamePrefix.csv$extension")
       }
     }.getRecordWriter(context)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 9fe38ccc9fdc6e43a8427f4f4f1d084bc296e6fd..cdbb2f729261324b57a5a24521ebe9e9a5ac8bc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -82,11 +82,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
+        new JsonOutputWriter(stagingDir, parsedOptions, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -153,9 +153,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 }
 
 private[json] class JsonOutputWriter(
-    path: String,
+    stagingDir: String,
     options: JSONOptions,
-    bucketId: Option[Int],
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter with Logging {
@@ -168,12 +168,7 @@ private[json] class JsonOutputWriter(
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
+        new Path(stagingDir, s"$fileNamePrefix.json$extension")
       }
     }.getRecordWriter(context)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6faafed1e6290d1b85a2838037b7f6c3e4769929..87b944ba523ca2384a5c82721e4c9b4ce390497e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Try}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.{Log => ApacheParquetLog}
 import org.apache.parquet.filter2.compat.FilterCompat
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -134,10 +133,10 @@ class ParquetFileFormat
     new OutputWriterFactory {
       override def newInstance(
           path: String,
-          bucketId: Option[Int],
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new ParquetOutputWriter(path, bucketId, context)
+        new ParquetOutputWriter(path, fileNamePrefix, context)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
index f89ce05d82d9066c23696c484141ddbfe8e15169..39c199784cd6dd7921736abcd99cd1f649e6e646 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -122,13 +122,12 @@ private[parquet] class ParquetOutputWriterFactory(
   }
 
   /** Disable the use of the older API. */
-  def newInstance(
+  override def newInstance(
       path: String,
-      bucketId: Option[Int],
+      fileNamePrefix: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    throw new UnsupportedOperationException(
-      "this version of newInstance not supported for " +
+    throw new UnsupportedOperationException("this version of newInstance not supported for " +
         "ParquetOutputWriterFactory")
   }
 }
@@ -136,33 +135,16 @@ private[parquet] class ParquetOutputWriterFactory(
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[parquet] class ParquetOutputWriter(
-    path: String,
-    bucketId: Option[Int],
+    stagingDir: String,
+    fileNamePrefix: String,
     context: TaskAttemptContext)
   extends OutputWriter {
 
   private val recordWriter: RecordWriter[Void, InternalRow] = {
     val outputFormat = {
       new ParquetOutputFormat[InternalRow]() {
-        // Here we override `getDefaultWorkFile` for two reasons:
-        //
-        //  1. To allow appending.  We need to generate unique output file names to avoid
-        //     overwriting existing files (either exist before the write job, or are just written
-        //     by other tasks within the same write job).
-        //
-        //  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` uses
-        //     `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
-        //     partitions in the case of dynamic partitioning.
         override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-          val configuration = context.getConfiguration
-          val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-          val taskAttemptId = context.getTaskAttemptID
-          val split = taskAttemptId.getTaskID.getId
-          val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
-          // It has the `.parquet` extension at the end because (de)compression tools
-          // such as gunzip would not be able to decompress this as the compression
-          // is not applied on this whole file but on each "page" in Parquet format.
-          new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+          new Path(stagingDir, fileNamePrefix + extension)
         }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 9f966673110155164a76d126786d8901a4448d30..6cd2351c5749ad94d715fe115b89cd51985bfd6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -73,14 +73,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        if (bucketId.isDefined) {
-          throw new AnalysisException("Text doesn't support bucketing")
-        }
-        new TextOutputWriter(path, dataSchema, context)
+        new TextOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -124,7 +121,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
   }
 }
 
-class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
+class TextOutputWriter(
+    stagingDir: String,
+    fileNamePrefix: String,
+    dataSchema: StructType,
+    context: TaskAttemptContext)
   extends OutputWriter {
 
   private[this] val buffer = new Text()
@@ -132,11 +133,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
   private val recordWriter: RecordWriter[NullWritable, Text] = {
     new TextOutputFormat[NullWritable, Text]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        val configuration = context.getConfiguration
-        val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-        val taskAttemptId = context.getTaskAttemptID
-        val split = taskAttemptId.getTaskID.getId
-        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
+        new Path(stagingDir, s"$fileNamePrefix.txt$extension")
       }
     }.getRecordWriter(context)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1af3280e18a89a878fbc4331faff924fcfdfea79..1ceacb458ae6e6f3bb0044ae19c36c179aa1a98f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -83,11 +83,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new OrcOutputWriter(path, bucketId, dataSchema, context)
+        new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
       }
     }
   }
@@ -210,8 +210,8 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
 }
 
 private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
+    stagingDir: String,
+    fileNamePrefix: String,
     dataSchema: StructType,
     context: TaskAttemptContext)
   extends OutputWriter {
@@ -226,10 +226,7 @@ private[orc] class OrcOutputWriter(
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-    val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-    val taskAttemptId = context.getTaskAttemptID
-    val partition = taskAttemptId.getTaskID.getId
-    val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+
     val compressionExtension = {
       val name = conf.get(OrcRelation.ORC_COMPRESSION)
       OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
@@ -237,12 +234,12 @@ private[orc] class OrcOutputWriter(
     // It has the `.orc` extension at the end because (de)compression tools
     // such as gunzip would not be able to decompress this as the compression
     // is not applied on this whole file but on each "stream" in ORC format.
-    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
+    val filename = s"$fileNamePrefix$compressionExtension.orc"
 
     new OrcOutputFormat().getRecordWriter(
-      new Path(path, filename).getFileSystem(conf),
+      new Path(stagingDir, filename).getFileSystem(conf),
       conf.asInstanceOf[JobConf],
-      new Path(path, filename).toString,
+      new Path(stagingDir, filename).toString,
       Reporter.NULL
     ).asInstanceOf[RecordWriter[NullWritable, Writable]]
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 997445114ba585a4ce958da70f24c781836d416c..2eafe18b858445907f8fb606c13b400d70f8c259 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -54,11 +54,6 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
     intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
   }
 
-  test("write bucketed data to unsupported data source") {
-    val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
-    intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
-  }
-
   test("write bucketed data using save()") {
     val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 5a8a7f0ab5d7bf984905b6593c34eb5ef6de756f..d5044684020e2b29957599d4f91fb0550c585ab9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -39,11 +39,11 @@ class CommitFailureTestSource extends SimpleTextSource {
       dataSchema: StructType): OutputWriterFactory =
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new SimpleTextOutputWriter(path, context) {
+        new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) {
           var failed = false
           TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
             failed = true
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 906de6bbcbee57bf17af08a031c5c082707b3209..9e13b217ec3058ce1aa2120c8da675adff556b33 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -51,11 +51,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
     SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
     new OutputWriterFactory {
       override def newInstance(
-          path: String,
-          bucketId: Option[Int],
+          stagingDir: String,
+          fileNamePrefix: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        new SimpleTextOutputWriter(path, context)
+        new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context)
       }
     }
   }
@@ -120,9 +120,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
   }
 }
 
-class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+class SimpleTextOutputWriter(
+    stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext)
+  extends OutputWriter {
   private val recordWriter: RecordWriter[NullWritable, Text] =
-    new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+    new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context)
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map { v =>
@@ -136,19 +138,15 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
   }
 }
 
-class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
-  val numberFormat = NumberFormat.getInstance()
+class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
+  extends TextOutputFormat[NullWritable, Text] {
 
+  val numberFormat = NumberFormat.getInstance()
   numberFormat.setMinimumIntegerDigits(5)
   numberFormat.setGroupingUsed(false)
 
   override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-    val configuration = context.getConfiguration
-    val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
-    val taskAttemptId = context.getTaskAttemptID
-    val split = taskAttemptId.getTaskID.getId
-    val name = FileOutputFormat.getOutputName(context)
-    new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+    new Path(stagingDir, fileNamePrefix)
   }
 }