    [SPARK-19134][EXAMPLE] Fix several sql, mllib and status api examples not working · b0e5840d
    hyukjinkwon authored
    ## What changes were proposed in this pull request?
    
    **binary_classification_metrics_example.py**
    
    LibSVM datasource loads `ml.linalg.SparseVector` whereas the example requires it to be `mllib.linalg.SparseVector`. The equivalent Scala example, `BinaryClassificationMetricsExample.scala`, seems fine.
    
    ```
    ./bin/spark-submit examples/src/main/python/mllib/binary_classification_metrics_example.py
    ```
    
    ```
      File ".../spark/examples/src/main/python/mllib/binary_classification_metrics_example.py", line 39, in <lambda>
        .rdd.map(lambda row: LabeledPoint(row[0], row[1]))
      File ".../spark/python/pyspark/mllib/regression.py", line 54, in __init__
        self.features = _convert_to_vector(features)
      File ".../spark/python/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
        raise TypeError("Cannot convert type %s into Vector" % type(l))
    TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector
    ```
    
    **status_api_demo.py** (this one does not work on Python 3.4.6)
    
    The module is named `queue` in Python 3+ (it was `Queue` in Python 2).
    
    ```
    PYSPARK_PYTHON=python3 ./bin/spark-submit examples/src/main/python/status_api_demo.py
    ```
    
    ```
    Traceback (most recent call last):
      File ".../spark/examples/src/main/python/status_api_demo.py", line 22, in <module>
        import Queue
    ImportError: No module named 'Queue'
    ```
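The rename can also be handled without a version check. The following is a minimal sketch, not necessarily the approach taken in this patch: it tries the Python 3 name first and falls back to the Python 2 name. `run_in_background` is a hypothetical helper name used only for illustration.

```python
import threading

try:
    import queue  # Python 3 module name
except ImportError:
    import Queue as queue  # Python 2 module name


def run_in_background(f, *args):
    """Run f(*args) in a daemon thread; return a one-slot queue that receives the result."""
    result = queue.Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result


result = run_in_background(sum, [1, 2, 3])
print(result.get())  # blocks until the worker thread has produced a value
```

Importing under a single alias keeps the rest of the script identical on both interpreter versions.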
    
    **bisecting_k_means_example.py**
    
    `BisectingKMeansModel` does not implement `save` and `load` in Python.
    
    ```bash
    ./bin/spark-submit examples/src/main/python/mllib/bisecting_k_means_example.py
    ```
    
    ```
    Traceback (most recent call last):
      File ".../spark/examples/src/main/python/mllib/bisecting_k_means_example.py", line 46, in <module>
        model.save(sc, path)
    AttributeError: 'BisectingKMeansModel' object has no attribute 'save'
    ```
    
    **elementwise_product_example.py**
    
    The example calls `collect` on a vector, which has no such method.
    
    ```bash
    ./bin/spark-submit examples/src/main/python/mllib/elementwise_product_example.py
    ```
    
    ```
    Traceback (most recent call last):
      File ".../spark/examples/src/main/python/mllib/elementwise_product_example.py", line 48, in <module>
        for each in transformedData2.collect():
      File ".../spark/python/pyspark/mllib/linalg/__init__.py", line 478, in __getattr__
        return getattr(self.array, item)
    AttributeError: 'numpy.ndarray' object has no attribute 'collect'
    ```
    
    **The following three examples appear to throw an exception because a relative path is set in `spark.sql.warehouse.dir`.**
    
    **hive.py**
    
    ```
    ./bin/spark-submit examples/src/main/python/sql/hive.py
    ```
    
    ```
    Traceback (most recent call last):
      File ".../spark/examples/src/main/python/sql/hive.py", line 47, in <module>
        spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
      File ".../spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 541, in sql
      File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File ".../spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    pyspark.sql.utils.AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse);'
    ```
    
    **SparkHiveExample.scala**
    
    ```
    ./bin/run-example sql.hive.SparkHiveExample
    ```
    
    ```
    Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse
    	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:498)
    	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:484)
    	at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1668)
    ```
    
    **JavaSparkHiveExample.java**
    
    ```
    ./bin/run-example sql.hive.JavaSparkHiveExample
    ```
    
    ```
    Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse
    	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:498)
    	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:484)
    	at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1668)
    ```
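The `URISyntaxException` in the three traces above comes from the relative location `file:./spark-warehouse`. One way to avoid it, sketched here as an assumption rather than as the exact change in this patch, is to resolve the directory to an absolute path before handing it to Spark. The snippet uses only the standard library; the commented-out builder call shows where the value would go in a PySpark script.

```python
from os.path import abspath, isabs

# Resolve the warehouse directory against the current working directory
# so that the resulting location is absolute.
warehouse_location = abspath("spark-warehouse")
print(isabs(warehouse_location))

# In a PySpark script the value would then be passed as a config entry, e.g.:
# spark = SparkSession.builder \
#     .config("spark.sql.warehouse.dir", warehouse_location) \
#     .enableHiveSupport() \
#     .getOrCreate()
```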
    
    ## How was this patch tested?
    
    Manually via
    
    ```
    ./bin/spark-submit examples/src/main/python/mllib/binary_classification_metrics_example.py
    ```
    
    ```
    PYSPARK_PYTHON=python3 ./bin/spark-submit examples/src/main/python/status_api_demo.py
    ```
    
    ```
    ./bin/spark-submit examples/src/main/python/mllib/bisecting_k_means_example.py
    ```
    
    ```
    ./bin/spark-submit examples/src/main/python/mllib/elementwise_product_example.py
    ```
    
    ```
    ./bin/spark-submit examples/src/main/python/sql/hive.py
    ```
    
    ```
    ./bin/run-example sql.hive.JavaSparkHiveExample
    ```
    
    ```
    ./bin/run-example sql.hive.SparkHiveExample
    ```
    
    These were found via
    
    ```bash
    find ./examples/src/main/python -name "*.py" -exec spark-submit {} \;
    ```
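The same sweep can be scripted in Python when the exit status of each run is needed. This is a rough sketch under stated assumptions: `find_example_scripts` and `run_all` are hypothetical helper names, and `spark-submit` is assumed to be on `PATH`.

```python
import os
import subprocess


def find_example_scripts(root):
    """Return every *.py file under `root`, sorted for a stable order."""
    scripts = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(".py"):
                scripts.append(os.path.join(dirpath, name))
    return sorted(scripts)


def run_all(root, submit="spark-submit"):
    """Run each script with `submit` and return the paths that exited non-zero."""
    failed = []
    for script in find_example_scripts(root):
        if subprocess.call([submit, script]) != 0:
            failed.append(script)
    return failed


if __name__ == "__main__":
    for path in run_all("./examples/src/main/python"):
        print("FAILED:", path)
```

Collecting the failures in a list makes it easier to re-run only the broken examples after a fix.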
    
    Author: hyukjinkwon <gurwls223@gmail.com>
    
    Closes #16515 from HyukjinKwon/minor-example-fix.
status_api_demo.py 2.21 KiB
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import time
import threading
import sys
if sys.version_info[0] >= 3:
    import queue as Queue  # the module was renamed to `queue` in Python 3
else:
    import Queue

from pyspark import SparkConf, SparkContext


def delayed(seconds):
    """Return an identity function that sleeps for `seconds` before returning its argument."""
    def f(x):
        time.sleep(seconds)
        return x
    return f


def call_in_background(f, *args):
    """Run `f(*args)` in a daemon thread; return a one-slot queue that receives the result."""
    result = Queue.Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result


def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()

if __name__ == "__main__":
    main()