diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 49af1bcee5ef88f0bb9f2b73b555b9c6be268d50..9d05ac7cb39be3f8097649414420fe60497648c3 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -209,13 +209,13 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, a user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field to the output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -393,13 +393,15 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, a user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with fewer or more tokens than the schema is not a corrupted record \
+                  to CSV. When it meets a record having fewer tokens than the length of the \
+                  schema, it sets ``null`` to the extra fields. When the record has more \
+                  tokens than the length of the schema, it drops the extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 
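For reference, a minimal sketch of how these modes are wired up from PySpark. The input path is hypothetical, and `_corrupt_record` is assumed to be the default value of `spark.sql.columnNameOfCorruptRecord`:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# To keep corrupt records under PERMISSIVE, declare a string field with the
# name configured by columnNameOfCorruptRecord ("_corrupt_record" by default).
schema = StructType([
    StructField("a", IntegerType()),
    StructField("b", IntegerType()),
    StructField("_corrupt_record", StringType()),
])

df = spark.read.schema(schema).option("mode", "PERMISSIVE").json("/tmp/input.json")
```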
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e2a97acb5e2a7aac5bb28218227133a01d69f84c..cc622decfd682832547048d14d1582b96b70f939 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -442,13 +442,13 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, a user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field to the output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -621,13 +621,15 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, a user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with fewer or more tokens than the schema is not a corrupted record \
+                  to CSV. When it meets a record having fewer tokens than the length of the \
+                  schema, it sets ``null`` to the extra fields. When the record has more \
+                  tokens than the length of the schema, it drops the extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 
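The streaming reader accepts the same `mode` option; note that file-based streaming sources require the schema to be given up front. A minimal sketch, reusing the `spark` session from the earlier example and a hypothetical input directory:

```python
# DROPMALFORMED silently discards corrupted records instead of keeping them.
stream_df = (spark.readStream
    .schema("a INT, b INT")
    .option("mode", "DROPMALFORMED")
    .json("/tmp/stream-input/"))
```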
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bd144c9575c722ec6122f9eda1f12766b1a4e9c2..7f6956994f31fe1b69e360e4d975d15ba3c92b32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -357,6 +357,9 @@ class JacksonParser(
       }
     } catch {
       case e @ (_: RuntimeException | _: JsonProcessingException) =>
+        // JSON parser currently doesn't support partial results for corrupted records.
+        // For such records, all fields other than the field configured by
+        // `columnNameOfCorruptRecord` are set to `null`.
         throw BadRecordException(() => recordLiteral(record), () => None, e)
     }
   }
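To illustrate the comment above: under PERMISSIVE, a malformed JSON line yields a row whose data fields are all `null`, with the raw text preserved in the corrupt-record column. A sketch, reusing the `spark` session from the first example and assuming the default `_corrupt_record` name:

```python
rdd = spark.sparkContext.parallelize(['{"a": 1, "b": 2}', '{"a": 3, "b":'])
df = spark.read.schema("a INT, b INT, _corrupt_record STRING").json(rdd)
# The malformed line comes back as (null, null, '{"a": 3, "b":').
df.show(truncate=False)
```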
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4274f120a375ad9c4135ccd7c724675b379283fa..0139913aaa4e244c6a5d014dd30e0b54edc90c15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,12 +345,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, a user can set a string type field named
+   *     `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field to the output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -550,12 +550,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, a user can set a string type field named
+   *     `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. A record with fewer or more tokens
+   *     than the schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, it sets `null` to the extra fields. When the
+   *     record has more tokens than the length of the schema, it drops the extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 7d6d7e7eef92655eb2e36d5c8624a9108092f228..3d6cc30f2ba833dc30b7348a7844efa95a230241 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,6 +203,8 @@ class UnivocityParser(
           case _: BadRecordException => None
         }
       }
+      // For records with fewer or more tokens than the schema, the parser tries to return
+      // partial results if possible.
       throw BadRecordException(
         () => getCurrentInput,
         () => getPartialResult(),
@@ -218,6 +220,9 @@ class UnivocityParser(
         row
       } catch {
         case NonFatal(e) =>
+          // For corrupted records with the same number of tokens as the schema, the CSV
+          // reader doesn't support partial results. All fields other than the field
+          // configured by `columnNameOfCorruptRecord` are set to `null`.
           throw BadRecordException(() => getCurrentInput, () => None, e)
       }
     }
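A sketch of the token-count behavior documented above, with hypothetical file contents shown in the comments; only the fields that are missing become `null`, and surplus tokens are dropped:

```python
# Hypothetical /tmp/people.csv:
#   1,Alice,30
#   2,Bob              <- fewer tokens: age is set to null
#   3,Carol,25,extra   <- more tokens: the trailing token is dropped
df = spark.read.schema("id INT, name STRING, age INT").csv("/tmp/people.csv")
```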
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f23851655350a521e2dc7f65c872c45a44d13ef9..61e22fac854f9c66c1e1dcb2ab5b085e656977e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -236,12 +236,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, a user can set a string type field named
+   *     `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field to the output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -316,12 +316,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, a user can set a string type field named
+   *     `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. A record with fewer or more tokens
+   *     than the schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, it sets `null` to the extra fields. When the
+   *     record has more tokens than the length of the schema, it drops the extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
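Finally, a sketch contrasting the other two modes on the same hypothetical input; with `FAILFAST` the parse failure surfaces as an exception once the query actually runs:

```python
# DROPMALFORMED: corrupted records are simply filtered out of the result.
dropped = spark.read.schema("a INT, b INT") \
    .option("mode", "DROPMALFORMED").json("/tmp/input.json")

# FAILFAST: the first corrupted record fails the job when an action executes.
strict = spark.read.schema("a INT, b INT") \
    .option("mode", "FAILFAST").json("/tmp/input.json")
strict.count()  # raises if /tmp/input.json contains a malformed record
```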