diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3f7cba6dbcdb5f005da02c4f2262d982d5b97be3..4ef90546a2452902e46ab107691f6fd1c715ec23 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -347,6 +347,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     value
   }
 
+  /** Control our log level. This overrides any user-defined log settings.
+   * @param logLevel The desired log level as a string.
+   * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
+   */
+  def setLogLevel(logLevel: String) {
+    val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
+    if (!validLevels.contains(logLevel)) {
+      throw new IllegalArgumentException(
+        s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
+    }
+    Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
+  }
+
   try {
     _conf = config.clone()
     _conf.validateSettings()
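
For context, the new SparkContext.setLogLevel makes the log level adjustable at runtime and rejects anything outside the listed levels. A minimal usage sketch (the application name, master URL, and object name below are illustrative, not part of this patch):

import org.apache.spark.{SparkConf, SparkContext}

object LogLevelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("log-level-demo").setMaster("local[*]"))
    sc.setLogLevel("WARN")  // overrides whatever log4j.properties configured
    // An unrecognized level fails fast with an IllegalArgumentException:
    try sc.setLogLevel("VERBOSE") catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
    sc.stop()
  }
}
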
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 3be6783bba49d3712e1b66ffab1496da16fd7985..02e49a853c5f78e6304c0d9654f75d81e63cbf8c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -755,6 +755,14 @@ class JavaSparkContext(val sc: SparkContext)
    */
   def getLocalProperty(key: String): String = sc.getLocalProperty(key)
 
+  /** Control our log level. This overrides any user-defined log settings.
+   * @param logLevel The desired log level as a string.
+   * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
+   */
+  def setLogLevel(logLevel: String) {
+    sc.setLogLevel(logLevel)
+  }
+
   /**
    * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
    * different value or cleared.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4b5a5df5ef7b778b34f51eca2b41f490ea7fac61..844f0cd22d95d54dd3e86f3525e22e94ab960339 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2022,6 +2022,13 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Configure a new log4j level.
+   */
+  def setLogLevel(l: org.apache.log4j.Level) {
+    org.apache.log4j.Logger.getRootLogger().setLevel(l)
+  }
+
   /**
    * config a log4j properties used for testsuite
    */
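
Utils.setLogLevel sets the level on the root log4j logger, so the new threshold is inherited by every logger that does not define its own level. A small log4j sketch of that inheritance (the logger name below is illustrative):

import org.apache.log4j.{Level, Logger}

object RootLevelSketch {
  def main(args: Array[String]): Unit = {
    val child = Logger.getLogger("org.apache.spark.SomeComponent")
    Logger.getRootLogger().setLevel(Level.ERROR)  // the same call Utils.setLogLevel makes
    println(child.isInfoEnabled())                // false: level inherited from root

    child.setLevel(Level.DEBUG)                   // an explicit level overrides the root level
    println(child.isInfoEnabled())                // true
  }
}
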
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 62a3cbcdf69eac928ef1e544f666f53145347347..651ead6ff1de2a07f1d879c6d30343ebf4b46b6e 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -35,9 +35,10 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.Logging
 import org.apache.spark.SparkConf
 
-class UtilsSuite extends FunSuite with ResetSystemProperties {
+class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
 
   test("timeConversion") {
     // Test -1
@@ -68,7 +69,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     intercept[NumberFormatException] {
       Utils.timeStringAsMs("600l")
     }
-    
+
     intercept[NumberFormatException] {
       Utils.timeStringAsMs("This breaks 600s")
     }
@@ -99,7 +100,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.byteStringAsGb("1k") === 0)
     assert(Utils.byteStringAsGb("1t") === ByteUnit.TiB.toGiB(1))
     assert(Utils.byteStringAsGb("1p") === ByteUnit.PiB.toGiB(1))
-    
+
     assert(Utils.byteStringAsMb("1") === 1)
     assert(Utils.byteStringAsMb("1m") === 1)
     assert(Utils.byteStringAsMb("1048575b") === 0)
@@ -118,7 +119,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.byteStringAsKb("1g") === ByteUnit.GiB.toKiB(1))
     assert(Utils.byteStringAsKb("1t") === ByteUnit.TiB.toKiB(1))
     assert(Utils.byteStringAsKb("1p") === ByteUnit.PiB.toKiB(1))
-    
+
     assert(Utils.byteStringAsBytes("1") === 1)
     assert(Utils.byteStringAsBytes("1k") === ByteUnit.KiB.toBytes(1))
     assert(Utils.byteStringAsBytes("1m") === ByteUnit.MiB.toBytes(1))
@@ -127,17 +128,17 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.byteStringAsBytes("1p") === ByteUnit.PiB.toBytes(1))
 
     // Overflow handling, 1073741824p exceeds Long.MAX_VALUE if converted straight to Bytes
-    // This demonstrates that we can have e.g 1024^3 PB without overflowing. 
+    // This demonstrates that we can have e.g 1024^3 PB without overflowing.
     assert(Utils.byteStringAsGb("1073741824p") === ByteUnit.PiB.toGiB(1073741824))
     assert(Utils.byteStringAsMb("1073741824p") === ByteUnit.PiB.toMiB(1073741824))
-    
+
     // Run this to confirm it doesn't throw an exception
-    assert(Utils.byteStringAsBytes("9223372036854775807") === 9223372036854775807L) 
+    assert(Utils.byteStringAsBytes("9223372036854775807") === 9223372036854775807L)
     assert(ByteUnit.PiB.toPiB(9223372036854775807L) === 9223372036854775807L)
-    
+
     // Test overflow exception
     intercept[IllegalArgumentException] {
-      // This value exceeds Long.MAX when converted to bytes 
+      // This value exceeds Long.MAX when converted to bytes
       Utils.byteStringAsBytes("9223372036854775808")
     }
 
@@ -146,22 +147,22 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       // This value exceeds Long.MAX when converted to TB
       ByteUnit.PiB.toTiB(9223372036854775807L)
     }
-    
+
     // Test fractional string
     intercept[NumberFormatException] {
       Utils.byteStringAsMb("0.064")
     }
-    
+
     // Test fractional string
     intercept[NumberFormatException] {
       Utils.byteStringAsMb("0.064m")
     }
-    
+
     // Test invalid strings
     intercept[NumberFormatException] {
       Utils.byteStringAsBytes("500ub")
     }
-    
+
     // Test invalid strings
     intercept[NumberFormatException] {
       Utils.byteStringAsBytes("This breaks 600b")
@@ -174,12 +175,12 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     intercept[NumberFormatException] {
       Utils.byteStringAsBytes("600gb This breaks")
     }
-    
+
     intercept[NumberFormatException] {
       Utils.byteStringAsBytes("This 123mb breaks")
     }
   }
-  
+
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -475,6 +476,15 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     }
   }
 
+  // Test for using the util function to change our log levels.
+  test("log4j log level change") {
+    Utils.setLogLevel(org.apache.log4j.Level.ALL)
+    assert(log.isInfoEnabled())
+    Utils.setLogLevel(org.apache.log4j.Level.ERROR)
+    assert(!log.isInfoEnabled())
+    assert(log.isErrorEnabled())
+  }
+
   test("deleteRecursively") {
     val tempDir1 = Utils.createTempDir()
     assert(tempDir1.exists())
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index b006120eb266d6386aac195afbca94d31336ac26..31992795a9e4562162da3d5423f6483ae6683788 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -267,6 +267,13 @@ class SparkContext(object):
         """
         self.stop()
 
+    def setLogLevel(self, logLevel):
+        """
+        Control our log level. This overrides any user-defined log settings.
+        Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
+        """
+        self._jsc.setLogLevel(logLevel)
+
     @classmethod
     def setSystemProperty(cls, key, value):
         """