diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
index 5f28d52a94bd7c7bafea170c9bc1906f71996b2d..bc345dcd00e49ec1de5089eb46da2b4e41870340 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGetters.java
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.Interval;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public interface SpecializedGetters {
@@ -46,7 +46,7 @@ public interface SpecializedGetters {
 
   byte[] getBinary(int ordinal);
 
-  Interval getInterval(int ordinal);
+  CalendarInterval getInterval(int ordinal);
 
   InternalRow getStruct(int ordinal, int numFields);
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 64a8edc34d681e402491105fd4c82f678c868072..6d684bac37573ec4f8adbf00222975dffc9b2af6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -29,7 +29,7 @@ import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.types.Interval;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.*;
@@ -92,7 +92,7 @@ public final class UnsafeRow extends MutableRow {
       Arrays.asList(new DataType[]{
         StringType,
         BinaryType,
-        IntervalType
+        CalendarIntervalType
       }));
     _readableFieldTypes.addAll(settableFieldTypes);
     readableFieldTypes = Collections.unmodifiableSet(_readableFieldTypes);
@@ -265,7 +265,7 @@ public final class UnsafeRow extends MutableRow {
       return getBinary(ordinal);
     } else if (dataType instanceof StringType) {
       return getUTF8String(ordinal);
-    } else if (dataType instanceof IntervalType) {
+    } else if (dataType instanceof CalendarIntervalType) {
       return getInterval(ordinal);
     } else if (dataType instanceof StructType) {
       return getStruct(ordinal, ((StructType) dataType).size());
@@ -350,7 +350,7 @@ public final class UnsafeRow extends MutableRow {
   }
 
   @Override
-  public Interval getInterval(int ordinal) {
+  public CalendarInterval getInterval(int ordinal) {
     if (isNullAt(ordinal)) {
       return null;
     } else {
@@ -359,7 +359,7 @@ public final class UnsafeRow extends MutableRow {
       final int months = (int) PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + offset);
       final long microseconds =
         PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + offset + 8);
-      return new Interval(months, microseconds);
+      return new CalendarInterval(months, microseconds);
     }
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRowWriters.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRowWriters.java
index 32faad374015c6ca7af7a153208bef62fecef801..c3259e21c4a785261eb3e8930903c30991892376 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRowWriters.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRowWriters.java
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.types.ByteArray;
-import org.apache.spark.unsafe.types.Interval;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -131,7 +131,7 @@ public class UnsafeRowWriters {
   /** Writer for interval type. */
   public static class IntervalWriter {
 
-    public static int write(UnsafeRow target, int ordinal, int cursor, Interval input) {
+    public static int write(UnsafeRow target, int ordinal, int cursor, CalendarInterval input) {
       final long offset = target.getBaseOffset() + cursor;
 
       // Write the months and microseconds fields of Interval to the variable length portion.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index 5703de42393de7a47f2ad2695f84df7857a9868b..17659d7d960b04be91072435461f20435e39efbf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -50,9 +50,9 @@ public class DataTypes {
   public static final DataType TimestampType = TimestampType$.MODULE$;
 
   /**
-   * Gets the IntervalType object.
+   * Gets the CalendarIntervalType object.
    */
-  public static final DataType IntervalType = IntervalType$.MODULE$;
+  public static final DataType CalendarIntervalType = CalendarIntervalType$.MODULE$;
 
   /**
    * Gets the DoubleType object.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index e395a67434fa71d028306f478515054c09f9ce78..a5999e64ec554b1eb128a4aedbd58f0806bca3bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.{Interval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * An abstract class for row used internal in Spark SQL, which only contain the columns as
@@ -61,7 +61,8 @@ abstract class InternalRow extends Serializable with SpecializedGetters {
   override def getDecimal(ordinal: Int): Decimal =
     getAs[Decimal](ordinal, DecimalType.SYSTEM_DEFAULT)
 
-  override def getInterval(ordinal: Int): Interval = getAs[Interval](ordinal, IntervalType)
+  override def getInterval(ordinal: Int): CalendarInterval =
+    getAs[CalendarInterval](ordinal, CalendarIntervalType)
 
   // This is only use for test and will throw a null pointer exception if the position is null.
   def getString(ordinal: Int): String = getUTF8String(ordinal).toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e5f115f74bf3b96974f86caa77d7c6ff60d42e9a..f2498861c9573265b29a619c81ed7ebb90b687fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.Interval
+import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
  * A very simple SQL parser.  Based loosely on:
@@ -365,32 +365,32 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
 
   protected lazy val millisecond: Parser[Long] =
     integral <~ intervalUnit("millisecond") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_MILLI
+      case num => num.toLong * CalendarInterval.MICROS_PER_MILLI
     }
 
   protected lazy val second: Parser[Long] =
     integral <~ intervalUnit("second") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_SECOND
+      case num => num.toLong * CalendarInterval.MICROS_PER_SECOND
     }
 
   protected lazy val minute: Parser[Long] =
     integral <~ intervalUnit("minute") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_MINUTE
+      case num => num.toLong * CalendarInterval.MICROS_PER_MINUTE
     }
 
   protected lazy val hour: Parser[Long] =
     integral <~ intervalUnit("hour") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_HOUR
+      case num => num.toLong * CalendarInterval.MICROS_PER_HOUR
     }
 
   protected lazy val day: Parser[Long] =
     integral <~ intervalUnit("day") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_DAY
+      case num => num.toLong * CalendarInterval.MICROS_PER_DAY
     }
 
   protected lazy val week: Parser[Long] =
     integral <~ intervalUnit("week") ^^ {
-      case num => num.toLong * Interval.MICROS_PER_WEEK
+      case num => num.toLong * CalendarInterval.MICROS_PER_WEEK
     }
 
   protected lazy val intervalLiteral: Parser[Literal] =
@@ -406,7 +406,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
           val months = Seq(year, month).map(_.getOrElse(0)).sum
           val microseconds = Seq(week, day, hour, minute, second, millisecond, microsecond)
             .map(_.getOrElse(0L)).sum
-          Literal.create(new Interval(months, microseconds), IntervalType)
+          Literal.create(new CalendarInterval(months, microseconds), CalendarIntervalType)
       }
 
   private def toNarrowestIntegerType(value: String): Any = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 8304d4ccd47f7731183fe2dd7cd534095bc8310a..371681b5d494f8ebaeed002e8d50fd1bb6b6b67e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -48,7 +48,7 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
         case DoubleType => input.getDouble(ordinal)
         case StringType => input.getUTF8String(ordinal)
         case BinaryType => input.getBinary(ordinal)
-        case IntervalType => input.getInterval(ordinal)
+        case CalendarIntervalType => input.getInterval(ordinal)
         case t: StructType => input.getStruct(ordinal, t.size)
         case _ => input.get(ordinal, dataType)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index bd8b0177eb00e19a3aa2c9dc2d9bf204e66c4dc9..c6e8af27667eedfe0eb97c617b4d6e160c0ed0e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.{Interval, UTF8String}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 import scala.collection.mutable
 
@@ -55,7 +55,7 @@ object Cast {
 
     case (_, DateType) => true
 
-    case (StringType, IntervalType) => true
+    case (StringType, CalendarIntervalType) => true
 
     case (StringType, _: NumericType) => true
     case (BooleanType, _: NumericType) => true
@@ -225,7 +225,7 @@ case class Cast(child: Expression, dataType: DataType)
   // IntervalConverter
   private[this] def castToInterval(from: DataType): Any => Any = from match {
     case StringType =>
-      buildCast[UTF8String](_, s => Interval.fromString(s.toString))
+      buildCast[UTF8String](_, s => CalendarInterval.fromString(s.toString))
     case _ => _ => null
   }
 
@@ -398,7 +398,7 @@ case class Cast(child: Expression, dataType: DataType)
     case DateType => castToDate(from)
     case decimal: DecimalType => castToDecimal(from, decimal)
     case TimestampType => castToTimestamp(from)
-    case IntervalType => castToInterval(from)
+    case CalendarIntervalType => castToInterval(from)
     case BooleanType => castToBoolean(from)
     case ByteType => castToByte(from)
     case ShortType => castToShort(from)
@@ -438,7 +438,7 @@ case class Cast(child: Expression, dataType: DataType)
     case DateType => castToDateCode(from, ctx)
     case decimal: DecimalType => castToDecimalCode(from, decimal)
     case TimestampType => castToTimestampCode(from, ctx)
-    case IntervalType => castToIntervalCode(from)
+    case CalendarIntervalType => castToIntervalCode(from)
     case BooleanType => castToBooleanCode(from)
     case ByteType => castToByteCode(from)
     case ShortType => castToShortCode(from)
@@ -630,7 +630,7 @@ case class Cast(child: Expression, dataType: DataType)
   private[this] def castToIntervalCode(from: DataType): CastFunction = from match {
     case StringType =>
       (c, evPrim, evNull) =>
-        s"$evPrim = Interval.fromString($c.toString());"
+        s"$evPrim = CalendarInterval.fromString($c.toString());"
   }
 
   private[this] def decimalToTimestampCode(d: String): String =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 4ec866475f8b0b834b9366e5450dc84fcbe652ee..6f8f4dd230f121b31a52df493046c1dd395e64c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.Interval
+import org.apache.spark.unsafe.types.CalendarInterval
 
 
 case class UnaryMinus(child: Expression) extends UnaryExpression with ExpectsInputTypes {
@@ -37,12 +37,12 @@ case class UnaryMinus(child: Expression) extends UnaryExpression with ExpectsInp
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = dataType match {
     case dt: DecimalType => defineCodeGen(ctx, ev, c => s"$c.unary_$$minus()")
     case dt: NumericType => defineCodeGen(ctx, ev, c => s"(${ctx.javaType(dt)})(-($c))")
-    case dt: IntervalType => defineCodeGen(ctx, ev, c => s"$c.negate()")
+    case dt: CalendarIntervalType => defineCodeGen(ctx, ev, c => s"$c.negate()")
   }
 
   protected override def nullSafeEval(input: Any): Any = {
-    if (dataType.isInstanceOf[IntervalType]) {
-      input.asInstanceOf[Interval].negate()
+    if (dataType.isInstanceOf[CalendarIntervalType]) {
+      input.asInstanceOf[CalendarInterval].negate()
     } else {
       numeric.negate(input)
     }
@@ -121,8 +121,8 @@ case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
   private lazy val numeric = TypeUtils.getNumeric(dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = {
-    if (dataType.isInstanceOf[IntervalType]) {
-      input1.asInstanceOf[Interval].add(input2.asInstanceOf[Interval])
+    if (dataType.isInstanceOf[CalendarIntervalType]) {
+      input1.asInstanceOf[CalendarInterval].add(input2.asInstanceOf[CalendarInterval])
     } else {
       numeric.plus(input1, input2)
     }
@@ -134,7 +134,7 @@ case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
     case ByteType | ShortType =>
       defineCodeGen(ctx, ev,
         (eval1, eval2) => s"(${ctx.javaType(dataType)})($eval1 $symbol $eval2)")
-    case IntervalType =>
+    case CalendarIntervalType =>
       defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.add($eval2)")
     case _ =>
       defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1 $symbol $eval2")
@@ -150,8 +150,8 @@ case class Subtract(left: Expression, right: Expression) extends BinaryArithmeti
   private lazy val numeric = TypeUtils.getNumeric(dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = {
-    if (dataType.isInstanceOf[IntervalType]) {
-      input1.asInstanceOf[Interval].subtract(input2.asInstanceOf[Interval])
+    if (dataType.isInstanceOf[CalendarIntervalType]) {
+      input1.asInstanceOf[CalendarInterval].subtract(input2.asInstanceOf[CalendarInterval])
     } else {
       numeric.minus(input1, input2)
     }
@@ -163,7 +163,7 @@ case class Subtract(left: Expression, right: Expression) extends BinaryArithmeti
     case ByteType | ShortType =>
       defineCodeGen(ctx, ev,
         (eval1, eval2) => s"(${ctx.javaType(dataType)})($eval1 $symbol $eval2)")
-    case IntervalType =>
+    case CalendarIntervalType =>
       defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.subtract($eval2)")
     case _ =>
       defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1 $symbol $eval2")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 2f02c90b1d5b34e70828e26bf7fd6463594f09e3..092f4c9fb0bd2243f680016ee656f979c4c2f22b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -108,7 +108,7 @@ class CodeGenContext {
       case _ if isPrimitiveType(jt) => s"$row.get${primitiveTypeName(jt)}($ordinal)"
       case StringType => s"$row.getUTF8String($ordinal)"
       case BinaryType => s"$row.getBinary($ordinal)"
-      case IntervalType => s"$row.getInterval($ordinal)"
+      case CalendarIntervalType => s"$row.getInterval($ordinal)"
       case t: StructType => s"$row.getStruct($ordinal, ${t.size})"
       case _ => s"($jt)$row.get($ordinal)"
     }
@@ -150,7 +150,7 @@ class CodeGenContext {
     case dt: DecimalType => "Decimal"
     case BinaryType => "byte[]"
     case StringType => "UTF8String"
-    case IntervalType => "Interval"
+    case CalendarIntervalType => "CalendarInterval"
     case _: StructType => "InternalRow"
     case _: ArrayType => s"scala.collection.Seq"
     case _: MapType => s"scala.collection.Map"
@@ -293,7 +293,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
       classOf[UnsafeRow].getName,
       classOf[UTF8String].getName,
       classOf[Decimal].getName,
-      classOf[Interval].getName
+      classOf[CalendarInterval].getName
     ))
     evaluator.setExtendedClass(classOf[GeneratedClass])
     try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 9a4c00e86a3ec81b8e2add10da54fcc9f59f4e4d..dc725c28aaa27057c43b703ff1388765c0891518 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -39,7 +39,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
   /** Returns true iff we support this data type. */
   def canSupport(dataType: DataType): Boolean = dataType match {
     case t: AtomicType if !t.isInstanceOf[DecimalType] => true
-    case _: IntervalType => true
+    case _: CalendarIntervalType => true
     case t: StructType => t.toSeq.forall(field => canSupport(field.dataType))
     case NullType => true
     case _ => false
@@ -75,7 +75,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           s" + (${exprs(i).isNull} ? 0 : $StringWriter.getSize(${exprs(i).primitive}))"
         case BinaryType =>
           s" + (${exprs(i).isNull} ? 0 : $BinaryWriter.getSize(${exprs(i).primitive}))"
-        case IntervalType =>
+        case CalendarIntervalType =>
           s" + (${exprs(i).isNull} ? 0 : 16)"
         case _: StructType =>
           s" + (${exprs(i).isNull} ? 0 : $StructWriter.getSize(${exprs(i).primitive}))"
@@ -91,7 +91,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           s"$cursor += $StringWriter.write($ret, $i, $cursor, ${exprs(i).primitive})"
         case BinaryType =>
           s"$cursor += $BinaryWriter.write($ret, $i, $cursor, ${exprs(i).primitive})"
-        case IntervalType =>
+        case CalendarIntervalType =>
           s"$cursor += $IntervalWriter.write($ret, $i, $cursor, ${exprs(i).primitive})"
         case t: StructType =>
           s"$cursor += $StructWriter.write($ret, $i, $cursor, ${exprs(i).primitive})"
@@ -173,7 +173,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           s" + (${ev.isNull} ? 0 : $StringWriter.getSize(${ev.primitive}))"
         case BinaryType =>
           s" + (${ev.isNull} ? 0 : $BinaryWriter.getSize(${ev.primitive}))"
-        case IntervalType =>
+        case CalendarIntervalType =>
           s" + (${ev.isNull} ? 0 : 16)"
         case _: StructType =>
           s" + (${ev.isNull} ? 0 : $StructWriter.getSize(${ev.primitive}))"
@@ -189,7 +189,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           s"$cursor += $StringWriter.write($primitive, $i, $cursor, ${exprs(i).primitive})"
         case BinaryType =>
           s"$cursor += $BinaryWriter.write($primitive, $i, $cursor, ${exprs(i).primitive})"
-        case IntervalType =>
+        case CalendarIntervalType =>
           s"$cursor += $IntervalWriter.write($primitive, $i, $cursor, ${exprs(i).primitive})"
         case t: StructType =>
           s"$cursor += $StructWriter.write($primitive, $i, $cursor, ${exprs(i).primitive})"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 064a1720c36e8268aa6b290e8bae1618284f35e0..34bad23802ba408162c6909822e3325659a17af8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -42,7 +42,7 @@ object Literal {
     case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
     case a: Array[Byte] => Literal(a, BinaryType)
-    case i: Interval => Literal(i, IntervalType)
+    case i: CalendarInterval => Literal(i, CalendarIntervalType)
     case null => Literal(null, NullType)
     case _ =>
       throw new RuntimeException("Unsupported literal type " + v.getClass + " " + v)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
index 40bf4b299c9900d5047c706bab7eb53b668fcfcc..e0667c629486da0df95e2096a1265ac717d94796 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
@@ -95,7 +95,7 @@ private[sql] object TypeCollection {
    * Types that include numeric types and interval type. They are only used in unary_minus,
    * unary_positive, add and subtract operations.
    */
-  val NumericAndInterval = TypeCollection(NumericType, IntervalType)
+  val NumericAndInterval = TypeCollection(NumericType, CalendarIntervalType)
 
   def apply(types: AbstractDataType*): TypeCollection = new TypeCollection(types)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/CalendarIntervalType.scala
similarity index 64%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/types/CalendarIntervalType.scala
index 87c6e9e6e5e2cf3dc899d0759eebd57cc5b7ab05..3565f52c21f69c82dad892be7fe72036c3b49f21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/CalendarIntervalType.scala
@@ -22,16 +22,19 @@ import org.apache.spark.annotation.DeveloperApi
 
 /**
  * :: DeveloperApi ::
- * The data type representing time intervals.
+ * The data type representing calendar time intervals. The calendar time interval is stored
+ * internally in two components: number of months the number of microseconds.
  *
- * Please use the singleton [[DataTypes.IntervalType]].
+ * Note that calendar intervals are not comparable.
+ *
+ * Please use the singleton [[DataTypes.CalendarIntervalType]].
  */
 @DeveloperApi
-class IntervalType private() extends DataType {
+class CalendarIntervalType private() extends DataType {
 
-  override def defaultSize: Int = 4096
+  override def defaultSize: Int = 16
 
-  private[spark] override def asNullable: IntervalType = this
+  private[spark] override def asNullable: CalendarIntervalType = this
 }
 
-case object IntervalType extends IntervalType
+case object CalendarIntervalType extends CalendarIntervalType
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index ad15136ee9a2f81abff4273625235e7ce3fe9652..8acd4c685e2bca34429c595a8b09f4eeef958d7c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -53,7 +53,7 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
   }
 
   test("check types for unary arithmetic") {
-    assertError(UnaryMinus('stringField), "type (numeric or interval)")
+    assertError(UnaryMinus('stringField), "type (numeric or calendarinterval)")
     assertError(Abs('stringField), "expected to be of type numeric")
     assertError(BitwiseNot('stringField), "expected to be of type integral")
   }
@@ -78,8 +78,9 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
     assertErrorForDifferingTypes(MaxOf('intField, 'booleanField))
     assertErrorForDifferingTypes(MinOf('intField, 'booleanField))
 
-    assertError(Add('booleanField, 'booleanField), "accepts (numeric or interval) type")
-    assertError(Subtract('booleanField, 'booleanField), "accepts (numeric or interval) type")
+    assertError(Add('booleanField, 'booleanField), "accepts (numeric or calendarinterval) type")
+    assertError(Subtract('booleanField, 'booleanField),
+      "accepts (numeric or calendarinterval) type")
     assertError(Multiply('booleanField, 'booleanField), "accepts numeric type")
     assertError(Divide('booleanField, 'booleanField), "accepts numeric type")
     assertError(Remainder('booleanField, 'booleanField), "accepts numeric type")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
index 4454d51b758770425d51c7b70d89e3783af44fb5..1d9ee5ddf3a5a307cfb8c5e116334958172c5d60 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -116,7 +116,7 @@ class HiveTypeCoercionSuite extends PlanTest {
     shouldNotCast(IntegerType, MapType)
     shouldNotCast(IntegerType, StructType)
 
-    shouldNotCast(IntervalType, StringType)
+    shouldNotCast(CalendarIntervalType, StringType)
 
     // Don't implicitly cast complex types to string.
     shouldNotCast(ArrayType(StringType), StringType)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 408353cf70a49b29406e5474aea146b57a51b8d7..0e0213be0f57b4df87caf6f84e41cb7ae22388b5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -719,12 +719,13 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("case between string and interval") {
-    import org.apache.spark.unsafe.types.Interval
+    import org.apache.spark.unsafe.types.CalendarInterval
 
-    checkEvaluation(Cast(Literal("interval -3 month 7 hours"), IntervalType),
-      new Interval(-3, 7 * Interval.MICROS_PER_HOUR))
+    checkEvaluation(Cast(Literal("interval -3 month 7 hours"), CalendarIntervalType),
+      new CalendarInterval(-3, 7 * CalendarInterval.MICROS_PER_HOUR))
     checkEvaluation(Cast(Literal.create(
-      new Interval(15, -3 * Interval.MICROS_PER_DAY), IntervalType), StringType),
+      new CalendarInterval(15, -3 * CalendarInterval.MICROS_PER_DAY), CalendarIntervalType),
+      StringType),
       "interval 1 years 3 months -3 days")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index b02e60dc85cdd65d70829c8b1a4110e287672084..2294a670c735ff48e85a3528e9d18280722cf427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -220,137 +220,6 @@ case class TakeOrderedAndProject(
   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
 
-/**
- * :: DeveloperApi ::
- * Performs a sort on-heap.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- */
-@DeveloperApi
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      iterator.map(_.copy()).toArray.sorted(ordering).iterator
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
-/**
- * :: DeveloperApi ::
- * Performs a sort, spilling to disk as needed.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- */
-@DeveloperApi
-case class ExternalSort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
-      sorter.insertAll(iterator.map(r => (r.copy, null)))
-      val baseIterator = sorter.iterator.map(_._1)
-      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
-      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
-/**
- * :: DeveloperApi ::
- * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
- * Project Tungsten).
- *
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
- *                           spill every `frequency` records.
- */
-@DeveloperApi
-case class UnsafeExternalSort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan,
-    testSpillFrequency: Int = 0)
-  extends UnaryNode {
-
-  private[this] val schema: StructType = child.schema
-
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
-    def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
-      val ordering = newOrdering(sortOrder, child.output)
-      val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
-      // Hack until we generate separate comparator implementations for ascending vs. descending
-      // (or choose to codegen them):
-      val prefixComparator = {
-        val comp = SortPrefixUtils.getPrefixComparator(boundSortExpression)
-        if (sortOrder.head.direction == Descending) {
-          new PrefixComparator {
-            override def compare(p1: Long, p2: Long): Int = -1 * comp.compare(p1, p2)
-          }
-        } else {
-          comp
-        }
-      }
-      val prefixComputer = {
-        val prefixComputer = SortPrefixUtils.getPrefixComputer(boundSortExpression)
-        new UnsafeExternalRowSorter.PrefixComputer {
-          override def computePrefix(row: InternalRow): Long = prefixComputer(row)
-        }
-      }
-      val sorter = new UnsafeExternalRowSorter(schema, ordering, prefixComparator, prefixComputer)
-      if (testSpillFrequency > 0) {
-        sorter.setTestSpillFrequency(testSpillFrequency)
-      }
-      sorter.sort(iterator)
-    }
-    child.execute().mapPartitions(doSort, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-
-  override def outputsUnsafeRows: Boolean = true
-}
-
-@DeveloperApi
-object UnsafeExternalSort {
-  /**
-   * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
-   */
-  def supportsSchema(schema: StructType): Boolean = {
-    UnsafeExternalRowSorter.supportsSchema(schema)
-  }
-}
-
 
 /**
  * :: DeveloperApi ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index e73b3704d4dfe80a72d53c1163bae47f15e5cef5..0cdb407ad57b92c46aab3c6cf5dc3016d49c4ad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -308,7 +308,7 @@ private[sql] object ResolvedDataSource {
       mode: SaveMode,
       options: Map[String, String],
       data: DataFrame): ResolvedDataSource = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
       throw new AnalysisException("Cannot save interval data type into external storage.")
     }
     val clazz: Class[_] = lookupDataSource(provider)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f82208868c3e31556a8bf613b4f5020638d73c52
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions.{Descending, BindReferences, Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution, Distribution}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines various sort operators.
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+/**
+ * Performs a sort on-heap.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ */
+case class Sort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan)
+  extends UnaryNode {
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      iterator.map(_.copy()).toArray.sorted(ordering).iterator
+    }, preservesPartitioning = true)
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+}
+
+/**
+ * Performs a sort, spilling to disk as needed.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ */
+case class ExternalSort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+    child.execute().mapPartitions( { iterator =>
+      val ordering = newOrdering(sortOrder, child.output)
+      val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
+      sorter.insertAll(iterator.map(r => (r.copy(), null)))
+      val baseIterator = sorter.iterator.map(_._1)
+      // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
+      CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
+    }, preservesPartitioning = true)
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+}
+
+/**
+ * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Project Tungsten).
+ *
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class UnsafeExternalSort(
+    sortOrder: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends UnaryNode {
+
+  private[this] val schema: StructType = child.schema
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+    assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled")
+    def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+      val ordering = newOrdering(sortOrder, child.output)
+      val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output)
+      // Hack until we generate separate comparator implementations for ascending vs. descending
+      // (or choose to codegen them):
+      val prefixComparator = {
+        val comp = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+        if (sortOrder.head.direction == Descending) {
+          new PrefixComparator {
+            override def compare(p1: Long, p2: Long): Int = -1 * comp.compare(p1, p2)
+          }
+        } else {
+          comp
+        }
+      }
+      val prefixComputer = {
+        val prefixComputer = SortPrefixUtils.getPrefixComputer(boundSortExpression)
+        new UnsafeExternalRowSorter.PrefixComputer {
+          override def computePrefix(row: InternalRow): Long = prefixComputer(row)
+        }
+      }
+      val sorter = new UnsafeExternalRowSorter(schema, ordering, prefixComparator, prefixComputer)
+      if (testSpillFrequency > 0) {
+        sorter.setTestSpillFrequency(testSpillFrequency)
+      }
+      sorter.sort(iterator)
+    }
+    child.execute().mapPartitions(doSort, preservesPartitioning = true)
+  }
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def outputsUnsafeRows: Boolean = true
+}
+
+@DeveloperApi
+object UnsafeExternalSort {
+  /**
+   * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise.
+   */
+  def supportsSchema(schema: StructType): Boolean = {
+    UnsafeExternalRowSorter.supportsSchema(schema)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d13dde1cdc8b21b576d6f1f0cde2aeb8df4c0a7c..535011fe3db5b2fda292412a368e0f6279382d8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1577,10 +1577,10 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("SPARK-8753: add interval type") {
-    import org.apache.spark.unsafe.types.Interval
+    import org.apache.spark.unsafe.types.CalendarInterval
 
     val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
-    checkAnswer(df, Row(new Interval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 )))
+    checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 )))
     withTempPath(f => {
       // Currently we don't yet support saving out values of interval data type.
       val e = intercept[AnalysisException] {
@@ -1602,20 +1602,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("SPARK-8945: add and subtract expressions for interval type") {
-    import org.apache.spark.unsafe.types.Interval
-    import org.apache.spark.unsafe.types.Interval.MICROS_PER_WEEK
+    import org.apache.spark.unsafe.types.CalendarInterval
+    import org.apache.spark.unsafe.types.CalendarInterval.MICROS_PER_WEEK
 
     val df = sql("select interval 3 years -3 month 7 week 123 microseconds as i")
-    checkAnswer(df, Row(new Interval(12 * 3 - 3, 7L * MICROS_PER_WEEK + 123)))
+    checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7L * MICROS_PER_WEEK + 123)))
 
-    checkAnswer(df.select(df("i") + new Interval(2, 123)),
-      Row(new Interval(12 * 3 - 3 + 2, 7L * MICROS_PER_WEEK + 123 + 123)))
+    checkAnswer(df.select(df("i") + new CalendarInterval(2, 123)),
+      Row(new CalendarInterval(12 * 3 - 3 + 2, 7L * MICROS_PER_WEEK + 123 + 123)))
 
-    checkAnswer(df.select(df("i") - new Interval(2, 123)),
-      Row(new Interval(12 * 3 - 3 - 2, 7L * MICROS_PER_WEEK + 123 - 123)))
+    checkAnswer(df.select(df("i") - new CalendarInterval(2, 123)),
+      Row(new CalendarInterval(12 * 3 - 3 - 2, 7L * MICROS_PER_WEEK + 123 - 123)))
 
     // unary minus
     checkAnswer(df.select(-df("i")),
-      Row(new Interval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123))))
+      Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123))))
   }
 }
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
similarity index 87%
rename from unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
rename to unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index 71b1a85a818ea58a85c8f563ee4b557dcfc38bf3..92a5e4f86f234abda8297a89f2104b3334791b40 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -24,7 +24,7 @@ import java.util.regex.Pattern;
 /**
  * The internal representation of interval type.
  */
-public final class Interval implements Serializable {
+public final class CalendarInterval implements Serializable {
   public static final long MICROS_PER_MILLI = 1000L;
   public static final long MICROS_PER_SECOND = MICROS_PER_MILLI * 1000;
   public static final long MICROS_PER_MINUTE = MICROS_PER_SECOND * 60;
@@ -58,7 +58,7 @@ public final class Interval implements Serializable {
     }
   }
 
-  public static Interval fromString(String s) {
+  public static CalendarInterval fromString(String s) {
     if (s == null) {
       return null;
     }
@@ -75,40 +75,40 @@ public final class Interval implements Serializable {
       microseconds += toLong(m.group(7)) * MICROS_PER_SECOND;
       microseconds += toLong(m.group(8)) * MICROS_PER_MILLI;
       microseconds += toLong(m.group(9));
-      return new Interval((int) months, microseconds);
+      return new CalendarInterval((int) months, microseconds);
     }
   }
 
   public final int months;
   public final long microseconds;
 
-  public Interval(int months, long microseconds) {
+  public CalendarInterval(int months, long microseconds) {
     this.months = months;
     this.microseconds = microseconds;
   }
 
-  public Interval add(Interval that) {
+  public CalendarInterval add(CalendarInterval that) {
     int months = this.months + that.months;
     long microseconds = this.microseconds + that.microseconds;
-    return new Interval(months, microseconds);
+    return new CalendarInterval(months, microseconds);
   }
 
-  public Interval subtract(Interval that) {
+  public CalendarInterval subtract(CalendarInterval that) {
     int months = this.months - that.months;
     long microseconds = this.microseconds - that.microseconds;
-    return new Interval(months, microseconds);
+    return new CalendarInterval(months, microseconds);
   }
 
-  public Interval negate() {
-    return new Interval(-this.months, -this.microseconds);
+  public CalendarInterval negate() {
+    return new CalendarInterval(-this.months, -this.microseconds);
   }
 
   @Override
   public boolean equals(Object other) {
     if (this == other) return true;
-    if (other == null || !(other instanceof Interval)) return false;
+    if (other == null || !(other instanceof CalendarInterval)) return false;
 
-    Interval o = (Interval) other;
+    CalendarInterval o = (CalendarInterval) other;
     return this.months == o.months && this.microseconds == o.microseconds;
   }
 
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/types/IntervalSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/types/IntervalSuite.java
index d29517cda66a399d45382d0303222992f6c5d3d5..e6733a7aae6f52972aa75c5ee271b8932f6b6a10 100644
--- a/unsafe/src/test/java/org/apache/spark/unsafe/types/IntervalSuite.java
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/types/IntervalSuite.java
@@ -20,16 +20,16 @@ package org.apache.spark.unsafe.types;
 import org.junit.Test;
 
 import static junit.framework.Assert.*;
-import static org.apache.spark.unsafe.types.Interval.*;
+import static org.apache.spark.unsafe.types.CalendarInterval.*;
 
 public class IntervalSuite {
 
   @Test
   public void equalsTest() {
-    Interval i1 = new Interval(3, 123);
-    Interval i2 = new Interval(3, 321);
-    Interval i3 = new Interval(1, 123);
-    Interval i4 = new Interval(3, 123);
+    CalendarInterval i1 = new CalendarInterval(3, 123);
+    CalendarInterval i2 = new CalendarInterval(3, 321);
+    CalendarInterval i3 = new CalendarInterval(1, 123);
+    CalendarInterval i4 = new CalendarInterval(3, 123);
 
     assertNotSame(i1, i2);
     assertNotSame(i1, i3);
@@ -39,21 +39,21 @@ public class IntervalSuite {
 
   @Test
   public void toStringTest() {
-    Interval i;
+    CalendarInterval i;
 
-    i = new Interval(34, 0);
+    i = new CalendarInterval(34, 0);
     assertEquals(i.toString(), "interval 2 years 10 months");
 
-    i = new Interval(-34, 0);
+    i = new CalendarInterval(-34, 0);
     assertEquals(i.toString(), "interval -2 years -10 months");
 
-    i = new Interval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
+    i = new CalendarInterval(0, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
     assertEquals(i.toString(), "interval 3 weeks 13 hours 123 microseconds");
 
-    i = new Interval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123);
+    i = new CalendarInterval(0, -3 * MICROS_PER_WEEK - 13 * MICROS_PER_HOUR - 123);
     assertEquals(i.toString(), "interval -3 weeks -13 hours -123 microseconds");
 
-    i = new Interval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
+    i = new CalendarInterval(34, 3 * MICROS_PER_WEEK + 13 * MICROS_PER_HOUR + 123);
     assertEquals(i.toString(), "interval 2 years 10 months 3 weeks 13 hours 123 microseconds");
   }
 
@@ -72,33 +72,33 @@ public class IntervalSuite {
     String input;
 
     input = "interval   -5  years  23   month";
-    Interval result = new Interval(-5 * 12 + 23, 0);
-    assertEquals(Interval.fromString(input), result);
+    CalendarInterval result = new CalendarInterval(-5 * 12 + 23, 0);
+    assertEquals(CalendarInterval.fromString(input), result);
 
     input = "interval   -5  years  23   month   ";
-    assertEquals(Interval.fromString(input), result);
+    assertEquals(CalendarInterval.fromString(input), result);
 
     input = "  interval   -5  years  23   month   ";
-    assertEquals(Interval.fromString(input), result);
+    assertEquals(CalendarInterval.fromString(input), result);
 
     // Error cases
     input = "interval   3month 1 hour";
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
 
     input = "interval 3 moth 1 hour";
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
 
     input = "interval";
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
 
     input = "int";
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
 
     input = "";
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
 
     input = null;
-    assertEquals(Interval.fromString(input), null);
+    assertEquals(CalendarInterval.fromString(input), null);
   }
 
   @Test
@@ -106,18 +106,18 @@ public class IntervalSuite {
     String input = "interval 3 month 1 hour";
     String input2 = "interval 2 month 100 hour";
 
-    Interval interval = Interval.fromString(input);
-    Interval interval2 = Interval.fromString(input2);
+    CalendarInterval interval = CalendarInterval.fromString(input);
+    CalendarInterval interval2 = CalendarInterval.fromString(input2);
 
-    assertEquals(interval.add(interval2), new Interval(5, 101 * MICROS_PER_HOUR));
+    assertEquals(interval.add(interval2), new CalendarInterval(5, 101 * MICROS_PER_HOUR));
 
     input = "interval -10 month -81 hour";
     input2 = "interval 75 month 200 hour";
 
-    interval = Interval.fromString(input);
-    interval2 = Interval.fromString(input2);
+    interval = CalendarInterval.fromString(input);
+    interval2 = CalendarInterval.fromString(input2);
 
-    assertEquals(interval.add(interval2), new Interval(65, 119 * MICROS_PER_HOUR));
+    assertEquals(interval.add(interval2), new CalendarInterval(65, 119 * MICROS_PER_HOUR));
   }
 
   @Test
@@ -125,25 +125,25 @@ public class IntervalSuite {
     String input = "interval 3 month 1 hour";
     String input2 = "interval 2 month 100 hour";
 
-    Interval interval = Interval.fromString(input);
-    Interval interval2 = Interval.fromString(input2);
+    CalendarInterval interval = CalendarInterval.fromString(input);
+    CalendarInterval interval2 = CalendarInterval.fromString(input2);
 
-    assertEquals(interval.subtract(interval2), new Interval(1, -99 * MICROS_PER_HOUR));
+    assertEquals(interval.subtract(interval2), new CalendarInterval(1, -99 * MICROS_PER_HOUR));
 
     input = "interval -10 month -81 hour";
     input2 = "interval 75 month 200 hour";
 
-    interval = Interval.fromString(input);
-    interval2 = Interval.fromString(input2);
+    interval = CalendarInterval.fromString(input);
+    interval2 = CalendarInterval.fromString(input2);
 
-    assertEquals(interval.subtract(interval2), new Interval(-85, -281 * MICROS_PER_HOUR));
+    assertEquals(interval.subtract(interval2), new CalendarInterval(-85, -281 * MICROS_PER_HOUR));
   }
 
   private void testSingleUnit(String unit, int number, int months, long microseconds) {
     String input1 = "interval " + number + " " + unit;
     String input2 = "interval " + number + " " + unit + "s";
-    Interval result = new Interval(months, microseconds);
-    assertEquals(Interval.fromString(input1), result);
-    assertEquals(Interval.fromString(input2), result);
+    CalendarInterval result = new CalendarInterval(months, microseconds);
+    assertEquals(CalendarInterval.fromString(input1), result);
+    assertEquals(CalendarInterval.fromString(input2), result);
   }
 }