hyukjinkwon authored
## What changes were proposed in this pull request?

**binary_classification_metrics_example.py**

The LibSVM data source loads `ml.linalg.SparseVector`, whereas the example requires `mllib.linalg.SparseVector`. The equivalent Scala example, `BinaryClassificationMetricsExample.scala`, seems fine.

```
./bin/spark-submit examples/src/main/python/mllib/binary_classification_metrics_example.py
```

```
  File ".../spark/examples/src/main/python/mllib/binary_classification_metrics_example.py", line 39, in <lambda>
    .rdd.map(lambda row: LabeledPoint(row[0], row[1]))
  File ".../spark/python/pyspark/mllib/regression.py", line 54, in __init__
    self.features = _convert_to_vector(features)
  File ".../spark/python/pyspark/mllib/linalg/__init__.py", line 80, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.ml.linalg.SparseVector'> into Vector
```

**status_api_demo.py** (this one does not work on Python 3.4.6)

The module is named `queue` in Python 3+.

```
PYSPARK_PYTHON=python3 ./bin/spark-submit examples/src/main/python/status_api_demo.py
```

```
Traceback (most recent call last):
  File ".../spark/examples/src/main/python/status_api_demo.py", line 22, in <module>
    import Queue
ImportError: No module named 'Queue'
```

**bisecting_k_means_example.py**

`BisectingKMeansModel` does not implement `save` and `load` in Python.

```bash
./bin/spark-submit examples/src/main/python/mllib/bisecting_k_means_example.py
```

```
Traceback (most recent call last):
  File ".../spark/examples/src/main/python/mllib/bisecting_k_means_example.py", line 46, in <module>
    model.save(sc, path)
AttributeError: 'BisectingKMeansModel' object has no attribute 'save'
```

**elementwise_product_example.py**

It calls `collect` on a vector.

```bash
./bin/spark-submit examples/src/main/python/mllib/elementwise_product_example.py
```

```
Traceback (most recent call last):
  File ".../spark/examples/src/main/python/mllib/elementwise_product_example.py", line 48, in <module>
    for each in transformedData2.collect():
  File ".../spark/python/pyspark/mllib/linalg/__init__.py", line 478, in __getattr__
    return getattr(self.array, item)
AttributeError: 'numpy.ndarray' object has no attribute 'collect'
```

**The following three examples appear to throw an exception because of the relative path set in `spark.sql.warehouse.dir`.**

**hive.py**

```
./bin/spark-submit examples/src/main/python/sql/hive.py
```

```
Traceback (most recent call last):
  File ".../spark/examples/src/main/python/sql/hive.py", line 47, in <module>
    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
  File ".../spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 541, in sql
  File ".../spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File ".../spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse);'
```

**SparkHiveExample.scala**

```
./bin/run-example sql.hive.SparkHiveExample
```

```
Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:498)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:484)
	at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1668)
```

**JavaSparkHiveExample.java**

```
./bin/run-example sql.hive.JavaSparkHiveExample
```

```
Exception in thread "main" org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./spark-warehouse
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:498)
	at org.apache.hadoop.hive.ql.metadata.Hive.alterTable(Hive.java:484)
	at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1668)
```

## How was this patch tested?

Manually via

```
./bin/spark-submit examples/src/main/python/mllib/binary_classification_metrics_example.py
```

```
PYSPARK_PYTHON=python3 ./bin/spark-submit examples/src/main/python/status_api_demo.py
```

```
./bin/spark-submit examples/src/main/python/mllib/bisecting_k_means_example.py
```

```
./bin/spark-submit examples/src/main/python/mllib/elementwise_product_example.py
```

```
./bin/spark-submit examples/src/main/python/sql/hive.py
```

```
./bin/run-example sql.hive.JavaSparkHiveExample
```

```
./bin/run-example sql.hive.SparkHiveExample
```

These were found via

```bash
find ./examples/src/main/python -name "*.py" -exec spark-submit {} \;
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16515 from HyukjinKwon/minor-example-fix.
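The three Hive failures above all report `Relative path in absolute URI: file:./spark-warehouse`, meaning the warehouse directory reached Hive as a relative path. A minimal sketch of one way around it, assuming the remedy is simply to resolve the directory to an absolute location before it is passed to `spark.sql.warehouse.dir`. The helpers `absolute_warehouse_dir` and `warehouse_uri` are hypothetical names, not part of Spark, and only the standard library is used so the sketch runs without a Spark installation.

```python
import os
from pathlib import Path


def absolute_warehouse_dir(relative_dir="spark-warehouse"):
    """Hypothetical helper: resolve a (possibly relative) warehouse directory
    against the current working directory and return an absolute path."""
    return os.path.abspath(relative_dir)


def warehouse_uri(relative_dir="spark-warehouse"):
    """Hypothetical helper: the same location expressed as a file: URI,
    e.g. file:///home/user/spark-warehouse instead of file:./spark-warehouse."""
    return Path(absolute_warehouse_dir(relative_dir)).as_uri()


if __name__ == "__main__":
    location = absolute_warehouse_dir()
    print(location)
    print(warehouse_uri())
    # In a PySpark script the absolute value would then be used roughly as:
    #   SparkSession.builder.config("spark.sql.warehouse.dir", location) \
    #       .enableHiveSupport().getOrCreate()
```

Resolving against the current working directory keeps the examples' default `spark-warehouse` location while handing Hive a path that should convert to a well-formed absolute URI.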
status_api_demo.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function

import time
import threading
import sys

# The module is named "Queue" in Python 2 and "queue" in Python 3.
if sys.version_info[0] >= 3:
    import queue as Queue
else:
    import Queue

from pyspark import SparkConf, SparkContext


def delayed(seconds):
    # Return an identity function that sleeps first, so the job runs long
    # enough for its progress to be observed.
    def f(x):
        time.sleep(seconds)
        return x
    return f


def call_in_background(f, *args):
    # Run f(*args) on a daemon thread; the one-slot queue receives its result.
    result = Queue.Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result


def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    # Poll the status tracker until the background job has produced a result.
    while result.empty():
        ids = status.getJobIdsForGroup()
        for job_id in ids:
            job = status.getJobInfo(job_id)
            if job is None:
                # The job info may be missing or already garbage collected.
                continue
            print("Job", job_id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()


if __name__ == "__main__":
    main()
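The demo's core pattern, running a blocking job on a daemon thread and polling a one-slot queue until a result arrives, can be exercised without Spark. A minimal, Python 3-only sketch of that pattern: `call_in_background` mirrors the function in the file above, while `slow_sum` (a stand-in for the Spark job) and `poll_until_done` (a stand-in for the status-tracker loop) are hypothetical helpers added only for illustration.

```python
import queue
import threading
import time


def call_in_background(f, *args):
    """Run f(*args) on a daemon thread; return a 1-slot queue that receives the result."""
    result = queue.Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result


def slow_sum(values, delay):
    """Hypothetical stand-in for a long-running Spark job."""
    time.sleep(delay)
    return sum(values)


def poll_until_done(result, interval=0.05):
    """Poll like the demo's while-loop; return (value, number of polls made)."""
    polls = 0
    while result.empty():
        # In the real demo, this is where sc.statusTracker() is queried.
        polls += 1
        time.sleep(interval)
    return result.get(), polls


if __name__ == "__main__":
    pending = call_in_background(slow_sum, range(10), 0.2)
    value, polls = poll_until_done(pending)
    print("result:", value, "after", polls, "polls")
```

Using `Queue(1)` with `empty()` as the loop condition lets the main thread keep reporting progress without blocking, and `result.get()` is only reached once the worker thread has put its value.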