diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index e61a8eb628891bb7701e17d02586d7cec6ccfe01..e76d4dc6125df5be79b4a08f8ded686f9e2750bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
@@ -52,8 +52,13 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister {
       sqlContext: SQLContext,
       schema: Option[StructType],
       providerName: String,
-      parameters: Map[String, String]): (String, StructType) =
+      parameters: Map[String, String]): (String, StructType) = {
+    if (schema.nonEmpty) {
+      throw new AnalysisException("The rate source does not support a user-specified schema.")
+    }
+
     (shortName(), RateSourceProvider.SCHEMA)
+  }
 
   override def createSource(
       sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index 58bff27a05bf3c3c5ab2b9f53a284809a00b0e58..8e63207959575f552e4e2508ae0776c6c3bdef21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -195,13 +195,17 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
     if (!parameters.contains("port")) {
       throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
     }
-    val schema =
+    if (schema.nonEmpty) {
+      throw new AnalysisException("The socket source does not support a user-specified schema.")
+    }
+
+    val sourceSchema =
       if (parseIncludeTimestamp(parameters)) {
         TextSocketSource.SCHEMA_TIMESTAMP
       } else {
         TextSocketSource.SCHEMA_REGULAR
       }
-    ("textSocket", schema)
+    ("textSocket", sourceSchema)
   }
 
   override def createSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
index bdba536425a431a8b3cf2223cf4e5bde2b22ee36..03d0f63fa4d7fba6624fddca1fe65969dbcc284f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.util.ManualClock
@@ -179,4 +180,15 @@ class RateSourceSuite extends StreamTest {
     testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive"))
     testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive"))
   }
+
+  test("user-specified schema given") {
+    val exception = intercept[AnalysisException] {
+      spark.readStream
+        .format("rate")
+        .schema(spark.range(1).schema)
+        .load()
+    }
+    assert(exception.getMessage.contains(
+      "rate source does not support a user-specified schema"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
index 5174a0415304cedc073a50decd3b1788eec9def8..9ebf4d2835266ef34bbf174e3993dd4123d666dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala
@@ -148,6 +148,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     }
   }
 
+  test("user-specified schema given") {
+    val provider = new TextSocketSourceProvider
+    val userSpecifiedSchema = StructType(
+      StructField("name", StringType) ::
+      StructField("area", StringType) :: Nil)
+    val exception = intercept[AnalysisException] {
+      provider.sourceSchema(
+        sqlContext, Some(userSpecifiedSchema),
+        "",
+        Map("host" -> "localhost", "port" -> "1234"))
+    }
+    assert(exception.getMessage.contains(
+      "socket source does not support a user-specified schema"))
+  }
+
   test("no server up") {
     val provider = new TextSocketSourceProvider
     val parameters = Map("host" -> "localhost", "port" -> "0")