diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a829c6fafe03c7cfc92ce48ad6684e29a87bd74..f1093701ddc8984abc0cd71e2f5c1040c132dc8f 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -672,12 +672,12 @@ _acceptable_types = {
     ByteType: (int, long),
     ShortType: (int, long),
     IntegerType: (int, long),
-    LongType: (int, long),
+    LongType: (long,),
     FloatType: (float,),
     DoubleType: (float,),
     DecimalType: (decimal.Decimal,),
     StringType: (str, unicode),
-    TimestampType: (datetime.datetime, datetime.time, datetime.date),
+    TimestampType: (datetime.datetime,),
     ArrayType: (list, tuple, array),
     MapType: (dict,),
     StructType: (tuple, list),
@@ -1042,12 +1042,15 @@ class SQLContext:
         [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
 
         >>> from datetime import datetime
-        >>> rdd = sc.parallelize([(127, -32768, 1.0,
+        >>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0,
         ...     datetime(2010, 1, 1, 1, 1, 1),
         ...     {"a": 1}, (2,), [1, 2, 3], None)])
         >>> schema = StructType([
-        ...     StructField("byte", ByteType(), False),
-        ...     StructField("short", ShortType(), False),
+        ...     StructField("byte1", ByteType(), False),
+        ...     StructField("byte2", ByteType(), False),
+        ...     StructField("short1", ShortType(), False),
+        ...     StructField("short2", ShortType(), False),
+        ...     StructField("int", IntegerType(), False),
         ...     StructField("float", FloatType(), False),
         ...     StructField("time", TimestampType(), False),
         ...     StructField("map",
@@ -1056,11 +1059,19 @@ class SQLContext:
         ...         StructType([StructField("b", ShortType(), False)]), False),
         ...     StructField("list", ArrayType(ByteType(), False), False),
         ...     StructField("null", DoubleType(), True)])
-        >>> srdd = sqlCtx.applySchema(rdd, schema).map(
-        ...     lambda x: (x.byte, x.short, x.float, x.time,
+        >>> srdd = sqlCtx.applySchema(rdd, schema)
+        >>> results = srdd.map(
+        ...     lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int, x.float, x.time,
         ...         x.map["a"], x.struct.b, x.list, x.null))
-        >>> srdd.collect()[0]
-        (127, -32768, 1.0, ...(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
+        >>> results.collect()[0]
+        (127, -128, -32768, 32767, 2147483647, 1.0, ...(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
+
+        >>> srdd.registerTempTable("table2")
+        >>> sqlCtx.sql(
+        ...   "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " +
+        ...     "short1 + 1 AS short1, short2 - 1 AS short2, int - 1 AS int, " +
+        ...     "float + 1.1 as float FROM table2").collect()
+        [Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int=2147483646, float=2.1)]
 
         >>> rdd = sc.parallelize([(127, -32768, 1.0,
         ...     datetime(2010, 1, 1, 1, 1, 1),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ecd5fbaa0b09405726fd70d0715d84cb4096922f..71d338d21d0f28798ff5559d9b43d373cf069e25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -491,7 +491,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
         new java.sql.Timestamp(c.getTime().getTime())
 
       case (c: Int, ByteType) => c.toByte
+      case (c: Long, ByteType) => c.toByte
       case (c: Int, ShortType) => c.toShort
+      case (c: Long, ShortType) => c.toShort
+      case (c: Long, IntegerType) => c.toInt
       case (c: Double, FloatType) => c.toFloat
       case (c, StringType) if !c.isInstanceOf[String] => c.toString