diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1374f74968c9eacb57dfd92dbb62bbd42fca5be2..3a2e7649e68278997593ba3b62e47495cb2cd4ee 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -32,7 +32,7 @@ import warnings
 import heapq
 import bisect
 from random import Random
-from math import sqrt, log
+from math import sqrt, log, isinf, isnan
 
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
@@ -886,6 +886,133 @@ class RDD(object):
 
         return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
 
+    def histogram(self, buckets):
+        """
+        Compute a histogram using the provided buckets. The buckets
+        are all open to the right except for the last which is closed.
+        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
+        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
+        and 50 we would have a histogram of 1,0,1.
+
+        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
+        this can be switched from an O(log n) inseration to O(1) per
+        element(where n = # buckets).
+
+        Buckets must be sorted and not contain any duplicates, must be
+        at least two elements.
+
+        If `buckets` is a number, it will generates buckets which are
+        evenly spaced between the minimum and maximum of the RDD. For
+        example, if the min value is 0 and the max is 100, given buckets
+        as 2, the resulting buckets will be [0,50) [50,100]. buckets must
+        be at least 1 If the RDD contains infinity, NaN throws an exception
+        If the elements in RDD do not vary (max == min) always returns
+        a single bucket.
+
+        It will return an tuple of buckets and histogram.
+
+        >>> rdd = sc.parallelize(range(51))
+        >>> rdd.histogram(2)
+        ([0, 25, 50], [25, 26])
+        >>> rdd.histogram([0, 5, 25, 50])
+        ([0, 5, 25, 50], [5, 20, 26])
+        >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
+        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
+        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
+        >>> rdd.histogram(("a", "b", "c"))
+        (('a', 'b', 'c'), [2, 2])
+        """
+
+        if isinstance(buckets, (int, long)):
+            if buckets < 1:
+                raise ValueError("number of buckets must be >= 1")
+
+            # filter out non-comparable elements
+            def comparable(x):
+                if x is None:
+                    return False
+                if type(x) is float and isnan(x):
+                    return False
+                return True
+
+            filtered = self.filter(comparable)
+
+            # faster than stats()
+            def minmax(a, b):
+                return min(a[0], b[0]), max(a[1], b[1])
+            try:
+                minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
+            except TypeError as e:
+                if " empty " in str(e):
+                    raise ValueError("can not generate buckets from empty RDD")
+                raise
+
+            if minv == maxv or buckets == 1:
+                return [minv, maxv], [filtered.count()]
+
+            try:
+                inc = (maxv - minv) / buckets
+            except TypeError:
+                raise TypeError("Can not generate buckets with non-number in RDD")
+
+            if isinf(inc):
+                raise ValueError("Can not generate buckets with infinite value")
+
+            # keep them as integer if possible
+            if inc * buckets != maxv - minv:
+                inc = (maxv - minv) * 1.0 / buckets
+
+            buckets = [i * inc + minv for i in range(buckets)]
+            buckets.append(maxv)  # fix accumulated error
+            even = True
+
+        elif isinstance(buckets, (list, tuple)):
+            if len(buckets) < 2:
+                raise ValueError("buckets should have more than one value")
+
+            if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
+                raise ValueError("can not have None or NaN in buckets")
+
+            if sorted(buckets) != list(buckets):
+                raise ValueError("buckets should be sorted")
+
+            if len(set(buckets)) != len(buckets):
+                raise ValueError("buckets should not contain duplicated values")
+
+            minv = buckets[0]
+            maxv = buckets[-1]
+            even = False
+            inc = None
+            try:
+                steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
+            except TypeError:
+                pass  # objects in buckets do not support '-'
+            else:
+                if max(steps) - min(steps) < 1e-10:  # handle precision errors
+                    even = True
+                    inc = (maxv - minv) / (len(buckets) - 1)
+
+        else:
+            raise TypeError("buckets should be a list or tuple or number(int or long)")
+
+        def histogram(iterator):
+            counters = [0] * len(buckets)
+            for i in iterator:
+                if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
+                    continue
+                t = (int((i - minv) / inc) if even
+                     else bisect.bisect_right(buckets, i) - 1)
+                counters[t] += 1
+            # add last two together
+            last = counters.pop()
+            counters[-1] += last
+            return [counters]
+
+        def mergeCounters(a, b):
+            return [i + j for i, j in zip(a, b)]
+
+        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
+
     def mean(self):
         """
         Compute the mean of this RDD's elements.
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 51bfbb47e53c285978c0ecdaaff6cfa63f0892d3..1db922f5137432c482ec13c5608948a81f56c7b1 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -364,6 +364,110 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(a.count(), b.count())
         self.assertRaises(Exception, lambda: a.zip(b).count())
 
+    def test_histogram(self):
+        # empty
+        rdd = self.sc.parallelize([])
+        self.assertEquals([0], rdd.histogram([0, 10])[1])
+        self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+        self.assertRaises(ValueError, lambda: rdd.histogram(1))
+
+        # out of range
+        rdd = self.sc.parallelize([10.01, -0.01])
+        self.assertEquals([0], rdd.histogram([0, 10])[1])
+        self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])
+
+        # in range with one bucket
+        rdd = self.sc.parallelize(range(1, 5))
+        self.assertEquals([4], rdd.histogram([0, 10])[1])
+        self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])
+
+        # in range with one bucket exact match
+        self.assertEquals([4], rdd.histogram([1, 4])[1])
+
+        # out of range with two buckets
+        rdd = self.sc.parallelize([10.01, -0.01])
+        self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])
+
+        # out of range with two uneven buckets
+        rdd = self.sc.parallelize([10.01, -0.01])
+        self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+
+        # in range with two buckets
+        rdd = self.sc.parallelize([1, 2, 3, 5, 6])
+        self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+
+        # in range with two bucket and None
+        rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
+        self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+
+        # in range with two uneven buckets
+        rdd = self.sc.parallelize([1, 2, 3, 5, 6])
+        self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])
+
+        # mixed range with two uneven buckets
+        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
+        self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])
+
+        # mixed range with four uneven buckets
+        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
+        self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+
+        # mixed range with uneven buckets and NaN
+        rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
+                                   199.0, 200.0, 200.1, None, float('nan')])
+        self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+
+        # out of range with infinite buckets
+        rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
+        self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])
+
+        # invalid buckets
+        self.assertRaises(ValueError, lambda: rdd.histogram([]))
+        self.assertRaises(ValueError, lambda: rdd.histogram([1]))
+        self.assertRaises(ValueError, lambda: rdd.histogram(0))
+        self.assertRaises(TypeError, lambda: rdd.histogram({}))
+
+        # without buckets
+        rdd = self.sc.parallelize(range(1, 5))
+        self.assertEquals(([1, 4], [4]), rdd.histogram(1))
+
+        # without buckets single element
+        rdd = self.sc.parallelize([1])
+        self.assertEquals(([1, 1], [1]), rdd.histogram(1))
+
+        # without bucket no range
+        rdd = self.sc.parallelize([1] * 4)
+        self.assertEquals(([1, 1], [4]), rdd.histogram(1))
+
+        # without buckets basic two
+        rdd = self.sc.parallelize(range(1, 5))
+        self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
+
+        # without buckets with more requested than elements
+        rdd = self.sc.parallelize([1, 2])
+        buckets = [1 + 0.2 * i for i in range(6)]
+        hist = [1, 0, 0, 0, 1]
+        self.assertEquals((buckets, hist), rdd.histogram(5))
+
+        # invalid RDDs
+        rdd = self.sc.parallelize([1, float('inf')])
+        self.assertRaises(ValueError, lambda: rdd.histogram(2))
+        rdd = self.sc.parallelize([float('nan')])
+        self.assertRaises(ValueError, lambda: rdd.histogram(2))
+
+        # string
+        rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
+        self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
+        self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
+        self.assertRaises(TypeError, lambda: rdd.histogram(2))
+
+        # mixed RDD
+        rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
+        self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
+        self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
+        self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
+        self.assertRaises(TypeError, lambda: rdd.histogram(2))
+
 
 class TestIO(PySparkTestCase):