diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 4ba07542bfb40ebfa11161f3df2901445e403359..78c93a95c7807e0a8b4fab6dcddaab3ee97a266b 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -212,26 +212,18 @@ This example covers the concepts of `Estimator`, `Transformer`, and `Param`.
 
 <div data-lang="scala">
 {% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.Row
 
-val conf = new SparkConf().setAppName("SimpleParamsExample")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training data.
-// We use LabeledPoint, which is a case class.  Spark SQL can convert RDDs of case classes
-// into DataFrames, where it uses the case class metadata to infer the schema.
-val training = sc.parallelize(Seq(
-  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
-  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
-  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
-  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
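+// This snippet assumes an existing SQLContext named sqlContext (e.g. the one created by spark-shell).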
+// Prepare training data from a list of (label, features) tuples.
+val training = sqlContext.createDataFrame(Seq(
+  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
+  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
+  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
+  (1.0, Vectors.dense(0.0, 1.2, -0.5))
+)).toDF("label", "features")
 
 // Create a LogisticRegression instance.  This instance is an Estimator.
 val lr = new LogisticRegression()
@@ -243,7 +235,7 @@ lr.setMaxIter(10)
   .setRegParam(0.01)
 
 // Learn a LogisticRegression model.  This uses the parameters stored in lr.
-val model1 = lr.fit(training.toDF)
+val model1 = lr.fit(training)
 // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
 // we can view the parameters it used during fit().
 // This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -253,8 +245,8 @@ println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)
 // We may alternatively specify parameters using a ParamMap,
 // which supports several methods for specifying parameters.
 val paramMap = ParamMap(lr.maxIter -> 20)
-paramMap.put(lr.maxIter, 30) // Specify 1 Param.  This overwrites the original maxIter.
-paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
+  .put(lr.maxIter, 30) // Specify 1 Param.  This overwrites the original maxIter.
+  .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
 
 // One can also combine ParamMaps.
 val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
@@ -262,27 +254,27 @@ val paramMapCombined = paramMap ++ paramMap2
 
 // Now learn a new model using the paramMapCombined parameters.
 // paramMapCombined overrides all parameters set earlier via lr.set* methods.
-val model2 = lr.fit(training.toDF, paramMapCombined)
+val model2 = lr.fit(training, paramMapCombined)
 println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)
 
 // Prepare test data.
-val test = sc.parallelize(Seq(
-  LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
-  LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
-  LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
+val test = sqlContext.createDataFrame(Seq(
+  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
+  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
+  (1.0, Vectors.dense(0.0, 2.2, -1.5))
+)).toDF("label", "features")
 
 // Make predictions on test data using the Transformer.transform() method.
 // LogisticRegression.transform will only use the 'features' column.
 // Note that model2.transform() outputs a 'myProbability' column instead of the usual
 // 'probability' column since we renamed the lr.probabilityCol parameter previously.
-model2.transform(test.toDF)
+model2.transform(test)
   .select("features", "label", "myProbability", "prediction")
   .collect()
   .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
     println(s"($features, $label) -> prob=$prob, prediction=$prediction")
   }
 
-sc.stop()
 {% endhighlight %}
 </div>
 
@@ -291,30 +283,23 @@ sc.stop()
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ml.classification.LogisticRegressionModel;
 import org.apache.spark.ml.param.ParamMap;
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;
 
-SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
-
 // Prepare training data.
-// We use LabeledPoint, which is a JavaBean.  Spark SQL can convert RDDs of JavaBeans
+// We use LabeledPoint, which is a JavaBean.  Spark SQL can convert Lists and RDDs of JavaBeans
 // into DataFrames, where it uses the bean metadata to infer the schema.
-List<LabeledPoint> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
   new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
   new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
   new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
-  new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledPoint.class);
+  new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))
+), LabeledPoint.class);
 
 // Create a LogisticRegression instance.  This instance is an Estimator.
 LogisticRegression lr = new LogisticRegression();
@@ -334,14 +319,14 @@ LogisticRegressionModel model1 = lr.fit(training);
 System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
 
 // We may alternatively specify parameters using a ParamMap.
-ParamMap paramMap = new ParamMap();
-paramMap.put(lr.maxIter().w(20)); // Specify 1 Param.
-paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter.
-paramMap.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
+ParamMap paramMap = new ParamMap()
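+  // param.w(value) is shorthand for a (param, value) ParamPair.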
+  .put(lr.maxIter().w(20)) // Specify 1 Param.
+  .put(lr.maxIter(), 30) // This overwrites the original maxIter.
+  .put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
 
 // One can also combine ParamMaps.
-ParamMap paramMap2 = new ParamMap();
-paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name
+ParamMap paramMap2 = new ParamMap()
+  .put(lr.probabilityCol().w("myProbability")); // Change output column name
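+// $plus$plus is the Java-callable name of ParamMap's ++ method, which merges two ParamMaps.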
 ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
 
 // Now learn a new model using the paramMapCombined parameters.
@@ -350,11 +335,11 @@ LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
 System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
 
-// Prepare test documents.
+// Prepare test data.
-List<LabeledPoint> localTest = Arrays.asList(
-    new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
-    new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
-    new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
+  new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
+  new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
+  new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
+), LabeledPoint.class);
 
 // Make predictions on test documents using the Transformer.transform() method.
 // LogisticRegression.transform will only use the 'features' column.
@@ -366,28 +351,21 @@ for (Row r: results.select("features", "label", "myProbability", "prediction").c
       + ", prediction=" + r.get(3));
 }
 
-jsc.stop();
 {% endhighlight %}
 </div>
 
 <div data-lang="python">
 {% highlight python %}
-from pyspark import SparkContext
-from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.linalg import Vectors
 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.param import Param, Params
-from pyspark.sql import Row, SQLContext
 
-sc = SparkContext(appName="SimpleParamsExample")
-sqlContext = SQLContext(sc)
-
-# Prepare training data.
-# We use LabeledPoint.
-# Spark SQL can convert RDDs of LabeledPoints into DataFrames.
-training = sc.parallelize([LabeledPoint(1.0, [0.0, 1.1,  0.1]),
-                           LabeledPoint(0.0, [2.0, 1.0, -1.0]),
-                           LabeledPoint(0.0, [2.0, 1.3,  1.0]),
-                           LabeledPoint(1.0, [0.0, 1.2, -0.5])])
+# Prepare training data from a list of (label, features) tuples.
+training = sqlContext.createDataFrame([
+    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
+    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
+    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
+    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
 
 # Create a LogisticRegression instance. This instance is an Estimator.
 lr = LogisticRegression(maxIter=10, regParam=0.01)
@@ -395,7 +373,7 @@ lr = LogisticRegression(maxIter=10, regParam=0.01)
 print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
 
 # Learn a LogisticRegression model. This uses the parameters stored in lr.
-model1 = lr.fit(training.toDF())
+model1 = lr.fit(training)
 
 # Since model1 is a Model (i.e., a transformer produced by an Estimator),
 # we can view the parameters it used during fit().
@@ -416,25 +394,25 @@ paramMapCombined.update(paramMap2)
 
 # Now learn a new model using the paramMapCombined parameters.
 # paramMapCombined overrides all parameters set earlier via lr.set* methods.
-model2 = lr.fit(training.toDF(), paramMapCombined)
+model2 = lr.fit(training, paramMapCombined)
 print "Model 2 was fit using parameters: "
 print model2.extractParamMap()
 
 # Prepare test data
-test = sc.parallelize([LabeledPoint(1.0, [-1.0, 1.5,  1.3]),
-                       LabeledPoint(0.0, [ 3.0, 2.0, -0.1]),
-                       LabeledPoint(1.0, [ 0.0, 2.2, -1.5])])
+test = sqlContext.createDataFrame([
+    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
+    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
+    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
 
 # Make predictions on test data using the Transformer.transform() method.
 # LogisticRegression.transform will only use the 'features' column.
 # Note that model2.transform() outputs a "myProbability" column instead of the usual
 # 'probability' column since we renamed the lr.probabilityCol parameter previously.
-prediction = model2.transform(test.toDF())
+prediction = model2.transform(test)
 selected = prediction.select("features", "label", "myProbability", "prediction")
 for row in selected.collect():
     print row
 
-sc.stop()
 {% endhighlight %}
 </div>
 
@@ -448,30 +426,19 @@ This example follows the simple text document `Pipeline` illustrated in the figu
 
 <div data-lang="scala">
 {% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.sql.{Row, SQLContext}
-
-// Labeled and unlabeled instance types.
-// Spark SQL can infer schema from case classes.
-case class LabeledDocument(id: Long, text: String, label: Double)
-case class Document(id: Long, text: String)
+import org.apache.spark.sql.Row
 
-// Set up contexts.  Import implicit conversions to DataFrame from sqlContext.
-val conf = new SparkConf().setAppName("SimpleTextClassificationPipeline")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training documents, which are labeled.
-val training = sc.parallelize(Seq(
-  LabeledDocument(0L, "a b c d e spark", 1.0),
-  LabeledDocument(1L, "b d", 0.0),
-  LabeledDocument(2L, "spark f g h", 1.0),
-  LabeledDocument(3L, "hadoop mapreduce", 0.0)))
+// Prepare training documents from a list of (id, text, label) tuples.
+val training = sqlContext.createDataFrame(Seq(
+  (0L, "a b c d e spark", 1.0),
+  (1L, "b d", 0.0),
+  (2L, "spark f g h", 1.0),
+  (3L, "hadoop mapreduce", 0.0)
+)).toDF("id", "text", "label")
 
 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 val tokenizer = new Tokenizer()
@@ -488,14 +455,15 @@ val pipeline = new Pipeline()
   .setStages(Array(tokenizer, hashingTF, lr))
 
 // Fit the pipeline to training documents.
-val model = pipeline.fit(training.toDF)
+val model = pipeline.fit(training)
 
-// Prepare test documents, which are unlabeled.
-val test = sc.parallelize(Seq(
-  Document(4L, "spark i j k"),
-  Document(5L, "l m n"),
-  Document(6L, "mapreduce spark"),
-  Document(7L, "apache hadoop")))
+// Prepare test documents, which are unlabeled (id, text) tuples.
+val test = sqlContext.createDataFrame(Seq(
+  (4L, "spark i j k"),
+  (5L, "l m n"),
+  (6L, "mapreduce spark"),
+  (7L, "apache hadoop")
+)).toDF("id", "text")
 
 // Make predictions on test documents.
-model.transform(test.toDF)
+model.transform(test)
@@ -505,7 +473,6 @@ model.transform(test.toDF)
     println(s"($id, $text) --> prob=$prob, prediction=$prediction")
   }
 
-sc.stop()
 {% endhighlight %}
 </div>
 
@@ -514,8 +481,6 @@ sc.stop()
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ml.Pipeline;
 import org.apache.spark.ml.PipelineModel;
 import org.apache.spark.ml.PipelineStage;
@@ -524,7 +489,6 @@ import org.apache.spark.ml.feature.HashingTF;
 import org.apache.spark.ml.feature.Tokenizer;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 
 // Labeled and unlabeled instance types.
 // Spark SQL can infer schema from Java Beans.
@@ -556,18 +520,13 @@ public class LabeledDocument extends Document implements Serializable {
   public void setLabel(double label) { this.label = label; }
 }
 
-// Set up contexts.
-SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
-
 // Prepare training documents, which are labeled.
-List<LabeledDocument> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
   new LabeledDocument(0L, "a b c d e spark", 1.0),
   new LabeledDocument(1L, "b d", 0.0),
   new LabeledDocument(2L, "spark f g h", 1.0),
-  new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+  new LabeledDocument(3L, "hadoop mapreduce", 0.0)
+), LabeledDocument.class);
 
 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 Tokenizer tokenizer = new Tokenizer()
@@ -587,12 +546,12 @@ Pipeline pipeline = new Pipeline()
 PipelineModel model = pipeline.fit(training);
 
 // Prepare test documents, which are unlabeled.
-List<Document> localTest = Arrays.asList(
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
   new Document(4L, "spark i j k"),
   new Document(5L, "l m n"),
   new Document(6L, "mapreduce spark"),
-  new Document(7L, "apache hadoop"));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+  new Document(7L, "apache hadoop")
+), Document.class);
 
 // Make predictions on test documents.
 DataFrame predictions = model.transform(test);
@@ -601,28 +560,23 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec
       + ", prediction=" + r.get(3));
 }
 
-jsc.stop();
 {% endhighlight %}
 </div>
 
 <div data-lang="python">
 {% highlight python %}
-from pyspark import SparkContext
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import Row, SQLContext
-
-sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlContext = SQLContext(sc)
 
-# Prepare training documents, which are labeled.
+# Prepare training documents from a list of (id, text, label) tuples.
-LabeledDocument = Row("id", "text", "label")
-training = sc.parallelize([(0L, "a b c d e spark", 1.0),
-                           (1L, "b d", 0.0),
-                           (2L, "spark f g h", 1.0),
-                           (3L, "hadoop mapreduce", 0.0)]) \
-    .map(lambda x: LabeledDocument(*x)).toDF()
+training = sqlContext.createDataFrame([
+    (0L, "a b c d e spark", 1.0),
+    (1L, "b d", 0.0),
+    (2L, "spark f g h", 1.0),
+    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
 
-# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
+# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 tokenizer = Tokenizer(inputCol="text", outputCol="words")
@@ -633,13 +587,12 @@ pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
 # Fit the pipeline to training documents.
 model = pipeline.fit(training)
 
-# Prepare test documents, which are unlabeled.
-Document = Row("id", "text")
-test = sc.parallelize([(4L, "spark i j k"),
-                       (5L, "l m n"),
-                       (6L, "mapreduce spark"),
-                       (7L, "apache hadoop")]) \
-    .map(lambda x: Document(*x)).toDF()
+# Prepare test documents, which are unlabeled (id, text) tuples.
+test = sqlContext.createDataFrame([
+    (4L, "spark i j k"),
+    (5L, "l m n"),
+    (6L, "mapreduce spark"),
+    (7L, "apache hadoop")], ["id", "text"])
 
 # Make predictions on test documents and print columns of interest.
 prediction = model.transform(test)
@@ -647,7 +600,6 @@ selected = prediction.select("id", "text", "prediction")
 for row in selected.collect():
     print(row)
 
-sc.stop()
 {% endhighlight %}
 </div>
 
@@ -664,8 +616,8 @@ Currently, `spark.ml` supports model selection using the [`CrossValidator`](api/
 
-The `Evaluator` can be a [`RegressionEvaluator`](api/scala/index.html#org.apache.spark.ml.RegressionEvaluator)
-for regression problems, a [`BinaryClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.BinaryClassificationEvaluator)
-for binary data or a [`MultiClassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.MultiClassClassificationEvaluator)
-for multiclass problems. The default metric used to choose the best `ParamMap` can be overriden by the setMetric
+The `Evaluator` can be a [`RegressionEvaluator`](api/scala/index.html#org.apache.spark.ml.evaluation.RegressionEvaluator)
+for regression problems, a [`BinaryClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator)
+for binary data, or a [`MulticlassClassificationEvaluator`](api/scala/index.html#org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator)
+for multiclass problems. The default metric used to choose the best `ParamMap` can be overridden by the `setMetricName`
 method in each of these evaluators.
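+
+For example, a `BinaryClassificationEvaluator` can be switched from its default metric
+(area under the ROC curve) to area under the precision-recall curve:
+
+{% highlight scala %}
+import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
+
+val evaluator = new BinaryClassificationEvaluator()
+  .setMetricName("areaUnderPR") // the default is "areaUnderROC"
+{% endhighlight %}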
 
-The `ParamMap` which produces the best evaluation metric (averaged over the `$k$` folds) is selected as the best model.
+The `ParamMap` which produces the best evaluation metric (averaged over the `$k$` folds) is selected, and the corresponding model is returned as the best model.
@@ -684,39 +636,29 @@ However, it is also a well-established method for choosing parameters which is m
 
 <div data-lang="scala">
 {% highlight scala %}
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
 import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
 import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.sql.{Row, SQLContext}
-
-// Labeled and unlabeled instance types.
-// Spark SQL can infer schema from case classes.
-case class LabeledDocument(id: Long, text: String, label: Double)
-case class Document(id: Long, text: String)
-
-val conf = new SparkConf().setAppName("CrossValidatorExample")
-val sc = new SparkContext(conf)
-val sqlContext = new SQLContext(sc)
-import sqlContext.implicits._
-
-// Prepare training documents, which are labeled.
-val training = sc.parallelize(Seq(
-  LabeledDocument(0L, "a b c d e spark", 1.0),
-  LabeledDocument(1L, "b d", 0.0),
-  LabeledDocument(2L, "spark f g h", 1.0),
-  LabeledDocument(3L, "hadoop mapreduce", 0.0),
-  LabeledDocument(4L, "b spark who", 1.0),
-  LabeledDocument(5L, "g d a y", 0.0),
-  LabeledDocument(6L, "spark fly", 1.0),
-  LabeledDocument(7L, "was mapreduce", 0.0),
-  LabeledDocument(8L, "e spark program", 1.0),
-  LabeledDocument(9L, "a e c l", 0.0),
-  LabeledDocument(10L, "spark compile", 1.0),
-  LabeledDocument(11L, "hadoop software", 0.0)))
+import org.apache.spark.sql.Row
+
+// Prepare training data from a list of (id, text, label) tuples.
+val training = sqlContext.createDataFrame(Seq(
+  (0L, "a b c d e spark", 1.0),
+  (1L, "b d", 0.0),
+  (2L, "spark f g h", 1.0),
+  (3L, "hadoop mapreduce", 0.0),
+  (4L, "b spark who", 1.0),
+  (5L, "g d a y", 0.0),
+  (6L, "spark fly", 1.0),
+  (7L, "was mapreduce", 0.0),
+  (8L, "e spark program", 1.0),
+  (9L, "a e c l", 0.0),
+  (10L, "spark compile", 1.0),
+  (11L, "hadoop software", 0.0)
+)).toDF("id", "text", "label")
 
 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 val tokenizer = new Tokenizer()
@@ -730,15 +672,6 @@ val lr = new LogisticRegression()
 val pipeline = new Pipeline()
   .setStages(Array(tokenizer, hashingTF, lr))
 
-// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
-// This will allow us to jointly choose parameters for all Pipeline stages.
-// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
-// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric
-// used is areaUnderROC.
-val crossval = new CrossValidator()
-  .setEstimator(pipeline)
-  .setEvaluator(new BinaryClassificationEvaluator)
-
 // We use a ParamGridBuilder to construct a grid of parameters to search over.
 // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
 // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
@@ -746,28 +679,37 @@ val paramGrid = new ParamGridBuilder()
   .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
   .addGrid(lr.regParam, Array(0.1, 0.01))
   .build()
-crossval.setEstimatorParamMaps(paramGrid)
-crossval.setNumFolds(2) // Use 3+ in practice
+
+// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
+// This will allow us to jointly choose parameters for all Pipeline stages.
+// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
+// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
+// is areaUnderROC.
+val cv = new CrossValidator()
+  .setEstimator(pipeline)
+  .setEvaluator(new BinaryClassificationEvaluator)
+  .setEstimatorParamMaps(paramGrid)
+  .setNumFolds(2) // Use 3+ in practice
 
 // Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training.toDF)
+val cvModel = cv.fit(training)
 
-// Prepare test documents, which are unlabeled.
-val test = sc.parallelize(Seq(
-  Document(4L, "spark i j k"),
-  Document(5L, "l m n"),
-  Document(6L, "mapreduce spark"),
-  Document(7L, "apache hadoop")))
+// Prepare test documents, which are unlabeled (id, text) tuples.
+val test = sqlContext.createDataFrame(Seq(
+  (4L, "spark i j k"),
+  (5L, "l m n"),
+  (6L, "mapreduce spark"),
+  (7L, "apache hadoop")
+)).toDF("id", "text")
 
 // Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test.toDF)
+cvModel.transform(test)
   .select("id", "text", "probability", "prediction")
   .collect()
   .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
-  println(s"($id, $text) --> prob=$prob, prediction=$prediction")
-}
+    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
+  }
 
-sc.stop()
 {% endhighlight %}
 </div>
 
@@ -776,8 +718,6 @@ sc.stop()
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ml.Pipeline;
 import org.apache.spark.ml.PipelineStage;
 import org.apache.spark.ml.classification.LogisticRegression;
@@ -790,7 +730,6 @@ import org.apache.spark.ml.tuning.CrossValidatorModel;
 import org.apache.spark.ml.tuning.ParamGridBuilder;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 
 // Labeled and unlabeled instance types.
 // Spark SQL can infer schema from Java Beans.
@@ -822,12 +761,9 @@ public class LabeledDocument extends Document implements Serializable {
   public void setLabel(double label) { this.label = label; }
 }
 
-SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
-JavaSparkContext jsc = new JavaSparkContext(conf);
-SQLContext jsql = new SQLContext(jsc);
 
 // Prepare training documents, which are labeled.
-List<LabeledDocument> localTraining = Arrays.asList(
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
   new LabeledDocument(0L, "a b c d e spark", 1.0),
   new LabeledDocument(1L, "b d", 0.0),
   new LabeledDocument(2L, "spark f g h", 1.0),
@@ -839,8 +775,8 @@ List<LabeledDocument> localTraining = Arrays.asList(
   new LabeledDocument(8L, "e spark program", 1.0),
   new LabeledDocument(9L, "a e c l", 0.0),
   new LabeledDocument(10L, "spark compile", 1.0),
-  new LabeledDocument(11L, "hadoop software", 0.0));
-DataFrame training = jsql.createDataFrame(jsc.parallelize(localTraining), LabeledDocument.class);
+  new LabeledDocument(11L, "hadoop software", 0.0)
+), LabeledDocument.class);
 
 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 Tokenizer tokenizer = new Tokenizer()
@@ -856,15 +792,6 @@ LogisticRegression lr = new LogisticRegression()
 Pipeline pipeline = new Pipeline()
   .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
 
-// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
-// This will allow us to jointly choose parameters for all Pipeline stages.
-// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
-// Note that the evaluator here is a BinaryClassificationEvaluator and the default metric
-// used is areaUnderROC.
-CrossValidator crossval = new CrossValidator()
-    .setEstimator(pipeline)
-    .setEvaluator(new BinaryClassificationEvaluator());
-
 // We use a ParamGridBuilder to construct a grid of parameters to search over.
 // With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
 // this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
@@ -872,19 +799,28 @@ ParamMap[] paramGrid = new ParamGridBuilder()
     .addGrid(hashingTF.numFeatures(), new int[]{10, 100, 1000})
     .addGrid(lr.regParam(), new double[]{0.1, 0.01})
     .build();
-crossval.setEstimatorParamMaps(paramGrid);
-crossval.setNumFolds(2); // Use 3+ in practice
+
+// We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
+// This will allow us to jointly choose parameters for all Pipeline stages.
+// A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
+// Note that the evaluator here is a BinaryClassificationEvaluator and its default metric
+// is areaUnderROC.
+CrossValidator cv = new CrossValidator()
+  .setEstimator(pipeline)
+  .setEvaluator(new BinaryClassificationEvaluator())
+  .setEstimatorParamMaps(paramGrid)
+  .setNumFolds(2); // Use 3+ in practice
 
 // Run cross-validation, and choose the best set of parameters.
-CrossValidatorModel cvModel = crossval.fit(training);
+CrossValidatorModel cvModel = cv.fit(training);
 
 // Prepare test documents, which are unlabeled.
-List<Document> localTest = Arrays.asList(
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
   new Document(4L, "spark i j k"),
   new Document(5L, "l m n"),
   new Document(6L, "mapreduce spark"),
-  new Document(7L, "apache hadoop"));
-DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);
+  new Document(7L, "apache hadoop")
+), Document.class);
 
 // Make predictions on test documents. cvModel uses the best model found (lrModel).
 DataFrame predictions = cvModel.transform(test);
@@ -893,7 +829,6 @@ for (Row r: predictions.select("id", "text", "probability", "prediction").collec
       + ", prediction=" + r.get(3));
 }
 
-jsc.stop();
 {% endhighlight %}
 </div>
 
@@ -935,7 +870,7 @@ val lr = new LinearRegression()
 // the evaluator.
 val paramGrid = new ParamGridBuilder()
   .addGrid(lr.regParam, Array(0.1, 0.01))
-  .addGrid(lr.fitIntercept, Array(true, false))
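+  // For a boolean param, addGrid with no explicit values expands the grid over both true and false.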
+  .addGrid(lr.fitIntercept)
   .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
   .build()
 
@@ -945,9 +880,8 @@ val trainValidationSplit = new TrainValidationSplit()
   .setEstimator(lr)
   .setEvaluator(new RegressionEvaluator)
   .setEstimatorParamMaps(paramGrid)
-
-// 80% of the data will be used for training and the remaining 20% for validation.
-trainValidationSplit.setTrainRatio(0.8)
+  // 80% of the data will be used for training and the remaining 20% for validation.
+  .setTrainRatio(0.8)
 
 // Run train validation split, and choose the best set of parameters.
 val model = trainValidationSplit.fit(training)
@@ -972,12 +906,12 @@ import org.apache.spark.mllib.util.MLUtils;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.DataFrame;
 
-DataFrame data = jsql.createDataFrame(
+DataFrame data = sqlContext.createDataFrame(
   MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt"),
   LabeledPoint.class);
 
 // Prepare training and test data.
-DataFrame[] splits = data.randomSplit(new double [] {0.9, 0.1}, 12345);
+DataFrame[] splits = data.randomSplit(new double[] {0.9, 0.1}, 12345);
 DataFrame training = splits[0];
 DataFrame test = splits[1];
 
@@ -997,10 +931,8 @@ ParamMap[] paramGrid = new ParamGridBuilder()
 TrainValidationSplit trainValidationSplit = new TrainValidationSplit()
   .setEstimator(lr)
   .setEvaluator(new RegressionEvaluator())
-  .setEstimatorParamMaps(paramGrid);
-
-// 80% of the data will be used for training and the remaining 20% for validation.
-trainValidationSplit.setTrainRatio(0.8);
+  .setEstimatorParamMaps(paramGrid)
+  .setTrainRatio(0.8); // 80% for training and the remaining 20% for validation
 
 // Run train validation split, and choose the best set of parameters.
 TrainValidationSplitModel model = trainValidationSplit.fit(training);