diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 960fb882cf90108b0717ee76ea0736a6961ad06f..90ce8f81eb7fd424ffa3cb9499ef22be84ed55e2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -563,6 +563,63 @@ class DataFrameWriter(OptionUtils):
         self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
         return self
 
+    @since(2.3)
+    def bucketBy(self, numBuckets, col, *cols):
+        """Buckets the output by the given columns.If specified,
+        the output is laid out on the file system similar to Hive's bucketing scheme.
+
+        :param numBuckets: the number of buckets to save
+        :param col: a name of a column, or a list of names.
+        :param cols: additional names (optional). If `col` is a list, it should be empty.
+
+        .. note:: Applicable for file-based data sources in combination with
+                  :py:meth:`DataFrameWriter.saveAsTable`.
+
+        >>> (df.write.format('parquet')
+        ...     .bucketBy(100, 'year', 'month')
+        ...     .mode("overwrite")
+        ...     .saveAsTable('bucketed_table'))
+        """
+        if not isinstance(numBuckets, int):
+            raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))
+
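+        # A single list/tuple of names may be passed as `col`; unpack it into the
+        # leading column and the remaining columns to match the varargs form.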
+        if isinstance(col, (list, tuple)):
+            if cols:
+                raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+
+            col, cols = col[0], col[1:]
+
+        if not all(isinstance(c, basestring) for c in cols) or not isinstance(col, basestring):
+            raise TypeError("all names should be `str`")
+
+        self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
+        return self
+
+    @since(2.3)
+    def sortBy(self, col, *cols):
+        """Sorts the output in each bucket by the given columns on the file system.
+
+        :param col: a name of a column, or a list of names.
+        :param cols: additional names (optional). If `col` is a list, it should be empty.
+
+        >>> (df.write.format('parquet')
+        ...     .bucketBy(100, 'year', 'month')
+        ...     .sortBy('day')
+        ...     .mode("overwrite")
+        ...     .saveAsTable('sorted_bucketed_table'))
+        """
+        if isinstance(col, (list, tuple)):
+            if cols:
+                raise ValueError("col is a {0} but cols are not empty".format(type(col)))
+
+            col, cols = col[0], col[1:]
+
+        if not all(isinstance(c, basestring) for c in cols) or not isinstance(col, basestring):
+            raise TypeError("all names should be `str`")
+
+        self._jwrite = self._jwrite.sortBy(col, _to_seq(self._spark._sc, cols))
+        return self
+
     @since(1.4)
     def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7983bc536fc6c4a2d27eef0bcb33dde91cc9d808..e3fe01eae243f5c36e3831cd727fa49d1d5810ef 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -211,6 +211,12 @@ class SQLTests(ReusedPySparkTestCase):
         sqlContext2 = SQLContext(self.sc)
         self.assertTrue(sqlContext1.sparkSession is sqlContext2.sparkSession)
 
+    def tearDown(self):
+        super(SQLTests, self).tearDown()
+
+        # tear down test_bucketed_write state
+        self.spark.sql("DROP TABLE IF EXISTS pyspark_bucket")
+
     def test_row_should_be_read_only(self):
         row = Row(a=1, b=2)
         self.assertEqual(1, row.a)
@@ -2196,6 +2202,54 @@ class SQLTests(ReusedPySparkTestCase):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()
 
+    def test_bucketed_write(self):
+        data = [
+            (1, "foo", 3.0), (2, "foo", 5.0),
+            (3, "bar", -1.0), (4, "bar", 6.0),
+        ]
+        df = self.spark.createDataFrame(data, ["x", "y", "z"])
+
+        def count_bucketed_cols(names, table="pyspark_bucket"):
+            """Given a sequence of column names and a table name
+            query the catalog and return number o columns which are
+            used for bucketing
+            """
+            cols = self.spark.catalog.listColumns(table)
+            num = len([c for c in cols if c.name in names and c.isBucket])
+            return num
+
+        # Test write with one bucketing column
+        df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
+        self.assertEqual(count_bucketed_cols(["x"]), 1)
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
+        # Test write with two bucketing columns
+        df.write.bucketBy(3, "x", "y").mode("overwrite").saveAsTable("pyspark_bucket")
+        self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
+        # Test write with bucket and sort
+        df.write.bucketBy(2, "x").sortBy("z").mode("overwrite").saveAsTable("pyspark_bucket")
+        self.assertEqual(count_bucketed_cols(["x"]), 1)
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
+        # Test write with a list of columns
+        df.write.bucketBy(3, ["x", "y"]).mode("overwrite").saveAsTable("pyspark_bucket")
+        self.assertEqual(count_bucketed_cols(["x", "y"]), 2)
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
+        # Test write with bucket and sort with a list of columns
+        (df.write.bucketBy(2, "x")
+            .sortBy(["y", "z"])
+            .mode("overwrite").saveAsTable("pyspark_bucket"))
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
+        # Test write with bucket and sort with multiple columns
+        (df.write.bucketBy(2, "x")
+            .sortBy("y", "z")
+            .mode("overwrite").saveAsTable("pyspark_bucket"))
+        self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect()))
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):