diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 9f54d709a022dbbe44b715168e74b1b096216cd2..8549187a66369c370585c85b21614448e7e9f313 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -242,6 +242,9 @@ package object dsl {
       def array(dataType: DataType): AttributeReference =
         AttributeReference(s, ArrayType(dataType), nullable = true)()
 
+      def array(arrayType: ArrayType): AttributeReference =
+        AttributeReference(s, arrayType, nullable = true)()
+
       /** Creates a new AttributeReference of type map */
       def map(keyType: DataType, valueType: DataType): AttributeReference =
         map(MapType(keyType, valueType))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 74dfd10189d816c86405e44c978354466b856af7..82ab111aa2259e89c3fad14d03f1544fab8b7c9d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -475,6 +475,12 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {
 object SimplifyCasts extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
     case Cast(e, dataType) if e.dataType == dataType => e
+    case c @ Cast(e, dataType) => (e.dataType, dataType) match {
+      case (ArrayType(from, false), ArrayType(to, true)) if from == to => e
+      case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true))
+        if fromKey == toKey && fromValue == toValue => e
+      case _ => c
+    }
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e84f11272d214b209d116669df907dbaf7231636
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyCastsSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+class SimplifyCastsSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("SimplifyCasts", FixedPoint(50), SimplifyCasts) :: Nil
+  }
+
+  test("non-nullable element array to nullable element array cast") {
+    val input = LocalRelation('a.array(ArrayType(IntegerType, false)))
+    val plan = input.select('a.cast(ArrayType(IntegerType, true)).as("casted")).analyze
+    val optimized = Optimize.execute(plan)
+    val expected = input.select('a.as("casted")).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("nullable element to non-nullable element array cast") {
+    val input = LocalRelation('a.array(ArrayType(IntegerType, true)))
+    val plan = input.select('a.cast(ArrayType(IntegerType, false)).as("casted")).analyze
+    val optimized = Optimize.execute(plan)
+    comparePlans(optimized, plan)
+  }
+
+  test("non-nullable value map to nullable value map cast") {
+    val input = LocalRelation('m.map(MapType(StringType, StringType, false)))
+    val plan = input.select('m.cast(MapType(StringType, StringType, true))
+      .as("casted")).analyze
+    val optimized = Optimize.execute(plan)
+    val expected = input.select('m.as("casted")).analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("nullable value map to non-nullable value map cast") {
+    val input = LocalRelation('m.map(MapType(StringType, StringType, true)))
+    val plan = input.select('m.cast(MapType(StringType, StringType, false))
+      .as("casted")).analyze
+    val optimized = Optimize.execute(plan)
+    comparePlans(optimized, plan)
+  }
+}
+