diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f17fdf9467db42980336a289f91f690f8964548..d3273025b68850206e2a75ff33e9af0a64b3dac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql.execution.datasources
 
@@ -432,7 +432,7 @@ case class DataSource(
         }
 
         val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           data.schema, partitionColumns, caseSensitive)
 
         // If we are appending to a table that already exists, make sure the partitioning matches
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 74f2993754f8f5aeb2fb73be19f013630f7c2605..2340ff0afed7404a3895bc3bd5f4645561732ef1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,7 +339,7 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
-  def validatePartitionColumnDataTypes(
+  def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
       caseSensitive: Boolean): Unit = {
@@ -350,6 +350,11 @@ private[sql] object PartitioningUtils {
         case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
       }
     }
+
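+    // Reject using every column as a partition column; the written files would have no data columns.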
+    if (partitionColumns.size == schema.fields.size) {
+      throw new AnalysisException("Cannot use all columns for partition columns")
+    }
   }
 
   def partitionColumnsSchema(
@@ -359,7 +364,7 @@ private[sql] object PartitioningUtils {
     val equality = columnNameEquality(caseSensitive)
     StructType(partitionColumns.map { col =>
       schema.find(f => equality(f.name, col)).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
+        throw new AnalysisException(s"Partition column $col not found in schema $schema")
       }
     }).asNullable
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 9afd715016d88242841554169205851532139394..7ac62fb191d40689335649eff99f36a3810850f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -154,7 +154,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
 
         // Get all input data source relations of the query.
@@ -205,7 +205,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumnDataTypes(
+        PartitioningUtils.validatePartitionColumn(
           c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
 
         for {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e19101032967b7ae002ecaaf25fc4ba877688643..efb04912d76bf7ecc81041f23eed9ad91cf7b2e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -91,7 +91,7 @@ class FileStreamSinkWriter(
     hadoopConf: Configuration,
     options: Map[String, String]) extends Serializable with Logging {
 
-  PartitioningUtils.validatePartitionColumnDataTypes(
+  PartitioningUtils.validatePartitionColumn(
     data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
 
   private val serializableConf = new SerializableConfiguration(hadoopConf)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 431a943304f5bad405d6fe6b726f5cf56d5b1464..bf6063a4c457c84162fb6ffb8fbda66eee4ce5cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -572,4 +572,17 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     cq.awaitTermination(2000L)
   }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
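+      // spark.range(10) has only the column "id", so partitioning by it uses every column.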
+      intercept[AnalysisException] {
+        spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
+      }
+      intercept[AnalysisException] {
+        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      }
+    }
+  }
 }