diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 1ebdf49348a68f0f4a1cee921870c4d46ff566cd..f239b33e44ee6f7eeacacdc7d8ebc9e391974f14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -54,6 +54,10 @@ trait FunctionRegistry {
 
   /** Checks if a function with a given name exists. */
   def functionExists(name: String): Boolean = lookupFunction(name).isDefined
+
+  /** Clear all registered functions. */
+  def clear(): Unit
+
 }
 
 class SimpleFunctionRegistry extends FunctionRegistry {
@@ -93,6 +97,10 @@ class SimpleFunctionRegistry extends FunctionRegistry {
     functionBuilders.remove(name).isDefined
   }
 
+  override def clear(): Unit = {
+    functionBuilders.clear()
+  }
+
   def copy(): SimpleFunctionRegistry = synchronized {
     val registry = new SimpleFunctionRegistry
     functionBuilders.iterator.foreach { case (name, (info, builder)) =>
@@ -132,6 +140,10 @@ object EmptyFunctionRegistry extends FunctionRegistry {
     throw new UnsupportedOperationException
   }
 
+  override def clear(): Unit = {
+    throw new UnsupportedOperationException
+  }
+
 }
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c08ffbb235e4173da2f9e83f1dc7b51ae25eaa35..62a3b1c10590fa230edbc0f5f0e59e4ae18b9456 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -304,11 +304,18 @@ class SessionCatalog(
     dbTables ++ _tempTables
   }
 
+  // TODO: It's strange that we have both refresh and invalidate here.
+
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
   def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
 
+  /**
+   * Invalidate the cache entry for a metastore table, if any.
+   */
+  def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
+
   /**
    * Drop all existing temporary tables.
    * For testing only.
@@ -595,6 +602,11 @@ class SessionCatalog(
     }
   }
 
+  /**
+   * List all functions in the specified database, including temporary functions.
+   */
+  def listFunctions(db: String): Seq[FunctionIdentifier] = listFunctions(db, "*")
+
   /**
    * List all matching functions in the specified database, including temporary functions.
    */
@@ -609,4 +621,34 @@ class SessionCatalog(
     // So, the returned list may have two entries for the same function.
     dbFunctions ++ loadedFunctions
   }
+
+
+  // -----------------
+  // | Other methods |
+  // -----------------
+
+  /**
+   * Drop all existing databases (except "default") along with all associated tables,
+   * partitions and functions, and set the current database to "default".
+   *
+   * This is mainly used for tests.
+   */
+  private[sql] def reset(): Unit = {
+    val default = "default"
+    listDatabases().filter(_ != default).foreach { db =>
+      dropDatabase(db, ignoreIfNotExists = false, cascade = true)
+    }
+    tempTables.clear()
+    functionRegistry.clear()
+    // restore built-in functions
+    FunctionRegistry.builtin.listFunction().foreach { f =>
+      val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
+      val functionBuilder = FunctionRegistry.builtin.lookupFunctionBuilder(f)
+      require(expressionInfo.isDefined, s"built-in function '$f' is missing expression info")
+      require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
+      functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
+    }
+    setCurrentDatabase(default)
+  }
+
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
index 191d5e6399fc95b67287977a1b47dd1d7e9b3ab5..d5d151a5802f69e05dcfef5fc1b77088daeeb497 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringKeyHashMap.scala
@@ -41,4 +41,6 @@ class StringKeyHashMap[T](normalizer: (String) => String) {
   def remove(key: String): Option[T] = base.remove(normalizer(key))
 
   def iterator: Iterator[(String, T)] = base.toIterator
+
+  def clear(): Unit = base.clear()
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 382cc61fac8807b0f9866f1d7b698d2318d65ecb..d3086fc91e3ec0068156ef0f9115f6593e114c2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, AstBuilder, ParseException}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -378,8 +378,7 @@ class SparkSqlAstBuilder extends AstBuilder {
   override def visitRenameTable(ctx: RenameTableContext): LogicalPlan = withOrigin(ctx) {
     AlterTableRename(
       visitTableIdentifier(ctx.from),
-      visitTableIdentifier(ctx.to))(
-      command(ctx))
+      visitTableIdentifier(ctx.to))
   }
 
   /**
@@ -395,8 +394,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       ctx: SetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
     AlterTableSetProperties(
       visitTableIdentifier(ctx.tableIdentifier),
-      visitTablePropertyList(ctx.tablePropertyList))(
-      command(ctx))
+      visitTablePropertyList(ctx.tablePropertyList))
   }
 
   /**
@@ -404,17 +402,16 @@ class SparkSqlAstBuilder extends AstBuilder {
    *
    * For example:
    * {{{
-   *   ALTER TABLE table UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
-   *   ALTER VIEW view UNSET TBLPROPERTIES IF EXISTS ('comment', 'key');
+   *   ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
+   *   ALTER VIEW view UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
    * }}}
    */
   override def visitUnsetTableProperties(
       ctx: UnsetTablePropertiesContext): LogicalPlan = withOrigin(ctx) {
     AlterTableUnsetProperties(
       visitTableIdentifier(ctx.tableIdentifier),
-      visitTablePropertyList(ctx.tablePropertyList),
-      ctx.EXISTS != null)(
-      command(ctx))
+      visitTablePropertyList(ctx.tablePropertyList).keys.toSeq,
+      ctx.EXISTS != null)
   }
 
   /**
@@ -432,116 +429,41 @@ class SparkSqlAstBuilder extends AstBuilder {
       Option(ctx.STRING).map(string),
       Option(ctx.tablePropertyList).map(visitTablePropertyList),
       // TODO a partition spec is allowed to have optional values. This is currently violated.
-      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))(
-      command(ctx))
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
-  /**
-   * Create an [[AlterTableStorageProperties]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table CLUSTERED BY (col, ...) [SORTED BY (col, ...)] INTO n BUCKETS;
-   * }}}
-   */
+  // TODO: don't even bother parsing alter table commands related to bucketing and skewing
+
   override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableStorageProperties(
-      visitTableIdentifier(ctx.tableIdentifier),
-      visitBucketSpec(ctx.bucketSpec))(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... CLUSTERED BY ... INTO N BUCKETS")
   }
 
-  /**
-   * Create an [[AlterTableNotClustered]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT CLUSTERED;
-   * }}}
-   */
   override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotClustered(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT CLUSTERED")
   }
 
-  /**
-   * Create an [[AlterTableNotSorted]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT SORTED;
-   * }}}
-   */
   override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotSorted(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SORTED")
   }
 
-  /**
-   * Create an [[AlterTableSkewed]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table SKEWED BY (col1, col2)
-   *   ON ((col1_value, col2_value) [, (col1_value, col2_value), ...])
-   *   [STORED AS DIRECTORIES];
-   * }}}
-   */
   override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
-    val table = visitTableIdentifier(ctx.tableIdentifier)
-    val (cols, values, storedAsDirs) = visitSkewSpec(ctx.skewSpec)
-    AlterTableSkewed(table, cols, values, storedAsDirs)(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... SKEWED BY ...")
   }
 
-  /**
-   * Create an [[AlterTableNotSorted]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT SKEWED;
-   * }}}
-   */
   override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotSkewed(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SKEWED")
   }
 
-  /**
-   * Create an [[AlterTableNotStoredAsDirs]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table NOT STORED AS DIRECTORIES
-   * }}}
-   */
   override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableNotStoredAsDirs(visitTableIdentifier(ctx.tableIdentifier))(command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... NOT STORED AS DIRECTORIES")
   }
 
-  /**
-   * Create an [[AlterTableSkewedLocation]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table SET SKEWED LOCATION (col1="loc1" [, (col2, col3)="loc2", ...] );
-   * }}}
-   */
   override def visitSetTableSkewLocations(
       ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
-    val skewedMap = ctx.skewedLocationList.skewedLocation.asScala.flatMap {
-      slCtx =>
-        val location = string(slCtx.STRING)
-        if (slCtx.constant != null) {
-          Seq(visitStringConstant(slCtx.constant) -> location)
-        } else {
-          // TODO this is similar to what was in the original implementation. However this does not
-          // make to much sense to me since we should be storing a tuple of values (not column
-          // names) for which we want a dedicated storage location.
-          visitConstantList(slCtx.constantList).map(_ -> location)
-        }
-    }.toMap
-
-    AlterTableSkewedLocation(
-      visitTableIdentifier(ctx.tableIdentifier),
-      skewedMap)(
-      command(ctx))
+    throw new AnalysisException(
+      "Operation not allowed: ALTER TABLE ... SET SKEWED LOCATION ...")
   }
 
   /**
@@ -703,8 +625,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     AlterTableSetLocation(
       visitTableIdentifier(ctx.tableIdentifier),
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
-      visitLocationSpec(ctx.locationSpec))(
-      command(ctx))
+      visitLocationSpec(ctx.locationSpec))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 68968819104e462832ff473cd4d6c1cecb3ecf40..0d38c41a3f45df920c7c5a84a6fa703011f94a65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -18,12 +18,11 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.types._
 
 
@@ -175,67 +174,133 @@ case class DescribeDatabase(
   }
 }
 
-/** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different name. */
+/**
+ * A command that renames a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *    ALTER TABLE table1 RENAME TO table2;
+ *    ALTER VIEW view1 RENAME TO view2;
+ * }}}
+ */
 case class AlterTableRename(
     oldName: TableIdentifier,
-    newName: TableIdentifier)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    newName: TableIdentifier)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    catalog.invalidateTable(oldName)
+    catalog.renameTable(oldName, newName)
+    Seq.empty[Row]
+  }
 
-/** Set Properties in ALTER TABLE/VIEW: add metadata to a table/view. */
+}
+
+/**
+ * A command that sets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ * }}}
+ */
 case class AlterTableSetProperties(
     tableName: TableIdentifier,
-    properties: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    properties: Map[String, String])
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    val newProperties = table.properties ++ properties
+    if (DDLUtils.isDatasourceTable(newProperties)) {
+      throw new AnalysisException(
+        "alter table properties is not supported for tables defined using the datasource API")
+    }
+    val newTable = table.copy(properties = newProperties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
 
-/** Unset Properties in ALTER TABLE/VIEW: remove metadata from a table/view. */
+/**
+ * A command that unsets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ * }}}
+ */
 case class AlterTableUnsetProperties(
     tableName: TableIdentifier,
-    properties: Map[String, String],
-    ifExists: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    propKeys: Seq[String],
+    ifExists: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table properties is not supported for datasource tables")
+    }
+    if (!ifExists) {
+      propKeys.foreach { k =>
+        if (!table.properties.contains(k)) {
+          throw new AnalysisException(
+            s"attempted to unset non-existent property '$k' in table '$tableName'")
+        }
+      }
+    }
+    val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
+    val newTable = table.copy(properties = newProperties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
 
+/**
+ * A command that sets the serde class and/or serde properties of a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ * }}}
+ */
 case class AlterTableSerDeProperties(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
-    partition: Option[Map[String, String]])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableStorageProperties(
-    tableName: TableIdentifier,
-    buckets: BucketSpec)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    partition: Option[Map[String, String]])
+  extends RunnableCommand {
 
-case class AlterTableNotClustered(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+  // should never happen if we parsed things correctly
+  require(serdeClassName.isDefined || serdeProperties.isDefined,
+    "alter table attempted to set neither serde class name nor serde properties")
 
-case class AlterTableNotSorted(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    // Do not support setting serde for datasource tables
+    if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        "alter table serde is not supported for datasource tables")
+    }
+    val newTable = table.withNewStorage(
+      serde = serdeClassName.orElse(table.storage.serde),
+      serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
 
-case class AlterTableSkewed(
-    tableName: TableIdentifier,
-    // e.g. (dt, country)
-    skewedCols: Seq[String],
-    // e.g. ('2008-08-08', 'us), ('2009-09-09', 'uk')
-    skewedValues: Seq[Seq[String]],
-    storedAsDirs: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging {
-
-  require(skewedValues.forall(_.size == skewedCols.size),
-    "number of columns in skewed values do not match number of skewed columns provided")
 }
 
-case class AlterTableNotSkewed(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableNotStoredAsDirs(
-    tableName: TableIdentifier)(sql: String) extends NativeDDLCommand(sql) with Logging
-
-case class AlterTableSkewedLocation(
-    tableName: TableIdentifier,
-    skewedMap: Map[String, String])(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 /**
  * Add Partition in ALTER TABLE/VIEW: add the table/view partitions.
  * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
@@ -292,11 +357,53 @@ case class AlterTableSetFileFormat(
     genericFormat: Option[String])(sql: String)
   extends NativeDDLCommand(sql) with Logging
 
+/**
+ * A command that sets the location of a table or a partition.
+ *
+ * For normal tables, this just sets the location URI in the table/partition's storage format.
+ * For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *    ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
+ * }}}
+ */
 case class AlterTableSetLocation(
     tableName: TableIdentifier,
     partitionSpec: Option[TablePartitionSpec],
-    location: String)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
+    location: String)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    val table = catalog.getTable(tableName)
+    partitionSpec match {
+      case Some(spec) =>
+        // Partition spec is specified, so we set the location only for this partition
+        val part = catalog.getPartition(tableName, spec)
+        val newPart =
+          if (DDLUtils.isDatasourceTable(table)) {
+            part.copy(storage = part.storage.copy(
+              serdeProperties = part.storage.serdeProperties ++ Map("path" -> location)))
+          } else {
+            part.copy(storage = part.storage.copy(locationUri = Some(location)))
+          }
+        catalog.alterPartitions(tableName, Seq(newPart))
+      case None =>
+        // No partition spec is specified, so we set the location for the table itself
+        val newTable =
+          if (DDLUtils.isDatasourceTable(table)) {
+            table.withNewStorage(
+              serdeProperties = table.storage.serdeProperties ++ Map("path" -> location))
+          } else {
+            table.withNewStorage(locationUri = Some(location))
+          }
+        catalog.alterTable(newTable)
+    }
+    Seq.empty[Row]
+  }
+
+}
 
 case class AlterTableTouch(
     tableName: TableIdentifier,
@@ -341,3 +448,16 @@ case class AlterTableReplaceCol(
     restrict: Boolean,
     cascade: Boolean)(sql: String)
   extends NativeDDLCommand(sql) with Logging
+
+
+private object DDLUtils {
+
+  def isDatasourceTable(props: Map[String, String]): Boolean = {
+    props.contains("spark.sql.sources.provider")
+  }
+
+  def isDatasourceTable(table: CatalogTable): Boolean = {
+    isDatasourceTable(table.properties)
+  }
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index c42e8e723383daaf06bfcaf252ba993a1d780cd2..618c9a58a677a90f82747d48ef20690567eeea39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -205,10 +205,10 @@ class DDLCommandSuite extends PlanTest {
     val parsed_view = parser.parsePlan(sql_view)
     val expected_table = AlterTableRename(
       TableIdentifier("table_name", None),
-      TableIdentifier("new_table_name", None))(sql_table)
+      TableIdentifier("new_table_name", None))
     val expected_view = AlterTableRename(
       TableIdentifier("table_name", None),
-      TableIdentifier("new_table_name", None))(sql_view)
+      TableIdentifier("new_table_name", None))
     comparePlans(parsed_table, expected_table)
     comparePlans(parsed_view, expected_view)
   }
@@ -235,14 +235,14 @@ class DDLCommandSuite extends PlanTest {
 
     val tableIdent = TableIdentifier("table_name", None)
     val expected1_table = AlterTableSetProperties(
-      tableIdent, Map("test" -> "test", "comment" -> "new_comment"))(sql1_table)
+      tableIdent, Map("test" -> "test", "comment" -> "new_comment"))
     val expected2_table = AlterTableUnsetProperties(
-      tableIdent, Map("comment" -> null, "test" -> null), ifExists = false)(sql2_table)
+      tableIdent, Seq("comment", "test"), ifExists = false)
     val expected3_table = AlterTableUnsetProperties(
-      tableIdent, Map("comment" -> null, "test" -> null), ifExists = true)(sql3_table)
-    val expected1_view = expected1_table.copy()(sql = sql1_view)
-    val expected2_view = expected2_table.copy()(sql = sql2_view)
-    val expected3_view = expected3_table.copy()(sql = sql3_view)
+      tableIdent, Seq("comment", "test"), ifExists = true)
+    val expected1_view = expected1_table
+    val expected2_view = expected2_table
+    val expected3_view = expected3_table
 
     comparePlans(parsed1_table, expected1_table)
     comparePlans(parsed2_table, expected2_table)
@@ -282,97 +282,24 @@ class DDLCommandSuite extends PlanTest {
     val parsed5 = parser.parsePlan(sql5)
     val tableIdent = TableIdentifier("table_name", None)
     val expected1 = AlterTableSerDeProperties(
-      tableIdent, Some("org.apache.class"), None, None)(sql1)
+      tableIdent, Some("org.apache.class"), None, None)
     val expected2 = AlterTableSerDeProperties(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      None)(sql2)
+      None)
     val expected3 = AlterTableSerDeProperties(
-      tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)(sql3)
+      tableIdent, None, Some(Map("columns" -> "foo,bar", "field.delim" -> ",")), None)
     val expected4 = AlterTableSerDeProperties(
       tableIdent,
       Some("org.apache.class"),
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql4)
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
     val expected5 = AlterTableSerDeProperties(
       tableIdent,
       None,
       Some(Map("columns" -> "foo,bar", "field.delim" -> ",")),
-      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))(sql5)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-    comparePlans(parsed5, expected5)
-  }
-
-  test("alter table: storage properties") {
-    val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS"
-    val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " +
-      "(dt, country DESC) INTO 10 BUCKETS"
-    val sql3 = "ALTER TABLE table_name NOT CLUSTERED"
-    val sql4 = "ALTER TABLE table_name NOT SORTED"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val parsed3 = parser.parsePlan(sql3)
-    val parsed4 = parser.parsePlan(sql4)
-    val tableIdent = TableIdentifier("table_name", None)
-    val cols = List("dt", "country")
-    // TODO: also test the sort directions once we keep track of that
-    val expected1 = AlterTableStorageProperties(
-      tableIdent, BucketSpec(10, cols, Nil))(sql1)
-    val expected2 = AlterTableStorageProperties(
-      tableIdent, BucketSpec(10, cols, cols))(sql2)
-    val expected3 = AlterTableNotClustered(tableIdent)(sql3)
-    val expected4 = AlterTableNotSorted(tableIdent)(sql4)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
-  }
-
-  test("alter table: skewed") {
-    val sql1 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn')) STORED AS DIRECTORIES
-      """.stripMargin
-    val sql2 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |('2008-08-08', 'us') STORED AS DIRECTORIES
-      """.stripMargin
-    val sql3 =
-      """
-       |ALTER TABLE table_name SKEWED BY (dt, country) ON
-       |(('2008-08-08', 'us'), ('2009-09-09', 'uk'))
-      """.stripMargin
-    val sql4 = "ALTER TABLE table_name NOT SKEWED"
-    val sql5 = "ALTER TABLE table_name NOT STORED AS DIRECTORIES"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val parsed3 = parser.parsePlan(sql3)
-    val parsed4 = parser.parsePlan(sql4)
-    val parsed5 = parser.parsePlan(sql5)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk"), List("2010-10-10", "cn")),
-      storedAsDirs = true)(sql1)
-    val expected2 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us")),
-      storedAsDirs = true)(sql2)
-    val expected3 = AlterTableSkewed(
-      tableIdent,
-      Seq("dt", "country"),
-      Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")),
-      storedAsDirs = false)(sql3)
-    val expected4 = AlterTableNotSkewed(tableIdent)(sql4)
-    val expected5 = AlterTableNotStoredAsDirs(tableIdent)(sql5)
+      Some(Map("test" -> null, "dt" -> "2008-08-08", "country" -> "us")))
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
     comparePlans(parsed3, expected3)
@@ -380,30 +307,6 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed5, expected5)
   }
 
-  test("alter table: skewed location") {
-    val sql1 =
-      """
-       |ALTER TABLE table_name SET SKEWED LOCATION
-       |('123'='location1', 'test'='location2')
-      """.stripMargin
-    val sql2 =
-      """
-       |ALTER TABLE table_name SET SKEWED LOCATION
-       |(('2008-08-08', 'us')='location1', 'test'='location2')
-      """.stripMargin
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableSkewedLocation(
-      tableIdent,
-      Map("123" -> "location1", "test" -> "location2"))(sql1)
-    val expected2 = AlterTableSkewedLocation(
-      tableIdent,
-      Map("2008-08-08" -> "location1", "us" -> "location1", "test" -> "location2"))(sql2)
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
   // ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
   // [LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
   test("alter table: add partition") {
@@ -615,11 +518,11 @@ class DDLCommandSuite extends PlanTest {
     val expected1 = AlterTableSetLocation(
       tableIdent,
       None,
-      "new location")(sql1)
+      "new location")
     val expected2 = AlterTableSetLocation(
       tableIdent,
       Some(Map("dt" -> "2008-08-08", "country" -> "us")),
-      "new location")(sql2)
+      "new location")
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 885a04af59176f32e6f92e8b9fef33529be6d661..d8e2c94a8a4fae4549e56fa517d959d3ed5a014e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -19,34 +19,63 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog.TablePartitionSpec
 import org.apache.spark.sql.test.SharedSQLContext
 
-class DDLSuite extends QueryTest with SharedSQLContext {
-
+class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   private val escapedIdentifier = "`(.+)`".r
 
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      sqlContext.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
   /**
    * Strip backticks, if any, from the string.
    */
-  def cleanIdentifier(ident: String): String = {
+  private def cleanIdentifier(ident: String): String = {
     ident match {
       case escapedIdentifier(i) => i
       case plainIdent => plainIdent
     }
   }
 
-  /**
-   * Drops database `databaseName` after calling `f`.
-   */
-  private def withDatabase(dbNames: String*)(f: => Unit): Unit = {
-    try f finally {
-      dbNames.foreach { name =>
-        sqlContext.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
-      }
-      sqlContext.sessionState.catalog.setCurrentDatabase("default")
+  private def assertUnsupported(query: String): Unit = {
+    val e = intercept[AnalysisException] {
+      sql(query)
     }
+    assert(e.getMessage.toLowerCase.contains("operation not allowed"))
+  }
+
+  private def createDatabase(catalog: SessionCatalog, name: String): Unit = {
+    catalog.createDatabase(CatalogDatabase(name, "", "", Map()), ignoreIfExists = false)
+  }
+
+  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    catalog.createTable(CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = CatalogStorageFormat(None, None, None, None, Map()),
+      schema = Seq()), ignoreIfExists = false)
+  }
+
+  private def createTablePartition(
+      catalog: SessionCatalog,
+      spec: TablePartitionSpec,
+      tableName: TableIdentifier): Unit = {
+    val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map()))
+    catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
   test("Create/Drop Database") {
@@ -55,7 +84,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      withDatabase(dbName) {
+      try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
 
         sql(s"CREATE DATABASE $dbName")
@@ -67,6 +96,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
           Map.empty))
         sql(s"DROP DATABASE $dbName CASCADE")
         assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -76,8 +107,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-      withDatabase(dbName) {
+      try {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
@@ -90,6 +121,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
           sql(s"CREATE DATABASE $dbName")
         }.getMessage
         assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -99,7 +132,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
-      withDatabase(dbName) {
+      try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         val location =
           System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
@@ -129,6 +162,8 @@ class DDLSuite extends QueryTest with SharedSQLContext {
             Row("Description", "") ::
             Row("Location", location) ::
             Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+      } finally {
+        catalog.reset()
       }
     }
   }
@@ -159,6 +194,131 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  // TODO: test drop database in restrict mode
+
+  test("alter table: rename") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
+    val tableIdent2 = TableIdentifier("tab2", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createDatabase(catalog, "dby")
+    createTable(catalog, tableIdent1)
+    assert(catalog.listTables("dbx") == Seq(tableIdent1))
+    sql("ALTER TABLE dbx.tab1 RENAME TO dbx.tab2")
+    assert(catalog.listTables("dbx") == Seq(tableIdent2))
+    catalog.setCurrentDatabase("dbx")
+    // rename without explicitly specifying database
+    sql("ALTER TABLE tab2 RENAME TO tab1")
+    assert(catalog.listTables("dbx") == Seq(tableIdent1))
+    // table to rename does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.does_not_exist RENAME TO dbx.tab2")
+    }
+    // destination database is different
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 RENAME TO dby.tab2")
+    }
+  }
+
+  test("alter table: set location") {
+    testSetLocation(isDatasourceTable = false)
+  }
+
+  test("alter table: set location (datasource table)") {
+    testSetLocation(isDatasourceTable = true)
+  }
+
+  test("alter table: set properties") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    // set table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('andrew' = 'or14', 'kor' = 'bel')")
+    assert(catalog.getTable(tableIdent).properties == Map("andrew" -> "or14", "kor" -> "bel"))
+    // set table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET TBLPROPERTIES ('kor' = 'belle', 'kar' = 'bol')")
+    assert(catalog.getTable(tableIdent).properties ==
+      Map("andrew" -> "or14", "kor" -> "belle", "kar" -> "bol"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET TBLPROPERTIES ('winner' = 'loser')")
+    }
+    // throw exception for datasource tables
+    convertToDatasourceTable(catalog, tableIdent)
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 SET TBLPROPERTIES ('sora' = 'bol')")
+    }
+    assert(e.getMessage.contains("datasource"))
+  }
+
+  test("alter table: unset properties") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    // unset table properties
+    sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan')")
+    sql("ALTER TABLE dbx.tab1 UNSET TBLPROPERTIES ('j')")
+    assert(catalog.getTable(tableIdent).properties == Map("p" -> "an", "c" -> "lan"))
+    // unset table properties without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('p')")
+    assert(catalog.getTable(tableIdent).properties == Map("c" -> "lan"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist UNSET TBLPROPERTIES ('c' = 'lan')")
+    }
+    // property to unset does not exist
+    val e = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('c', 'xyz')")
+    }
+    assert(e.getMessage.contains("xyz"))
+    // property to unset does not exist, but "IF EXISTS" is specified
+    sql("ALTER TABLE tab1 UNSET TBLPROPERTIES IF EXISTS ('c', 'xyz')")
+    assert(catalog.getTable(tableIdent).properties.isEmpty)
+    // throw exception for datasource tables
+    convertToDatasourceTable(catalog, tableIdent)
+    val e1 = intercept[AnalysisException] {
+      sql("ALTER TABLE tab1 UNSET TBLPROPERTIES ('sora')")
+    }
+    assert(e1.getMessage.contains("datasource"))
+  }
+
+  test("alter table: set serde") {
+    testSetSerde(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde (datasource table)") {
+    testSetSerde(isDatasourceTable = true)
+  }
+
+  test("alter table: bucketing is not supported") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (blood, lemon, grape) INTO 11 BUCKETS")
+    assertUnsupported("ALTER TABLE dbx.tab1 CLUSTERED BY (fuji) SORTED BY (grape) INTO 5 BUCKETS")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT CLUSTERED")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT SORTED")
+  }
+
+  test("alter table: skew is not supported") {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
+      "(('2008-08-08', 'us'), ('2009-09-09', 'uk'), ('2010-10-10', 'cn'))")
+    assertUnsupported("ALTER TABLE dbx.tab1 SKEWED BY (dt, country) ON " +
+      "(('2008-08-08', 'us'), ('2009-09-09', 'uk')) STORED AS DIRECTORIES")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT SKEWED")
+    assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
+  }
+
   // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
 
   test("show tables") {
@@ -206,29 +366,129 @@ class DDLSuite extends QueryTest with SharedSQLContext {
   }
 
   test("show databases") {
-    withDatabase("showdb1A", "showdb2B") {
-      sql("CREATE DATABASE showdb1A")
-      sql("CREATE DATABASE showdb2B")
+    sql("CREATE DATABASE showdb1A")
+    sql("CREATE DATABASE showdb2B")
 
-      assert(
-        sql("SHOW DATABASES").count() >= 2)
+    assert(
+      sql("SHOW DATABASES").count() >= 2)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE '*db1A'"),
-        Row("showdb1A") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE '*db1A'"),
+      Row("showdb1A") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE 'showdb1A'"),
-        Row("showdb1A") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE 'showdb1A'"),
+      Row("showdb1A") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
-        Row("showdb1A") ::
-          Row("showdb2B") :: Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE '*db1A|*db2B'"),
+      Row("showdb1A") ::
+        Row("showdb2B") :: Nil)
 
-      checkAnswer(
-        sql("SHOW DATABASES LIKE 'non-existentdb'"),
-        Nil)
+    checkAnswer(
+      sql("SHOW DATABASES LIKE 'non-existentdb'"),
+      Nil)
+  }
+
+  private def convertToDatasourceTable(
+      catalog: SessionCatalog,
+      tableIdent: TableIdentifier): Unit = {
+    catalog.alterTable(catalog.getTable(tableIdent).copy(
+      properties = Map("spark.sql.sources.provider" -> "csv")))
+  }
+
+  private def testSetLocation(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val partSpec = Map("a" -> "1")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, partSpec, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getTable(tableIdent).storage.locationUri.isEmpty)
+    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isEmpty)
+    assert(catalog.getPartition(tableIdent, partSpec).storage.serdeProperties.isEmpty)
+    // Verify that the location is set to the expected string
+    def verifyLocation(expected: String, spec: Option[TablePartitionSpec] = None): Unit = {
+      val storageFormat = spec
+        .map { s => catalog.getPartition(tableIdent, s).storage }
+        .getOrElse { catalog.getTable(tableIdent).storage }
+      if (isDatasourceTable) {
+        assert(storageFormat.serdeProperties.get("path") === Some(expected))
+      } else {
+        assert(storageFormat.locationUri === Some(expected))
+      }
+    }
+    // set table location
+    sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
+    verifyLocation("/path/to/your/lovely/heart")
+    // set table partition location
+    sql("ALTER TABLE dbx.tab1 PARTITION (a='1') SET LOCATION '/path/to/part/ways'")
+    verifyLocation("/path/to/part/ways", Some(partSpec))
+    // set table location without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET LOCATION '/swanky/steak/place'")
+    verifyLocation("/swanky/steak/place")
+    // set table partition location without explicitly specifying database
+    sql("ALTER TABLE tab1 PARTITION (a='1') SET LOCATION 'vienna'")
+    verifyLocation("vienna", Some(partSpec))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.does_not_exist SET LOCATION '/mister/spark'")
+    }
+    // partition to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE dbx.tab1 PARTITION (b='2') SET LOCATION '/mister/spark'")
     }
   }
+
+  private def testSetSerde(isDatasourceTable: Boolean): Unit = {
+    val catalog = sqlContext.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getTable(tableIdent).storage.serde.isEmpty)
+    assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+    // set table serde and/or properties (should fail on datasource tables)
+    if (isDatasourceTable) {
+      val e1 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 SET SERDE 'whatever'")
+      }
+      val e2 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      }
+      assert(e1.getMessage.contains("datasource"))
+      assert(e2.getMessage.contains("datasource"))
+    } else {
+      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
+      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.jadoop"))
+      assert(catalog.getTable(tableIdent).storage.serdeProperties.isEmpty)
+      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      assert(catalog.getTable(tableIdent).storage.serde == Some("org.apache.madoop"))
+      assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+        Map("k" -> "v", "kay" -> "vee"))
+    }
+    // set serde properties only
+    sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      Map("k" -> "vvv", "kay" -> "vee"))
+    // set things without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
+    assert(catalog.getTable(tableIdent).storage.serdeProperties ==
+      Map("k" -> "vvv", "kay" -> "veee"))
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
+    }
+  }
+
 }
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4b4f88ece00ea8109bde039ce01da5308674fe15..b01f556f0addebb08d67f53a83f2dd24236dd2e5 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -360,6 +360,12 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "show_create_table_serde",
     "show_create_table_view",
 
+    // These tests try to change how a table is bucketed, which we don't support
+    "alter4",
+    "sort_merge_join_desc_5",
+    "sort_merge_join_desc_6",
+    "sort_merge_join_desc_7",
+
     // Index commands are not supported
     "drop_index",
     "drop_index_removes_partition_dirs",
@@ -381,7 +387,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "alias_casted_column",
     "alter2",
     "alter3",
-    "alter4",
     "alter5",
     "alter_merge_2",
     "alter_partition_format_loc",
@@ -880,9 +885,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "sort_merge_join_desc_2",
     "sort_merge_join_desc_3",
     "sort_merge_join_desc_4",
-    "sort_merge_join_desc_5",
-    "sort_merge_join_desc_6",
-    "sort_merge_join_desc_7",
     "stats0",
     "stats_aggregator_error_1",
     "stats_empty_partition",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index d315f39a91e233ce80d291865cae2491d0481b1a..0cccc22e5a624b914996468668cbe3c69b4cfba2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -94,7 +94,7 @@ private[sql] class HiveSessionCatalog(
     metastoreCatalog.refreshTable(name)
   }
 
-  def invalidateTable(name: TableIdentifier): Unit = {
+  override def invalidateTable(name: TableIdentifier): Unit = {
     metastoreCatalog.invalidateTable(name)
   }