diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af309c0c6ce2c71393c5a8f05833ce0f4a9f13ea..3563472c7ae81641e18e1c623facd35c87c54189 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -116,7 +116,7 @@ class HadoopTableReader(
       val hconf = broadcastedHiveConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
-      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
+      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
     }
 
     deserializedHadoopRDD
@@ -189,9 +189,13 @@ class HadoopTableReader(
         val hconf = broadcastedHiveConf.value.value
         val deserializer = localDeserializer.newInstance()
         deserializer.initialize(hconf, partProps)
+        // get the table SerDe; raw partition rows will be converted to its schema
+        val tableSerDe = tableDesc.getDeserializerClass.newInstance()
+        tableSerDe.initialize(hconf, tableDesc.getProperties)
 
         // fill the non partition key attributes
-        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
+        HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
+          mutableRow, tableSerDe)
       }
     }.toSeq
 
@@ -261,25 +265,35 @@ private[hive] object HadoopTableReader extends HiveInspectors {
    * Transform all given raw `Writable`s into `Row`s.
    *
    * @param iterator Iterator of all `Writable`s to be transformed
-   * @param deserializer The `Deserializer` associated with the input `Writable`
+   * @param rawDeser The `Deserializer` associated with the input `Writable`
    * @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
    *                             positions in the output schema
    * @param mutableRow A reusable `MutableRow` that should be filled
+   * @param tableDeser The table-level `Deserializer`, used as the conversion target when the
+   *                   partition's SerDe schema differs from the table schema
    * @return An `Iterator[Row]` transformed from `iterator`
    */
   def fillObject(
       iterator: Iterator[Writable],
-      deserializer: Deserializer,
+      rawDeser: Deserializer,
       nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-      mutableRow: MutableRow): Iterator[Row] = {
+      mutableRow: MutableRow,
+      tableDeser: Deserializer): Iterator[Row] = {
+
+    val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
+      rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
+    } else {
+      HiveShim.getConvertedOI(
+        rawDeser.getObjectInspector,
+        tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
+    }
 
-    val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
     val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
       soi.getStructFieldRef(attr.name) -> ordinal
     }.unzip
 
     // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
     // matching and branching costs per row.
     val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
       _.getFieldObjectInspector match {
         case oi: BooleanObjectInspector =>
@@ -316,9 +330,11 @@ private[hive] object HadoopTableReader extends HiveInspectors {
       }
     }
 
+    val converter = ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)
+
     // Map each tuple to a row object
     iterator.map { value =>
-      val raw = deserializer.deserialize(value)
+      val raw = converter.convert(rawDeser.deserialize(value))
       var i = 0
       while (i < fieldRefs.length) {
         val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
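
The core of the fix, for review convenience: `fillObject` now obtains a Hive converter from the partition SerDe's object inspector to the (possibly altered) table inspector, and routes every deserialized row through it. Below is a minimal, Spark-independent sketch of the Hive API involved; the object name and the INT-to-BIGINT scenario are illustrative only, not part of the patch:

```scala
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object ConverterSketch extends App {
  // Pretend a partition was written while the column was INT, but an
  // ALTER TABLE has since widened the table column to BIGINT.
  val partitionOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector
  val tableOI = PrimitiveObjectInspectorFactory.javaLongObjectInspector

  // Hive returns a converter between the two inspectors; when they are equal
  // it returns an identity converter, so the common case (partition schema
  // matches table schema) costs essentially nothing per row.
  val converter = ObjectInspectorConverters.getConverter(partitionOI, tableOI)

  println(converter.convert(Int.box(42))) // 42 as a java.lang.Long
}
```
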
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 381cd2a29123e9a9cb2094d07a2406e823858642..aa6fb42de7f884aeefcf36cc9b86b3db49f5e40c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -32,9 +32,11 @@ import org.apache.spark.sql.hive.test.TestHive._
 
 case class TestData(key: Int, value: String)
 
+case class ThreeColumnTable(key: Int, value: String, key1: String)
+
 class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
   import org.apache.spark.sql.hive.test.TestHive.implicits._
 
   val testData = TestHive.sparkContext.parallelize(
     (1 to 100).map(i => TestData(i, i.toString))).toDF()
 
@@ -186,4 +188,43 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
 
     sql("DROP TABLE hiveTableWithStructValue")
   }
+
+  test("SPARK-5498:partition schema does not match table schema") {
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.registerTempTable("testData")
+
+    val testDataWithNull = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => ThreeColumnTable(i, i.toString, null))).toDF()
+
+    val tmpDir = Files.createTempDir()
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='1') SELECT key,value FROM testData")
+
+    // test that the schema of the partition matches the table schema
+    checkAnswer(sql("select key, value from table_with_partition where ds='1'"),
+      testData.collect.toSeq
+    )
+
+    // test a field whose type differs between the partition (int) and the
+    // table (bigint) after an ALTER TABLE
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
+    checkAnswer(sql("select key, value from table_with_partition where ds='1'"),
+      testData.collect.toSeq
+    )
+
+    // add a column to the table
+    sql("ALTER TABLE table_with_partition ADD COLUMNS(key1 string)")
+    checkAnswer(sql("select key, value, key1 from table_with_partition where ds='1'"),
+      testDataWithNull.collect.toSeq
+    )
+
+    // rename a column of the table
+    sql("ALTER TABLE table_with_partition CHANGE COLUMN key keynew BIGINT")
+    checkAnswer(sql("select keynew, value from table_with_partition where ds='1'"),
+      testData.collect.toSeq
+    )
+
+    sql("DROP TABLE table_with_partition")
+  }
 }
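
For context, the failure this test pins down can be reproduced in any HiveContext session along the following lines; the table name is illustrative and `src` stands for any existing two-column table:

```scala
import org.apache.spark.sql.hive.test.TestHive.sql

// Illustrative repro of SPARK-5498, not part of the patch.
sql("CREATE TABLE t (key INT, value STRING) PARTITIONED BY (ds STRING)")
sql("INSERT OVERWRITE TABLE t PARTITION (ds='1') SELECT key, value FROM src")

// The existing partition keeps its original SerDe properties (key is INT)...
sql("ALTER TABLE t CHANGE COLUMN key key BIGINT")

// ...so before this patch the scan below failed at deserialization time:
// the partition SerDe produced values for the old schema while fillObject
// unwrapped them against the new table schema.
sql("SELECT key, value FROM t WHERE ds='1'").collect()
```
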
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 30646ddbc29d8596b9ec6421eb9834f7894db494..0ed93c2c5b1fa39e32bef2854155bc2f48d50a02 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, PrimitiveObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
@@ -210,7 +210,7 @@ private[hive] object HiveShim {
 
   def getDataLocationPath(p: Partition) = p.getPartitionPath
 
-  def getAllPartitionsOf(client: Hive, tbl: Table) =  client.getAllPartitionsForPruner(tbl)
+  def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
 
   def compatibilityBlackList = Seq(
     "decimal_.*",
@@ -244,6 +244,12 @@ private[hive] object HiveShim {
     }
   }
 
+  def getConvertedOI(
+      inputOI: ObjectInspector,
+      outputOI: ObjectInspector): ObjectInspector = {
+    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
+  }
+
   def prepareWritable(w: Writable): Writable = {
     w
   }
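
The shim indirection exists because the two supported Hive versions expose different overloads of the same converter API. A side-by-side sketch (the inspectors are illustrative; only one of the two calls compiles against any given Hive version, which is exactly why each shim wraps its own):

```scala
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object OverloadSketch {
  val inputOI: ObjectInspector = PrimitiveObjectInspectorFactory.javaIntObjectInspector
  val outputOI: ObjectInspector = PrimitiveObjectInspectorFactory.javaLongObjectInspector

  // Hive 0.12 only offers a three-argument overload; the shim pins the
  // trailing boolean flag to `true` (the flag was dropped in Hive 0.13).
  val converted012 = ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)

  // Hive 0.13 variant, as wrapped by Shim13:
  // val converted013 = ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
}
```
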
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index f9fcbdae15745f7264f75a2b2621f8de5c08106f..7577309900209ab300fe00d990b2d12134c60c89 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, TypeInfoFactory}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
-import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, ObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, PrimitiveObjectInspector}
 import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
 import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
@@ -400,7 +400,11 @@ private[hive] object HiveShim {
       Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
     }
   }
- 
+
+  def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
+    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
+  }
+
   /*
    * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
    * is needed to initialize before serialization.
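
Stepping back: both files define a `getConvertedOI` with the same Spark-facing signature because callers such as `TableReader` compile against a single `HiveShim` surface, and the build links in exactly one implementation per Hive profile. Below is a hypothetical distillation of that contract (Spark actually uses two identically named objects selected by build profile, not a trait):

```scala
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Hypothetical sketch of the contract both HiveShim objects satisfy.
trait HiveShimLike {
  // Papers over the 0.12 (three-argument) vs 0.13 (two-argument) overloads
  // of ObjectInspectorConverters.getConvertedOI.
  def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector
}
```
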