diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 093504c765ee863b791c146c0d1cb1eaf423af0e..118e0e9ace98a6e4467d3f4e1586f18951a3aaa8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index bc8ef4ad7e2365e077a35fe015c599de4236e12e..9e913de48f72c08c3d6f36fd6838d0cc3c6f4da1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
 
 import org.apache.spark.util.Utils
 
 private[datasources] object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
     "bzip2" -> classOf[BZip2Codec].getName,
+    "deflate" -> classOf[DeflateCodec].getName,
     "gzip" -> classOf[GzipCodec].getName,
     "lz4" -> classOf[Lz4Codec].getName,
     "snappy" -> classOf[SnappyCodec].getName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 184cbb2f296b0442d25ab0b9d4a37a711de2ace2..a1806221b71481060a6dc85cacf3032914b23abc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -286,7 +286,8 @@ private[sql] class ParquetRelation(
       ParquetRelation
         .shortParquetCompressionCodecNames
         .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
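+          // Short codec names are registered in lower case, so normalize the conf value.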
+          sqlContext.conf.parquetCompressionCodec.toLowerCase,
           CompressionCodecName.UNCOMPRESSED).name())
 
     new BucketedOutputWriterFactory {
@@ -905,9 +905,10 @@ private[sql] object ParquetRelation extends Logging {
 
   // The parquet compression short names
   val shortParquetCompressionCodecNames = Map(
-    "NONE" -> CompressionCodecName.UNCOMPRESSED,
-    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
-    "SNAPPY" -> CompressionCodecName.SNAPPY,
-    "GZIP" -> CompressionCodecName.GZIP,
-    "LZO" -> CompressionCodecName.LZO)
+    "none" -> CompressionCodecName.UNCOMPRESSED,
+    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+    "snappy" -> CompressionCodecName.SNAPPY,
+    "gzip" -> CompressionCodecName.GZIP,
+    "lzo" -> CompressionCodecName.LZO)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 6ae42a30fb00c29ca5824ec15eaa472f6767cbf1..59e0e6a7cfa006ff0e8d21af2a7dd18460f4859b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -58,18 +58,23 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
-    val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf")
+    val testDf = sqlContext.read.text(testFile)
 
-    val tempFile = Utils.createTempDir()
-    tempFile.delete()
-    df.write
-      .option("compression", "gZiP")
-      .text(tempFile.getCanonicalPath)
-    val compressedFiles = tempFile.listFiles()
-    assert(compressedFiles.exists(_.getName.endsWith(".gz")))
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
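+    // Write the data with each supported codec and read it back to verify the round trip.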
+    Seq("bzip2", "deflate", "gzip").foreach { codecName =>
+      val tempDir = Utils.createTempDir()
+      val tempDirPath = tempDir.getAbsolutePath()
+      testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
+      verifyFrame(sqlContext.read.text(tempDirPath))
+    }
 
-    Utils.deleteRecursively(tempFile)
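+    // An unsupported codec name should fail fast with a descriptive IllegalArgumentException.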
+    val exception = intercept[IllegalArgumentException] {
+      val tempDirPath = Utils.createTempDir().getAbsolutePath()
+      testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath)
+    }
+    assert(exception.getMessage === "Codec [illegal] is not available. " +
+      "Known codecs are bzip2, deflate, lz4, gzip, snappy.")
   }
 
   private def testFile: String = {