diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index cc5e93dcadf4d78f3d5f28d36e569cd6728b9738..e2ee9db04948951f48c74d8975189363187e37d8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,7 +166,10 @@ class DataFrameReader(object):
             return self._df(self._jreader.stream())
 
     @since(1.4)
-    def json(self, path, schema=None):
+    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+             mode=None, columnNameOfCorruptRecord=None):
         """
         Loads a JSON file (one object per line) or an RDD of Strings storing JSON objects
-        (one object per record) and returns the result as a :class`DataFrame`.
+        (one object per record) and returns the result as a :class:`DataFrame`.
@@ -177,31 +180,36 @@ class DataFrameReader(object):
         :param path: string represents path to the JSON dataset,
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
+        :param primitivesAsString: infers all primitive values as a string type. If None is set,
+                                   it uses the default value, ``false``.
+        :param prefersDecimal: infers all floating-point values as a decimal type. If the values
+                               do not fit in decimal, then it infers them as doubles. If None is
+                               set, it uses the default value, ``false``.
+        :param allowComments: ignores Java/C++ style comments in JSON records. If None is set,
+                              it uses the default value, ``false``.
+        :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set,
+                                        it uses the default value, ``false``.
+        :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is
+                                  set, it uses the default value, ``true``.
+        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is
+                                        set, it uses the default value, ``false``.
+        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all characters
+                                                   using the backslash quoting mechanism. If None
+                                                   is set, it uses the default value, ``false``.
+        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
 
-        You can set the following JSON-specific options to deal with non-standard JSON files:
-            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
-                type
-            * `prefersDecimal` (default `false`): infers all floating-point values as a decimal \
-                type. If the values do not fit in decimal, then it infers them as doubles.
-            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
-            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
-            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
-                quotes
-            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
-                (e.g. 00012)
-            * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
-                of all character using backslash quoting mechanism
-            *  ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
-                during parsing.
                 *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                   record and puts the malformed string into a new field configured by \
                  ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                  ``null`` for extra fields.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
-            *  ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
-                 new field having malformed string created by ``PERMISSIVE`` mode. \
-                 This overrides ``spark.sql.columnNameOfCorruptRecord``.
+
+        :param columnNameOfCorruptRecord: allows renaming the new field having malformed string
+                                          created by ``PERMISSIVE`` mode. This overrides
+                                          ``spark.sql.columnNameOfCorruptRecord``. If None is set,
+                                          it uses the default value, ``_corrupt_record``.
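+
+        A usage sketch passing a couple of the new keyword arguments (the option
+        values are given as strings, which the underlying ``option()`` call accepts):
+
+        >>> df0 = sqlContext.read.json('python/test_support/sql/people.json',
+        ...                            primitivesAsString='true', mode='PERMISSIVE')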
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -214,6 +222,24 @@ class DataFrameReader(object):
         """
         if schema is not None:
             self.schema(schema)
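+        # Forward each option to the underlying reader only when it was explicitly
+        # set; leaving an argument as None defers to the data source's default.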
+        if primitivesAsString is not None:
+            self.option("primitivesAsString", primitivesAsString)
+        if prefersDecimal is not None:
+            self.option("prefersDecimal", prefersDecimal)
+        if allowComments is not None:
+            self.option("allowComments", allowComments)
+        if allowUnquotedFieldNames is not None:
+            self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+        if allowSingleQuotes is not None:
+            self.option("allowSingleQuotes", allowSingleQuotes)
+        if allowNumericLeadingZero is not None:
+            # the underlying JSON data source reads this option under the plural
+            # name ``allowNumericLeadingZeros``
+            self.option("allowNumericLeadingZeros", allowNumericLeadingZero)
+        if allowBackslashEscapingAnyCharacter is not None:
+            self.option("allowBackslashEscapingAnyCharacter", allowBackslashEscapingAnyCharacter)
+        if mode is not None:
+            self.option("mode", mode)
+        if columnNameOfCorruptRecord is not None:
+            self.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         elif type(path) == list:
@@ -270,53 +296,62 @@ class DataFrameReader(object):
         [Row(value=u'hello'), Row(value=u'this')]
         """
         if isinstance(paths, basestring):
             paths = [paths]
         return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
     @since(2.0)
-    def csv(self, paths):
+    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
+            comment=None, header=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
+            nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None,
+            maxColumns=None, maxCharsPerColumn=None, mode=None):
         """Loads a CSV file and returns the result as a [[DataFrame]].
 
         This function goes through the input once to determine the input schema. To avoid going
-        through the entire data once, specify the schema explicitly using [[schema]].
+        through the entire data once, specify the schema explicitly using ``schema``.
 
-        :param paths: string, or list of strings, for input path(s).
-
-        You can set the following CSV-specific options to deal with CSV files:
-            * ``sep`` (default ``,``): sets the single character as a separator \
-                for each field and value.
-            * ``charset`` (default ``UTF-8``): decodes the CSV files by the given \
-                encoding type.
-            * ``quote`` (default ``"``): sets the single character used for escaping \
-                quoted values where the separator can be part of the value.
-            * ``escape`` (default ``\``): sets the single character used for escaping quotes \
-                inside an already quoted value.
-            * ``comment`` (default empty string): sets the single character used for skipping \
-                lines beginning with this character. By default, it is disabled.
-            * ``header`` (default ``false``): uses the first line as names of columns.
-            * ``ignoreLeadingWhiteSpace`` (default ``false``): defines whether or not leading \
-                whitespaces from values being read should be skipped.
-            * ``ignoreTrailingWhiteSpace`` (default ``false``): defines whether or not trailing \
-                whitespaces from values being read should be skipped.
-            * ``nullValue`` (default empty string): sets the string representation of a null value.
-            * ``nanValue`` (default ``NaN``): sets the string representation of a non-number \
-                value.
-            * ``positiveInf`` (default ``Inf``): sets the string representation of a positive \
-                infinity value.
-            * ``negativeInf`` (default ``-Inf``): sets the string representation of a negative \
-                infinity value.
-            * ``dateFormat`` (default ``None``): sets the string that indicates a date format. \
-                Custom date formats follow the formats at ``java.text.SimpleDateFormat``. This \
-                applies to both date type and timestamp type. By default, it is None which means \
-                trying to parse times and date by ``java.sql.Timestamp.valueOf()`` and \
-                ``java.sql.Date.valueOf()``.
-            * ``maxColumns`` (default ``20480``): defines a hard limit of how many columns \
-                a record can have.
-            * ``maxCharsPerColumn`` (default ``1000000``): defines the maximum number of \
-                characters allowed for any given value being read.
-            * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
-                during parsing.
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. \
+        :param path: string, or list of strings, for input path(s).
+        :param schema: an optional :class:`StructType` for the input schema.
+        :param sep: sets the single character as a separator for each field and value.
+                    If None is set, it uses the default value, ``,``.
+        :param encoding: decodes the CSV files by the given encoding type. If None is set,
+                         it uses the default value, ``UTF-8``.
+        :param quote: sets the single character used for escaping quoted values where the
+                      separator can be part of the value. If None is set, it uses the default
+                      value, ``"``.
+        :param escape: sets the single character used for escaping quotes inside an already
+                       quoted value. If None is set, it uses the default value, ``\``.
+        :param comment: sets the single character used for skipping lines beginning with this
+                        character. By default (None), it is disabled.
+        :param header: uses the first line as names of columns. If None is set, it uses the
+                       default value, ``false``.
+        :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
+                                        being read should be skipped. If None is set, it uses
+                                        the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
+                                         being read should be skipped. If None is set, it uses
+                                         the default value, ``false``.
+        :param nullValue: sets the string representation of a null value. If None is set, it uses
+                          the default value, empty string.
+        :param nanValue: sets the string representation of a non-number value. If None is set, it
+                         uses the default value, ``NaN``.
+        :param positiveInf: sets the string representation of a positive infinity value. If None
+                            is set, it uses the default value, ``Inf``.
+        :param negativeInf: sets the string representation of a negative infinity value. If None
+                            is set, it uses the default value, ``-Inf``.
+        :param dateFormat: sets the string that indicates a date format. Custom date formats
+                           follow the formats at ``java.text.SimpleDateFormat``. This
+                           applies to both date type and timestamp type. By default, it is None
+                           which means it tries to parse times and dates by
+                           ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``.
+        :param maxColumns: defines a hard limit of how many columns a record can have. If None is
+                           set, it uses the default value, ``20480``.
+        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
+                                  value being read. If None is set, it uses the default value,
+                                  ``1000000``.
+        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
+
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
                     When a schema is set by user, it sets ``null`` for extra fields.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
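+
+        A usage sketch with a few of the new keyword arguments (the path below is
+        assumed to be the bundled sample file read by the doctest that follows):
+
+        >>> df0 = sqlContext.read.csv('python/test_support/sql/ages.csv',
+        ...                           sep=',', mode='PERMISSIVE')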
@@ -325,9 +360,43 @@ class DataFrameReader(object):
         >>> df.dtypes
         [('C0', 'string'), ('C1', 'string')]
         """
-        if isinstance(paths, basestring):
-            paths = [paths]
-        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
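+        # Same pattern as json(): only explicitly-set options are forwarded, so
+        # None means "use the data source's default".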
+        if schema is not None:
+            self.schema(schema)
+        if sep is not None:
+            self.option("sep", sep)
+        if encoding is not None:
+            self.option("encoding", encoding)
+        if quote is not None:
+            self.option("quote", quote)
+        if escape is not None:
+            self.option("escape", escape)
+        if comment is not None:
+            self.option("comment", comment)
+        if header is not None:
+            self.option("header", header)
+        if ignoreLeadingWhiteSpace is not None:
+            self.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
+        if ignoreTrailingWhiteSpace is not None:
+            self.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
+        if nullValue is not None:
+            self.option("nullValue", nullValue)
+        if nanValue is not None:
+            self.option("nanValue", nanValue)
+        if positiveInf is not None:
+            self.option("positiveInf", positiveInf)
+        if negativeInf is not None:
+            self.option("negativeInf", negativeInf)
+        if dateFormat is not None:
+            self.option("dateFormat", dateFormat)
+        if maxColumns is not None:
+            self.option("maxColumns", maxColumns)
+        if maxCharsPerColumn is not None:
+            self.option("maxCharsPerColumn", maxCharsPerColumn)
+        if mode is not None:
+            self.option("mode", mode)
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
 
     @since(1.5)
     def orc(self, path):
@@ -687,7 +756,8 @@ class DataFrameWriter(object):
         self._jwrite.text(path)
 
     @since(2.0)
-    def csv(self, path, mode=None, compression=None):
+    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
+            header=None, nullValue=None):
         """Saves the content of the [[DataFrame]] in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -701,25 +771,33 @@ class DataFrameWriter(object):
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, bzip2, gzip, lz4,
                             snappy and deflate).
-
-        You can set the following CSV-specific options to deal with CSV files:
-            * ``sep`` (default ``,``): sets the single character as a separator \
-                for each field and value.
-            * ``quote`` (default ``"``): sets the single character used for escaping \
-                quoted values where the separator can be part of the value.
-            * ``escape`` (default ``\``): sets the single character used for escaping quotes \
-                inside an already quoted value.
-            * ``header`` (default ``false``): writes the names of columns as the first line.
-            * ``nullValue`` (default empty string): sets the string representation of a null value.
-            * ``compression``: compression codec to use when saving to file. This can be one of \
-                the known case-insensitive shorten names (none, bzip2, gzip, lz4, snappy and \
-                deflate).
+        :param sep: sets the single character as a separator for each field and value. If None is
+                    set, it uses the default value, ``,``.
+        :param quote: sets the single character used for escaping quoted values where the
+                      separator can be part of the value. If None is set, it uses the default
+                      value, ``"``.
+        :param escape: sets the single character used for escaping quotes inside an already
+                       quoted value. If None is set, it uses the default value, ``\``.
+        :param header: writes the names of columns as the first line. If None is set, it uses
+                       the default value, ``false``.
+        :param nullValue: sets the string representation of a null value. If None is set, it uses
+                          the default value, empty string.
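+
+        A sketch of a write using a few of the new options (the separator and the
+        null token below are arbitrary illustrative choices):
+
+        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'), sep=';',
+        ...              nullValue='NULL')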
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if compression is not None:
             self.option("compression", compression)
+        if sep is not None:
+            self.option("sep", sep)
+        if quote is not None:
+            self.option("quote", quote)
+        if escape is not None:
+            self.option("escape", escape)
+        if header is not None:
+            self.option("header", header)
+        if nullValue is not None:
+            self.option("nullValue", nullValue)
         self._jwrite.csv(path)
 
     @since(1.5)
@@ -797,7 +875,7 @@ def _test():
     globs['sqlContext'] = SQLContext(sc)
     globs['hiveContext'] = HiveContext._createForTesting(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
-    globs['sdf'] =\
+    globs['sdf'] = \
         globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
 
     (failure_count, test_count) = doctest.testmod(