diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 9eb9eae699e940bb6b7b8969968b083bbe9c2006..073e8788aff86a67e90c5593bfc1036caa6d4d5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -236,7 +236,10 @@ object FileFormatWriter extends Logging {
     committer.setupTask(taskAttemptContext)
 
     val writeTask =
-      if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
+      if (sparkPartitionId != 0 && !iterator.hasNext) {
+        // Skip empty tasks, but keep partition 0 so formats like Parquet still write schema metadata.
+        new EmptyDirectoryWriteTask
+      } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
         new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
       } else {
         new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
@@ -301,6 +304,20 @@ object FileFormatWriter extends Logging {
     }
   }
 
+  /** A no-op [[ExecuteWriteTask]] for empty partitions: writes no file and reports empty metrics. */
+  private class EmptyDirectoryWriteTask extends ExecuteWriteTask {
+
+    override def execute(iter: Iterator[InternalRow]): ExecutedWriteSummary = {
+      ExecutedWriteSummary(
+        updatedPartitions = Set.empty,
+        numOutputFile = 0,
+        numOutputBytes = 0,
+        numOutputRows = 0)
+    }
+
+    override def releaseResources(): Unit = {}
+  }
+
   /** Writes data to a single directory (used for non-dynamic-partition writes). */
   private class SingleDirectoryWriteTask(
       description: WriteJobDescription,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..a0c1ea63d3827019e8ae659d2c33585fb3b614fb
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+
+  test("empty file should be skipped while write to file") {
+    withTempPath { path =>
+      spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
+      val partFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(partFiles.length === 2)  // partition 0's metadata-only file plus the file holding id = 50
+    }
+  }
+}
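
As a rough illustration of why partition 0 is kept even when it is empty: the sketch below (not part of the patch; it assumes it would live inside the same suite and reuse its SharedSQLContext helpers) writes a result with no rows at all and checks that the directory can still be read back with its schema, which is carried by the single data-free file written by partition 0.

  test("schema is preserved when every partition is empty (illustrative sketch)") {
    withTempPath { path =>
      // No row matches, so every write task is empty; only partition 0 emits a (data-free) file.
      spark.range(100).repartition(10).where("id < 0").write.parquet(path.toString)
      val readBack = spark.read.parquet(path.toString)
      assert(readBack.schema.fieldNames === Array("id"))
      assert(readBack.count() === 0)
    }
  }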