diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 831fb4fe95fe7c6e0c12d5913b82290abf00d915..96e2aee4de15b3578f0d85bfca8bb836ad970b82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -69,6 +69,7 @@ trait HiveTypeCoercion {
   val typeCoercionRules =
     PropagateTypes ::
     ConvertNaNs ::
+    InConversion ::
     WidenTypes ::
     PromoteStrings ::
     DecimalPrecision ::
@@ -287,6 +288,16 @@ trait HiveTypeCoercion {
     }
   }
 
+  /**
+   * Convert all expressions in in() list to the left operator type
+   */
+  object InConversion extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      case i @ In(a, b) if b.exists(_.dataType != a.dataType) =>
+        i.makeCopy(Array(a, b.map(Cast(_, a.dataType))))
+    }
+  }
+
   // scalastyle:off
   /**
    * Calculates and propagates precision for fixed-precision decimals. Hive has a number of
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 709f7d672d931bad272b97eef25a0f1168227369..e4a60f53d6c09d6468af4d865c2ab43efbdbbf26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -310,8 +310,8 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(null))
-          InSet(v, HashSet() ++ hSet)
+        val hSet = list.map(e => e.eval(null))
+        InSet(v, HashSet() ++ hSet)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 0ab8558c1db133d75ccf72f58065c3475fa12caa..208cec6a32d4da0b56ec437e8c1884516f523c96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -120,6 +120,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       Row(1, 1) :: Nil)
   }
 
+  test("SPARK-6201 IN type conversion") {
+    jsonRDD(sparkContext.parallelize(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+      .registerTempTable("d")
+
+    checkAnswer(
+      sql("select * from d where d.a in (1,2)"),
+      Seq(Row("1"), Row("2")))
+  }
+
   test("SPARK-3176 Added Parser of SQL ABS()") {
     checkAnswer(
       sql("SELECT ABS(-1.3)"),