diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 18b78ab5067cbed34136626299f620dd8ede4136..40dc81e02dac31998a70ecfde41c13c8981d8f64 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -96,8 +96,10 @@ private[hive] class SparkExecuteStatementOperation(
       case DateType =>
         to += from.getAs[Date](ordinal)
       case TimestampType =>
-        to +=  from.getAs[Timestamp](ordinal)
-      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        to += from.getAs[Timestamp](ordinal)
+      case BinaryType =>
+        to += from.getAs[Array[Byte]](ordinal)
+      case _: ArrayType | _: StructType | _: MapType =>
         val hiveString = HiveUtils.toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to += hiveString
     }
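
Context for the hunk above (editorial note, not part of the patch): previously BinaryType fell through to the HiveUtils.toHiveString branch together with the complex types, so binary columns reached JDBC clients as Hive-formatted strings rather than raw bytes. The hunk gives BinaryType its own case and copies the value through as Array[Byte]. A minimal, self-contained sketch of that dispatch, assuming only spark-sql on the classpath; the name copyColumnValue and the toString fallback for complex types are illustrative simplifications, not code from the patch:

  import java.sql.{Date, Timestamp}
  import scala.collection.mutable.ArrayBuffer
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  object BinaryColumnSketch {
    // Mirrors the match in addNonNullColumnValue: binary stays binary,
    // while complex types are rendered to strings (HiveUtils.toHiveString
    // in the real code; simplified to toString here).
    def copyColumnValue(from: Row, ordinal: Int, dataType: DataType, to: ArrayBuffer[Any]): Unit =
      dataType match {
        case DateType      => to += from.getAs[Date](ordinal)
        case TimestampType => to += from.getAs[Timestamp](ordinal)
        case BinaryType    => to += from.getAs[Array[Byte]](ordinal)
        case _: ArrayType | _: StructType | _: MapType => to += from.get(ordinal).toString
        case _             => to += from.get(ordinal)
      }

    def main(args: Array[String]): Unit = {
      val out = ArrayBuffer.empty[Any]
      copyColumnValue(Row("val_238".getBytes("UTF-8")), 0, BinaryType, out)
      assert(out.head.isInstanceOf[Array[Byte]]) // raw bytes, not a String
    }
  }
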
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f15f5b01e252e4b722a93c6a355fedd9b98fde2b..55a93ea06ba570532856f4d32ac5995fe55c406c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -202,6 +202,25 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     }
   }
 
+  test("SPARK-12143 regression: Binary type support") {
+    withJdbcStatement { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_binary",
+        "CREATE TABLE test_binary(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary")
+
+      queries.foreach(statement.execute)
+
+      val expected: Array[Byte] = "val_238".getBytes
+      assertResult(expected) {
+        val resultSet = statement.executeQuery(
+          "SELECT CAST(value as BINARY) FROM test_date LIMIT 1")
+        resultSet.next()
+        resultSet.getObject(1)
+      }
+    }
+  }
+
   test("test multiple session") {
     import org.apache.spark.sql.internal.SQLConf
     var defaultV1: String = null
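
What the new test exercises from the client side (editorial note, not part of the patch): with the fix, ResultSet.getObject on a binary column hands back the raw bytes instead of a stringified value, which is exactly what the assertResult against an Array[Byte] checks. A hedged standalone sketch of the same round trip; it assumes a running Thrift server reachable at localhost:10000 with the Hive JDBC driver on the classpath, and the connection details are illustrative:

  import java.sql.DriverManager

  object BinaryOverJdbc {
    def main(args: Array[String]): Unit = {
      val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
      try {
        val rs = conn.createStatement().executeQuery("SELECT CAST('val_238' AS BINARY)")
        rs.next()
        // With the patch applied, getObject returns the raw bytes rather
        // than a Hive-formatted string representation of them.
        val bytes = rs.getObject(1).asInstanceOf[Array[Byte]]
        println(new String(bytes, "UTF-8")) // prints val_238
      } finally {
        conn.close()
      }
    }
  }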