diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 2ac2383d699c47a8f24cbf9358622ca1aa0f4eb2..ee231a934a3af83d5f2ce475a9aa39d7a9e9cfd7 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1308,6 +1308,13 @@ the following case-insensitive options:
      </td>
    </tr>
 
+  <tr>
+     <td><code>sessionInitStatement</code></td>
+     <td>
+       A custom SQL statement (or PL/SQL block) that is executed after each database session is opened to the remote DB and before reading data starts. Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
+     </td>
+  </tr>
+
   <tr>
     <td><code>truncate</code></td>
     <td>
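
For context, this is how the new option looks from the `DataFrameReader` API. A minimal sketch mirroring the docs example above, assuming a reachable Oracle instance; the host, service name, credentials, and table name are placeholders:

```scala
// Sketch only: URL, credentials, and table below are hypothetical.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//db.example.com:1521/ORCL")
  .option("user", "scott")
  .option("password", "tiger")
  .option("dbtable", "SCOTT.BIG_TABLE")
  // Runs once per opened session, before any rows are fetched.
  .option("sessionInitStatement",
    """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
  .load()
```
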
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 96a8a51da18e54388a7eed6ebf4dd90384c583b7..05b00058618a290065ee83b83c69a2b158b20e6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -138,6 +138,8 @@ class JDBCOptions(
       case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
       case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
     }
+  // An option to execute custom SQL before fetching data from the remote DB
+  val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
 }
 
 object JDBCOptions {
@@ -161,4 +163,5 @@ object JDBCOptions {
   val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
+  val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
 }
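
Since `JDBCOptions` wraps its parameters in a `CaseInsensitiveMap`, the new key resolves regardless of casing and surfaces as an `Option[String]`. A small sketch of that behavior, assuming an H2 driver on the classpath as in the test suite (the parameter values are hypothetical):

```scala
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// url and dbtable are required by JDBCOptions; the init statement key is
// deliberately upper-cased to show the case-insensitive lookup.
val opts = new JDBCOptions(Map(
  "url" -> "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
  "dbtable" -> "TEST.PEOPLE",
  "SESSIONINITSTATEMENT" -> "SET @MYTESTVAR 42"))

assert(opts.sessionInitStatement == Some("SET @MYTESTVAR 42"))
```
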
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 24e13697c0c9f92ec20d6f1395ae7ba758e45415..3274be91d48175c841648e71d347a2968cc7211f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -273,6 +273,19 @@ private[jdbc] class JDBCRDD(
     import scala.collection.JavaConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
 
+    // This executes a generic SQL statement (or PL/SQL block) before reading
+    // the table/query via JDBC. Use this feature to initialize the database
+    // session environment, e.g. for optimizations and/or troubleshooting.
+    options.sessionInitStatement.foreach { sql =>
+      val statement = conn.prepareStatement(sql)
+      logInfo(s"Executing sessionInitStatement: $sql")
+      try {
+        statement.execute()
+      } finally {
+        statement.close()
+      }
+    }
+
     // H2's JDBC driver does not support the setSchema() method.  We pass a
     // fully-qualified table name in the SELECT statement.  I don't know how to
     // talk about a table in a completely portable way.
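
The execute-then-close-in-`finally` pattern above is the same one you would write against plain JDBC. A standalone sketch of it against the in-memory H2 database the test suite uses (URL, variable name, and `NVL` query are taken from the tests below):

```scala
import java.sql.DriverManager

// Session-scoped: @MYTESTVAR only exists in the session that sets it.
val conn = DriverManager.getConnection(
  "jdbc:h2:mem:testdb0;user=testUser;password=testPass")
try {
  val init = conn.prepareStatement("SET @MYTESTVAR 21519")
  try init.execute() finally init.close()

  val rs = conn.createStatement().executeQuery("SELECT NVL(@MYTESTVAR, -1)")
  rs.next()
  assert(rs.getInt(1) == 21519)
} finally {
  conn.close()
}
```
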
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 4c43646889418fc171d7856c6d18ed0c08a88e0f..8dc11d80c30633ec93336142239c5a3cbf880a4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1044,4 +1044,35 @@ class JDBCSuite extends SparkFunSuite
       assert(sql("select * from people_view").count() == 3)
     }
   }
+
+  test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
+    val initSQL1 = "SET @MYTESTVAR 21519"
+    val df1 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "(SELECT NVL(@MYTESTVAR, -1))")
+      .option("sessionInitStatement", initSQL1)
+      .load()
+    assert(df1.collect() === Array(Row(21519)))
+
+    val initSQL2 = "SET SCHEMA DUMMY"
+    val df2 = spark.read.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", "TEST.PEOPLE")
+      .option("sessionInitStatement", initSQL2)
+      .load()
+    val e = intercept[SparkException] { df2.collect() }.getMessage
+    assert(e.contains("""Schema "DUMMY" not found"""))
+
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$urlWithUserAndPass',
+         |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
+         |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
+       """.stripMargin)
+
+    val df3 = sql("SELECT * FROM test_sessionInitStatement")
+    assert(df3.collect() === Array(Row(21519, 1234)))
+  }
 }
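
The temporary-view case above also shows that several init statements can be chained with semicolons, which H2 accepts in a single prepared statement (not every JDBC driver does). For reference, the equivalent `DataFrameReader` form of that df3 check, reusing the suite's `urlWithUserAndPass`, would look like:

```scala
// Same assertion as the temporary-view test, expressed via the reader API.
val df4 = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)
  .option("dbtable", "(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))")
  // Two init statements in one string; H2 executes both.
  .option("sessionInitStatement", "SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234")
  .load()
assert(df4.collect() === Array(Row(21519, 1234)))
```
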