diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 0efe3c4d456ee4bda8784add214914d6177ec25b..6e798a53adf2702d439add89b2c55fed67b48f30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -29,6 +29,7 @@ trait CatalystConf {
   def groupByOrdinal: Boolean
 
   def optimizerMaxIterations: Int
+  def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
   /**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
     orderByOrdinal: Boolean = true,
     groupByOrdinal: Boolean = true,
     optimizerMaxIterations: Int = 100,
+    optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20)
   extends CatalystConf {
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e974f69ef12ef8849799b032362215b97e8574b8..660314f86e8f5205c7675fb9abb9229a54baf1d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineUnions,
       // Constant folding and strength reduction
       NullPropagation,
-      OptimizeIn,
+      OptimizeIn(conf),
       ConstantFolding,
       LikeSimplification,
       BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
  * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
  * which is much faster
  */
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+          list.size > conf.optimizerInSetConversionThreshold =>
         val hSet = list.map(e => e.eval(EmptyRow))
         InSet(v, HashSet() ++ hSet)
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 641c89873dcc4817e5db1f1677226d80517ed844..d9655bbcc2ce1dff431ea6d774f25474b671a074 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("ConstantFolding", Once,
-        OptimizeIn,
+        OptimizeIn(SimpleCatalystConf(true)),
         ConstantFolding,
         BooleanSimplification) :: Nil
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
index 0e43ce034fb485a2763b8c10d4ecb95fc336ef3d..f1a4ea8280ab759fe30ee908e40890ddad262f82 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types._
@@ -36,7 +37,7 @@ class OptimizeInSuite extends PlanTest {
         NullPropagation,
         ConstantFolding,
         BooleanSimplification,
-        OptimizeIn) :: Nil
+        OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -128,4 +129,23 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("OptimizedIn test: Setting the threshold for turning Set into InSet.") {
+    val plan =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
+        .analyze
+
+    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
+    comparePlans(notOptimizedPlan, plan)
+
+    // Reduce the threshold to turning into InSet.
+    val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
+        optimizerInSetConversionThreshold = 2))(plan)
+    optimizedPlan match {
+      case Filter(cond, _)
+        if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+          // pass
+      case _ => fail("Unexpected result for OptimizedIn")
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4fa7e231cc90b707d5c4475138f187123ff8aaba..2fe1818866d71e9f58e265b03e45db91e985debc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -54,10 +54,17 @@ object SQLConf {
 
   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
-    .doc("The max number of iterations the optimizer and analyzer runs")
+    .doc("The max number of iterations the optimizer and analyzer runs.")
     .intConf
     .createWithDefault(100)
 
+  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
+    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
+      .internal()
+      .doc("The threshold of set size for InSet conversion.")
+      .intConf
+      .createWithDefault(10)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -537,6 +544,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
 
+  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)