diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4e662a52a7bb796315a6eeee90e95810692ebb48..a3691158ee7587e861ca1ff7c51fc6174bcbef29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -59,14 +59,8 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val rdd = baseRdd(sparkSession, csvOptions, paths)
     val firstLine = findFirstLine(csvOptions, rdd)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)
-
-    val header = if (csvOptions.headerFlag) {
-      firstRow.zipWithIndex.map { case (value, index) =>
-        if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value
-      }
-    } else {
-      firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
-    }
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    val header = makeSafeHeader(firstRow, csvOptions, caseSensitive)
 
     val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
     val schema = if (csvOptions.inferSchemaFlag) {
@@ -74,13 +68,56 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     } else {
       // By default fields are assumed to be StringType
       val schemaFields = header.map { fieldName =>
-        StructField(fieldName.toString, StringType, nullable = true)
+        StructField(fieldName, StringType, nullable = true)
       }
       StructType(schemaFields)
     }
     Some(schema)
   }
 
+  /**
+   * Generates a null-safe and duplicate-safe header from the given row.
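+   *
+   * For example, with the header flag enabled and case-insensitive analysis, the row
+   * `Array("a", "A", null, "a")` yields the header `Array("a0", "A1", "_c2", "a3")`.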
+   */
+  private def makeSafeHeader(
+      row: Array[String],
+      options: CSVOptions,
+      caseSensitive: Boolean): Array[String] = {
+    if (options.headerFlag) {
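+      // Collect the names that occur more than once (compared in lower case when
+      // the analysis is case-insensitive) so they can be disambiguated below.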
+      val duplicates = {
+        val headerNames = row.filter(_ != null)
+          .map(name => if (caseSensitive) name else name.toLowerCase)
+        headerNames.diff(headerNames.distinct).distinct
+      }
+
+      row.zipWithIndex.map { case (value, index) =>
+        if (value == null || value.isEmpty || value == options.nullValue) {
+          // When the value is null, an empty string, or equal to `nullValue`, fall
+          // back to the positional default name.
+          s"_c$index"
+        } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
+          // When there are case-insensitive duplicates, put the index as the suffix.
+          s"$value$index"
+        } else if (duplicates.contains(value)) {
+          // When there are exact (case-sensitive) duplicates, put the index as the suffix.
+          s"$value$index"
+        } else {
+          value
+        }
+      }
+    } else {
+      row.zipWithIndex.map { case (_, index) =>
+        // When the header option is disabled, use the default column names "_c#",
+        // where # is the position of the field.
+        s"_c$index"
+      }
+    }
+  }
+
   override def prepareWrite(
       sparkSession: SparkSession,
       job: Job,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 29aac9def6924447ee9cb319d1c6d90bfbd73d42..f7c22c6c93f7a3fc84b7532ecf8d32590a2af62a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row, UDT}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 
@@ -856,4 +857,40 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
     }
   }
+
+  test("load duplicated field names consistently with null or empty strings - case sensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      withTempPath { path =>
+        Seq("a,a,c,A,b,B").toDF().write.text(path.getAbsolutePath)
+        val actualSchema = spark.read
+          .format("csv")
+          .option("header", true)
+          .load(path.getAbsolutePath)
+          .schema
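+        // With case-sensitive analysis only the exact duplicates ("a" at positions 0
+        // and 1) are index-suffixed; "A" differs from "a" and keeps its name.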
+        val fields = Seq("a0", "a1", "c", "A", "b", "B").map(StructField(_, StringType, true))
+        val expectedSchema = StructType(fields)
+        assert(actualSchema == expectedSchema)
+      }
+    }
+  }
+
+  test("load duplicated field names consistently with null or empty strings - case insensitive") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTempPath { path =>
+        Seq("a,A,c,A,b,B").toDF().write.text(path.getAbsolutePath)
+        val actualSchema = spark.read
+          .format("csv")
+          .option("header", true)
+          .load(path.getAbsolutePath)
+          .schema
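+        // With case-insensitive analysis "a"/"A" and "b"/"B" collide, so every
+        // occurrence is index-suffixed; only the unique "c" keeps its name.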
+        val fields = Seq("a0", "A1", "c", "A3", "b4", "B5").map(StructField(_, StringType, true))
+        val expectedSchema = StructType(fields)
+        assert(actualSchema == expectedSchema)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index dae92f626c2252ea84ec1feb6fc4bde9d1f42de1..51832a13cfe0b8e2156997d86190d53e2cf8a199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.math.BigDecimal
-import java.sql.{Date, Timestamp}
-import java.text.SimpleDateFormat
 import java.util.Locale
 
 import org.apache.spark.SparkFunSuite