diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 438662bb157f0e9dfc415c0966b69d4ce4bd03ae..bae9e69df8e2b08e6dfd7f3019c2b715109d452b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -162,6 +162,14 @@ class DataFrameReader(object):
                 (e.g. 00012)
             * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
                 of all character using backslash quoting mechanism
+            * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
+                during parsing.
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it encounters a corrupted \
+                  record and puts the malformed string into a new field configured by \
+                  ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by the user, it \
+                  sets ``null`` for extra fields.
+                * ``DROPMALFORMED`` : ignores whole corrupted records.
+                * ``FAILFAST`` : throws an exception when it encounters corrupted records.
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1b5a4999a8ef15808418ef706addbd2e380f0cb5..0dc0d44d6cdcd108fea97f4bb78b517ddb6ca05d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -289,6 +289,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * </li>
    * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
    * (e.g. 00012)</li>
+   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+   * during parsing. <ul>
+   *  <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record, and
+   *  puts the malformed string into a new field configured by
+   *  `spark.sql.columnNameOfCorruptRecord`. When a schema is set by the user, it sets `null`
+   *  for extra fields.</li>
+   *  <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
+   *  <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
+   * </ul></li>
    *
    * @since 1.4.0
    */
@@ -313,6 +322,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * (e.g. 00012)</li>
    * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
    * character using backslash quoting mechanism</li>
+   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+   * during parsing. <ul>
+   *  <li>`PERMISSIVE` : sets other fields to `null` when it encounters a corrupted record, and
+   *  puts the malformed string into a new field configured by
+   *  `spark.sql.columnNameOfCorruptRecord`. When a schema is set by the user, it sets `null`
+   *  for extra fields.</li>
+   *  <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
+   *  <li>`FAILFAST` : throws an exception when it encounters corrupted records.</li>
+   * </ul></li>
    *
    * @since 1.6.0
    */
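
Reviewer note: a minimal usage sketch of the new option through the public `DataFrameReader`
API. The `sqlContext` value and the path are assumed to exist in the caller's environment; this
only mirrors the documented behavior above, it is not part of the patch.

    val path = "examples/src/main/resources/people.json"  // illustrative path

    // PERMISSIVE (default): malformed lines become all-null rows, with the raw
    // text kept in the column named by spark.sql.columnNameOfCorruptRecord.
    val permissiveDF = sqlContext.read.option("mode", "PERMISSIVE").json(path)

    // DROPMALFORMED: malformed lines are logged and omitted from the result.
    val droppedDF = sqlContext.read.option("mode", "DROPMALFORMED").json(path)

    // FAILFAST: the first malformed line fails the query with a RuntimeException.
    val strictDF = sqlContext.read.option("mode", "FAILFAST").json(path)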
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
new file mode 100644
index 0000000000000000000000000000000000000000..468228053c9649a66265be9e73e9e4706e670dba
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+private[datasources] object ParseModes {
+  val PERMISSIVE_MODE = "PERMISSIVE"
+  val DROP_MALFORMED_MODE = "DROPMALFORMED"
+  val FAIL_FAST_MODE = "FAILFAST"
+
+  val DEFAULT = PERMISSIVE_MODE
+
+  def isValidMode(mode: String): Boolean = {
+    mode.toUpperCase match {
+      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+      case _ => false
+    }
+  }
+
+  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
+    mode.toUpperCase == PERMISSIVE_MODE
+  } else {
+    true // We default to permissive if the mode string is not valid
+  }
+}
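
Since `ParseModes` is `private[datasources]`, the sketch below only illustrates the intended
contract for callers inside that package: matching is case-insensitive, and unknown modes
degrade to permissive rather than failing.

    ParseModes.isValidMode("dropmalformed")   // true: comparison is case-insensitive
    ParseModes.isFailFastMode("FailFast")     // true
    ParseModes.isValidMode("bogus")           // false, letting callers log a warning
    ParseModes.isPermissiveMode("bogus")      // true: invalid modes fall back to PERMISSIVE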
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e009a37f2de72ed98239f181d6617d94adca5f26..95de02cf5c182ea7fbb5d46721bd09fc73fb1a61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
 import java.nio.charset.StandardCharsets
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
 private[sql] class CSVOptions(
     @transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(
 
   val delimiter = CSVTypeCast.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
 
@@ -101,26 +101,3 @@ private[sql] class CSVOptions(
 
   val rowSeparator = "\n"
 }
-
-private[csv] object ParseModes {
-  val PERMISSIVE_MODE = "PERMISSIVE"
-  val DROP_MALFORMED_MODE = "DROPMALFORMED"
-  val FAIL_FAST_MODE = "FAILFAST"
-
-  val DEFAULT = PERMISSIVE_MODE
-
-  def isValidMode(mode: String): Boolean = {
-    mode.toUpperCase match {
-      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
-      case _ => false
-    }
-  }
-
-  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
-  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
-  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode))  {
-    mode.toUpperCase == PERMISSIVE_MODE
-  } else {
-    true // We default to permissive is the mode string is not valid
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 0937a213c984f836bb14b76adf9e0f150ed5f376..945ed2c2113d7ca4b345da2fba2cc4c8aff824d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -40,6 +40,7 @@ private[sql] object InferSchema {
       configOptions: JSONOptions): StructType = {
     require(configOptions.samplingRatio > 0,
       s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
+    val shouldHandleCorruptRecord = configOptions.permissive
     val schemaData = if (configOptions.samplingRatio > 0.99) {
       json
     } else {
@@ -50,21 +51,23 @@ private[sql] object InferSchema {
     val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       configOptions.setJacksonOptions(factory)
-      iter.map { row =>
+      iter.flatMap { row =>
         try {
           Utils.tryWithResource(factory.createParser(row)) { parser =>
             parser.nextToken()
-            inferField(parser, configOptions)
+            Some(inferField(parser, configOptions))
           }
         } catch {
+          case _: JsonParseException if shouldHandleCorruptRecord =>
+            Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
           case _: JsonParseException =>
-            StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
+            None
         }
       }
     }.treeAggregate[DataType](
       StructType(Seq()))(
-      compatibleRootType(columnNameOfCorruptRecords),
-      compatibleRootType(columnNameOfCorruptRecords))
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord),
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
@@ -194,18 +197,21 @@ private[sql] object InferSchema {
    * Remove top-level ArrayType wrappers and merge the remaining schemas
    */
   private def compatibleRootType(
-      columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+      columnNameOfCorruptRecords: String,
+      shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = {
     // Since we support array of json objects at the top level,
     // we need to check the element type and find the root level data type.
-    case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
-    case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    case (ArrayType(ty1, _), ty2) =>
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
+    case (ty1, ArrayType(ty2, _)) =>
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
     // If we see any other data type at the root level, we get records that cannot be
     // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
     case (struct: StructType, NullType) => struct
     case (NullType, struct: StructType) => struct
-    case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+    case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
       withCorruptField(struct, columnNameOfCorruptRecords)
-    case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+    case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
       withCorruptField(struct, columnNameOfCorruptRecords)
     // If we get anything else, we call compatibleType.
     // Usually, when we reach here, ty1 and ty2 are two StructTypes.
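
To make the inference change concrete, a sketch of the schemas this should produce, assuming
the default corrupt-record column name `_corrupt_record` and a `sqlContext` in scope:

    val lines = sqlContext.sparkContext.parallelize(
      """{"a": "str_a_1"}""" :: """{""" :: Nil)

    // PERMISSIVE: the unparseable line contributes a StringType corrupt-record
    // field, which compatibleRootType merges into the final schema.
    sqlContext.read.json(lines).schema
    // expected: StructType(_corrupt_record: string, a: string)

    // DROPMALFORMED and FAILFAST: the JsonParseException handler yields None for
    // the bad line, so the corrupt-record field never enters the merge.
    sqlContext.read.option("mode", "DROPMALFORMED").json(lines).schema
    // expected: StructType(a: string)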
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index e59dbd6b3d438175e53ca48b546f4dc4e5e73e09..93c3d47c1dcf4b6f133c2f35b21be57895116034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
 /**
  * Options for the JSON data source.
@@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs
  */
 private[sql] class JSONOptions(
     @transient private val parameters: Map[String, String])
-  extends Serializable  {
+  extends Logging with Serializable  {
 
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -49,6 +50,16 @@ private[sql] class JSONOptions(
   val allowBackslashEscapingAnyCharacter =
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+
+  if (!ParseModes.isValidMode(parseMode)) {
+    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+  }
+
+  // Parse mode flags; an invalid mode falls back to PERMISSIVE.
+  val failFast = ParseModes.isFailFastMode(parseMode)
+  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+  val permissive = ParseModes.isPermissiveMode(parseMode)
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
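
The derived flags form a one-hot mapping from the mode string; `JSONOptions` is `private[sql]`,
so the sketch below is for review illustration only.

    // mode option       failFast  dropMalformed  permissive
    // "PERMISSIVE"      false     false          true
    // "DROPMALFORMED"   false     true           false
    // "FAILFAST"        true      false          false
    // anything else     false     false          true   (plus a logged warning)
    val opts = new JSONOptions(Map("mode" -> "DROPMALFORMED"))
    assert(opts.dropMalformed && !opts.failFast && !opts.permissive)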
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 3252b6c77f8886e5333e00a0db995542fd2a93cc..00c14adf0704b52e9a274e07c674eb7979aafbb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
 
 private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
-object JacksonParser {
+object JacksonParser extends Logging {
 
   def parse(
       input: RDD[String],
@@ -257,13 +258,20 @@ object JacksonParser {
 
     def failedRecord(record: String): Seq[InternalRow] = {
       // create a row even if no corrupt record column is present
-      val row = new GenericMutableRow(schema.length)
-      for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
-        require(schema(corruptIndex).dataType == StringType)
-        row.update(corruptIndex, UTF8String.fromString(record))
+      if (configOptions.failFast) {
+        throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+      }
+      if (configOptions.dropMalformed) {
+        logWarning(s"Dropping malformed line: $record")
+        Nil
+      } else {
+        val row = new GenericMutableRow(schema.length)
+        for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
+          require(schema(corruptIndex).dataType == StringType)
+          row.update(corruptIndex, UTF8String.fromString(record))
+        }
+        Seq(row)
       }
-
-      Seq(row)
     }
 
     val factory = new JsonFactory()
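
End to end, `failedRecord` should give the following row-level behavior; a sketch against an
in-memory RDD, again assuming the default corrupt-record column name and a `sqlContext` in
scope:

    val rdd = sqlContext.sparkContext.parallelize(
      """{"a": 1}""" :: """{malformed""" :: Nil)

    // PERMISSIVE: two rows come back; the bad line is all nulls except for
    // _corrupt_record, which holds the raw text "{malformed".
    sqlContext.read.json(rdd).collect()

    // DROPMALFORMED: one row; the bad line is logged via logWarning and skipped.
    sqlContext.read.option("mode", "DROPMALFORMED").json(rdd).collect()

    // FAILFAST: collect() fails with a SparkException whose message includes
    // "Malformed line in FAILFAST mode: {malformed".
    sqlContext.read.option("mode", "FAILFAST").json(rdd).collect()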
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6d942c4c902892704b516e3d04a1033de888ad89..0a5699b99cf0e18253756d6f74a874a8f70ab97d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -963,7 +964,56 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
-  test("Corrupt records") {
+  test("Corrupt records: FAILFAST mode") {
+    val schema = StructType(
+      StructField("a", StringType, true) :: Nil)
+    // `FAILFAST` mode should throw an exception for corrupt records.
+    val exceptionOne = intercept[SparkException] {
+      sqlContext.read
+        .option("mode", "FAILFAST")
+        .json(corruptRecords)
+        .collect()
+    }
+    assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {"))
+
+    val exceptionTwo = intercept[SparkException] {
+      sqlContext.read
+        .option("mode", "FAILFAST")
+        .schema(schema)
+        .json(corruptRecords)
+        .collect()
+    }
+    assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {"))
+  }
+
+  test("Corrupt records: DROPMALFORMED mode") {
+    val schemaOne = StructType(
+      StructField("a", StringType, true) ::
+        StructField("b", StringType, true) ::
+        StructField("c", StringType, true) :: Nil)
+    val schemaTwo = StructType(
+      StructField("a", StringType, true) :: Nil)
+    // `DROPMALFORMED` mode should skip corrupt records.
+    val jsonDFOne = sqlContext.read
+      .option("mode", "DROPMALFORMED")
+      .json(corruptRecords)
+    checkAnswer(
+      jsonDFOne,
+      Row("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+    assert(jsonDFOne.schema === schemaOne)
+
+    val jsonDFTwo = sqlContext.read
+      .option("mode", "DROPMALFORMED")
+      .schema(schemaTwo)
+      .json(corruptRecords)
+    checkAnswer(
+      jsonDFTwo,
+      Row("str_a_4") :: Nil)
+    assert(jsonDFTwo.schema === schemaTwo)
+  }
+
+  test("Corrupt records: PERMISSIVE mode") {
     // Test if we can query corrupt records.
     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
       withTempTable("jsonTable") {