diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 359a3e2aa8ad2c97c5329a95b6fe250c4acd7ea4..5ce1bf7432159df1cbf025be2f4b550d3c036e20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,11 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+  @transient
+  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
   /**
    * This function deals with the cases it fails to parse. This function will be called
    * when exceptions are caught during converting. This functions also deals with `mode` option.
@@ -62,8 +68,39 @@ class JacksonParser(
       throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
-      logWarning(s"Dropping malformed line: $record")
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+             |mode and use the default inferred schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
       Nil
+    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+      if (!isWarningPrintedForMalformedRecord) {
+        logWarning(
+          s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+             |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+             |To find out which corrupted records have been replaced with null, please use the
+             |default inferred schema instead of providing a custom schema.
+             |
+             |Code example to print all malformed records (scala):
+             |===================================================
+             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+             |
+           """.stripMargin)
+        isWarningPrintedForMalformedRecord = true
+      }
+      emptyRow
     } else {
       val row = new GenericMutableRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 63a9061210ca6a57f049a32e3f39fd3a9746c1e2..3d533c14e18e7d179459654fba5454fdeafa7e83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1081,7 +1081,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     assert(jsonDFTwo.schema === schemaTwo)
   }
 
-  test("Corrupt records: PERMISSIVE mode") {
+  test("Corrupt records: PERMISSIVE mode, without designated column for malformed records") {
+    withTempView("jsonTable") {
+      val schema = StructType(
+        StructField("a", StringType, true) ::
+          StructField("b", StringType, true) ::
+          StructField("c", StringType, true) :: Nil)
+
+      val jsonDF = spark.read.schema(schema).json(corruptRecords)
+      jsonDF.createOrReplaceTempView("jsonTable")
+
+      checkAnswer(
+        sql(
+          """
+            |SELECT a, b, c
+            |FROM jsonTable
+          """.stripMargin),
+        Seq(
+          // Corrupted records are replaced with null
+          Row(null, null, null),
+          Row(null, null, null),
+          Row(null, null, null),
+          Row("str_a_4", "str_b_4", "str_c_4"),
+          Row(null, null, null))
+      )
+    }
+  }
+
+  test("Corrupt records: PERMISSIVE mode, with designated column for malformed records") {
     // Test if we can query corrupt records.
     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
       withTempView("jsonTable") {