diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d8701812ebedacddb945e2340cbc09c110c4b84d..fe15052b62478dc07f9903dc0d6c8032988198ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,6 @@ import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.generic.Growable
@@ -34,7 +33,7 @@ import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal
 
 import com.google.common.collect.MapMaker
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -334,7 +333,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     override protected def childValue(parent: Properties): Properties = {
       // Note: make a clone such that changes in the parent properties aren't reflected in
       // those of the children threads, which has confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent).asInstanceOf[Properties]
+      SerializationUtils.clone(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
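
Note: the dropped `asInstanceOf[Properties]` cast here (and in the streaming files below) falls directly out of the Lang 3 API: `org.apache.commons.lang3.SerializationUtils.clone` is declared as `<T extends Serializable> T clone(T)`, whereas the Lang 2 version returned a bare `Object`. A minimal Scala sketch of the new call shape (illustrative only, runnable as a script):

    import java.util.Properties
    import org.apache.commons.lang3.SerializationUtils

    val parent = new Properties()
    parent.setProperty("spark.job.description", "demo")

    // Lang 3 infers T = Properties, so no cast is needed.
    val child: Properties = SerializationUtils.clone(parent)

    assert(child ne parent)  // a deep copy, not the same object
    assert(child.getProperty("spark.job.description") == "demo")
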
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 270104f85b83813b2ca52f93a14c9e8321d6cf09..9a35183c637336a9a9f3297f05eb20255d6e8104 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -210,6 +210,12 @@ This file is divided into 3 sections:
     scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
   </check>
 
+  <check customId="commonslang2" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+    <parameters><parameter name="regex">org\.apache\.commons\.lang\.</parameter></parameters>
+    <customMessage>Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
+    of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
+  </check>
+
   <check level="error" class="org.scalastyle.scalariform.ImportOrderChecker" enabled="true">
     <parameters>
       <parameter name="groups">java,scala,3rdParty,spark</parameter>
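
For reference, the new commonslang2 check is a plain token regex, so any source token containing "org.apache.commons.lang." trips it; the trailing escaped dot is what keeps org.apache.commons.lang3 imports from matching. A small illustrative sketch of what fails and what passes:

    // Fails scalastyle under the new check:
    //   import org.apache.commons.lang.StringUtils
    // Passes (Lang 3 package):
    import org.apache.commons.lang3.StringUtils

    val padded = StringUtils.leftPad("7", 3, '0')  // "007"
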
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 83fa447cf8c85a1aad395b58d44362990341ab08..66c4bf29ea4b23145462773d648234ed08e84adc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 3cc7a1a3cae8335cc95467aa7a246616a2e8ca6c..072445af4f41f45fca1774433eda51a1dc9c49e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -23,7 +23,7 @@ import scala.collection.Map
 import scala.collection.mutable.Stack
 import scala.reflect.ClassTag
 
-import org.apache.commons.lang.ClassUtils
+import org.apache.commons.lang3.ClassUtils
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 662a03d3b56ae73d9b00c208987687bd9d68a841..a18b881c78a09885c8983ec2de71270dbc808d35 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -228,7 +227,7 @@ public class VectorizedColumnReader {
             column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
           }
         } else {
-          throw new NotImplementedException("Unimplemented type: " + column.dataType());
+          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
         }
         break;
 
@@ -239,7 +238,7 @@ public class VectorizedColumnReader {
             column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
           }
         } else {
-          throw new NotImplementedException("Unimplemented type: " + column.dataType());
+          throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
         }
         break;
 
@@ -262,7 +261,7 @@ public class VectorizedColumnReader {
             column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
           }
         } else {
-          throw new NotImplementedException();
+          throw new UnsupportedOperationException();
         }
         break;
       case BINARY:
@@ -293,12 +292,12 @@ public class VectorizedColumnReader {
             column.putByteArray(i, v.getBytes());
           }
         } else {
-          throw new NotImplementedException();
+          throw new UnsupportedOperationException();
         }
         break;
 
       default:
-        throw new NotImplementedException("Unsupported type: " + descriptor.getType());
+        throw new UnsupportedOperationException("Unsupported type: " + descriptor.getType());
     }
   }
 
@@ -327,7 +326,7 @@ public class VectorizedColumnReader {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -360,7 +359,7 @@ public class VectorizedColumnReader {
       defColumn.readDoubles(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -381,7 +380,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -417,7 +416,7 @@ public class VectorizedColumnReader {
         }
       }
     } else {
-      throw new NotImplementedException("Unimplemented type: " + column.dataType());
+      throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
     }
   }
 
@@ -459,13 +458,13 @@ public class VectorizedColumnReader {
       @SuppressWarnings("deprecation")
       Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
       if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
-        throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
       }
       this.dataColumn = new VectorizedRleValuesReader();
       this.useDictionary = true;
     } else {
       if (dataEncoding != Encoding.PLAIN) {
-        throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
+        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
       }
       this.dataColumn = new VectorizedPlainValuesReader();
       this.useDictionary = false;
@@ -485,7 +484,7 @@ public class VectorizedColumnReader {
 
     // Initialize the decoders.
     if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
-      throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
+      throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
     }
     int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
     this.defColumn = new VectorizedRleValuesReader(bitWidth);
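
On the exception swap: Commons Lang's NotImplementedException extends java.lang.UnsupportedOperationException (in Lang 2.1+ and in Lang 3), so throwing the JDK class directly keeps the same catchable type for callers while dropping the third-party import. A hedged sketch of the pattern, using a made-up decodeTo helper rather than Spark's actual reader API:

    // decodeTo is hypothetical; it stands in for the readXxxBatch methods above.
    def decodeTo(dataType: String): Unit = dataType match {
      case "int" | "long" => ()  // supported paths elided
      case other =>
        // Same catch behavior as the old NotImplementedException for callers
        // handling UnsupportedOperationException or RuntimeException.
        throw new UnsupportedOperationException(s"Unimplemented type: $other")
    }
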
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 80c84b13365f022f28fe351e9bc04ecf8882eb34..bbbb796aca0def09ccb09cc74b47d5ffa17075e8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.io.api.Binary;
 
@@ -100,7 +99,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public ArrayData copy() {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     // TODO: this is extremely expensive.
@@ -171,7 +170,7 @@ public abstract class ColumnVector implements AutoCloseable {
           }
         }
       } else {
-        throw new NotImplementedException("Type " + dt);
+        throw new UnsupportedOperationException("Type " + dt);
       }
       return list;
     }
@@ -181,7 +180,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public boolean getBoolean(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -189,7 +188,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public short getShort(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -200,7 +199,7 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public float getFloat(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -240,12 +239,12 @@ public abstract class ColumnVector implements AutoCloseable {
 
     @Override
     public MapData getMap(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
     public Object get(int ordinal, DataType dataType) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
   }
 
@@ -562,7 +561,7 @@ public abstract class ColumnVector implements AutoCloseable {
    * Returns the value for rowId.
    */
   public MapData getMap(int ordinal) {
-    throw new NotImplementedException();
+    throw new UnsupportedOperationException();
   }
 
   /**
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index f50c35fc64a75577b04f2761a813c5c38f1cf5cc..2fa476b9cfb7198971b1a2152b59347225fc56df 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -23,8 +23,6 @@ import java.sql.Date;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -112,7 +110,7 @@ public class ColumnVectorUtils {
       }
       return result;
     } else {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
   }
 
@@ -161,7 +159,7 @@ public class ColumnVectorUtils {
       } else if (t instanceof DateType) {
         dst.appendInt(DateTimeUtils.fromJavaDate((Date)o));
       } else {
-        throw new NotImplementedException("Type " + t);
+        throw new UnsupportedOperationException("Type " + t);
       }
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 8cece73faa4b924c7f65d6df55abfbf4f548c943..f3afa8f938f866d26e6d8b7103052ba86a511874 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.util.*;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
@@ -166,7 +164,7 @@ public final class ColumnarBatch {
 
     @Override
     public boolean anyNull() {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -227,12 +225,12 @@ public final class ColumnarBatch {
 
     @Override
     public MapData getMap(int ordinal) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
     public Object get(int ordinal, DataType dataType) {
-      throw new NotImplementedException();
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -258,7 +256,7 @@ public final class ColumnarBatch {
           setDecimal(ordinal, Decimal.apply((BigDecimal) value, t.precision(), t.scale()),
               t.precision());
         } else {
-          throw new NotImplementedException("Datatype not supported " + dt);
+          throw new UnsupportedOperationException("Datatype not supported " + dt);
         }
       }
     }
@@ -430,7 +428,7 @@ public final class ColumnarBatch {
    */
   public void setColumn(int ordinal, ColumnVector column) {
     if (column instanceof OffHeapColumnVector) {
-      throw new NotImplementedException("Need to ref count columns.");
+      throw new UnsupportedOperationException("Need to ref count columns.");
     }
     columns[ordinal] = column;
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e2c23a4ba8670732e74c7a4fae0565cbae639bc9..09203e69983dab1822f8a7f24e86489f1646394b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 02866c76cb7aae100d002589a4bfd16a0243df60..079e122a5a85ab962bc876edc74faddfdf855248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index cc3e807e7a8401c9e3351d7ed2c14f227e4e63bf..47bfaa86021d67867b03e7bd9a3b52bce09e9cee 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b524af9578b7223515a46cd0e7de64b96ea936bc..6046426fdf8cb2739cbb3691d79e797e79aeea53 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -579,8 +579,7 @@ class StreamingContext private[streaming] (
               sparkContext.setCallSite(startSite.get)
               sparkContext.clearJobGroup()
               sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-              savedProperties.set(SerializationUtils.clone(
-                sparkContext.localProperties.get()).asInstanceOf[Properties])
+              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index ac18f73ea86aae522d9f68b3f59f666776055f2a..79d6254eb372bb2276609ddeb34e92d85313e1f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.scheduler
 
-import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang.SerializationUtils
+import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
@@ -219,8 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
-        ssc.sparkContext.setLocalProperties(
-          SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
+        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
         val formattedTime = UIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
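
For context on why these call sites clone at all: each batch job runs on a pool thread, and the per-job copy keeps one job's mutations of the local properties from leaking into concurrently running jobs (the SPARK-10563 note in SparkContext above). An illustrative sketch of the isolation the clone provides, not the scheduler itself:

    import java.util.Properties
    import org.apache.commons.lang3.SerializationUtils

    val saved = new Properties()
    saved.setProperty("spark.jobGroup.id", "batch-1")

    val forThisJob = SerializationUtils.clone(saved)  // independent deep copy
    forThisJob.setProperty("spark.jobGroup.id", "batch-2")

    assert(saved.getProperty("spark.jobGroup.id") == "batch-1")  // unaffected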