diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index 7dd3c97a83e4d6ea883baac68b7d9aef5b9b8ce8..35cee3275e3b54f5b2b5d69e5878d15aeef62d8f 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -104,7 +104,7 @@ dv2 = [1.0, 0.0, 3.0] # Create a SparseVector. sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0]) # Use a single-column SciPy csc_matrix as a sparse vector. -sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape = (3, 1)) +sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1)) {% endhighlight %} </div> @@ -517,12 +517,12 @@ from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix # Create an RDD of indexed rows. # - This can be done explicitly with the IndexedRow class: -indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), - IndexedRow(1, [4, 5, 6]), - IndexedRow(2, [7, 8, 9]), +indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]), + IndexedRow(1, [4, 5, 6]), + IndexedRow(2, [7, 8, 9]), IndexedRow(3, [10, 11, 12])]) # - or by using (long, vector) tuples: -indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), +indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9]), (3, [10, 11, 12])]) # Create an IndexedRowMatrix from an RDD of IndexedRows. @@ -731,15 +731,15 @@ from pyspark.mllib.linalg import Matrices from pyspark.mllib.linalg.distributed import BlockMatrix # Create an RDD of sub-matrix blocks. -blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), +blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])), ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]) # Create a BlockMatrix from an RDD of sub-matrix blocks. mat = BlockMatrix(blocks, 3, 2) # Get its size. -m = mat.numRows() # 6 -n = mat.numCols() # 2 +m = mat.numRows() # 6 +n = mat.numCols() # 2 # Get the blocks as an RDD of sub-matrix blocks. blocksRDD = mat.blocks diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 40287d7702bd5745c01059278f39ff45d4c72ad2..74d5ee1ca6b3fac7428da150b6f52d75d4da653b 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -445,7 +445,7 @@ Similarly to text files, SequenceFiles can be saved and loaded by specifying the classes can be specified, but for standard Writables this is not required. 
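If the key and value classes do need to be given, a minimal sketch (the Writable class names here are assumptions chosen to match the `(int, str)` pairs written in the example below) might pass them explicitly:

{% highlight python %}
>>> # Hypothetical: read the same file while naming the Writable classes explicitly.
>>> rdd = sc.sequenceFile("path/to/file",
...                       keyClass="org.apache.hadoop.io.IntWritable",
...                       valueClass="org.apache.hadoop.io.Text")
{% endhighlight %}

The example below omits the classes and lets Spark infer the Writable types: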
{% highlight python %} ->>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) +>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x)) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')] @@ -459,10 +459,12 @@ Elasticsearch ESInputFormat: {% highlight python %} $ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark ->>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults ->>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ - "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) ->>> rdd.first() # the result is a MapWritable that is converted to a Python dict +>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults +>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", + "org.apache.hadoop.io.NullWritable", + "org.elasticsearch.hadoop.mr.LinkedMapWritable", + conf=conf) +>>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', @@ -797,7 +799,6 @@ def increment_counter(x): rdd.foreach(increment_counter) print("Counter value: ", counter) - {% endhighlight %} </div> @@ -1455,13 +1456,14 @@ The code below shows an accumulator being used to add up the elements of an arra {% highlight python %} >>> accum = sc.accumulator(0) +>>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s -scala> accum.value +>>> accum.value 10 {% endhighlight %} diff --git a/docs/quick-start.md b/docs/quick-start.md index a29e28faf242d17bf3a0a051762f4de1d2d861ea..2eab8d19aa4c6d042e94f0f258336a48b6f32c1b 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -74,10 +74,10 @@ Spark's primary abstraction is a distributed collection of items called a Resili RDDs have _[actions](programming-guide.html#actions)_, which return values, and _[transformations](programming-guide.html#transformations)_, which return pointers to new RDDs. Let's start with a few actions: {% highlight python %} ->>> textFile.count() # Number of items in this RDD +>>> textFile.count() # Number of items in this RDD 126 ->>> textFile.first() # First item in this RDD +>>> textFile.first() # First item in this RDD u'# Apache Spark' {% endhighlight %} @@ -90,7 +90,7 @@ Now let's use a transformation. We will use the [`filter`](programming-guide.htm We can chain together transformations and actions: {% highlight python %} ->>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"? +>>> textFile.filter(lambda line: "Spark" in line).count() # How many lines contain "Spark"? 15 {% endhighlight %} diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md index f8f7b95cf745850fc89388369b96b7b47e73d341..d3fc9adfcf3ce4239b2164042f4e7b71fc09099a 100644 --- a/docs/streaming-kafka-0-8-integration.md +++ b/docs/streaming-kafka-0-8-integration.md @@ -195,8 +195,8 @@ Next, we discuss how to use this approach in your streaming application. 
for o in offsetRanges: print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset) - directKafkaStream\ - .transform(storeOffsetRanges)\ + directKafkaStream \ + .transform(storeOffsetRanges) \ .foreachRDD(printOffsetRanges) </div> </div> diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 82d36474ff4bfbf720f73c4e6d75ff38decc7d9a..c0e4f3b35afa59f4f8e8593bd5949dd8469a6b16 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -930,7 +930,7 @@ JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform( <div data-lang="python" markdown="1"> {% highlight python %} -spamInfoRDD = sc.pickleFile(...) # RDD containing spam information +spamInfoRDD = sc.pickleFile(...) # RDD containing spam information # join data stream with spam information to do data cleaning cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...)) @@ -1495,16 +1495,15 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_ </div> <div data-lang="python" markdown="1"> {% highlight python %} - def getWordBlacklist(sparkContext): - if ('wordBlacklist' not in globals()): - globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) - return globals()['wordBlacklist'] + if ("wordBlacklist" not in globals()): + globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"]) + return globals()["wordBlacklist"] def getDroppedWordsCounter(sparkContext): - if ('droppedWordsCounter' not in globals()): - globals()['droppedWordsCounter'] = sparkContext.accumulator(0) - return globals()['droppedWordsCounter'] + if ("droppedWordsCounter" not in globals()): + globals()["droppedWordsCounter"] = sparkContext.accumulator(0) + return globals()["droppedWordsCounter"] def echo(time, rdd): # Get or register the blacklist Broadcast @@ -1626,12 +1625,12 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_ # Lazily instantiated global instance of SparkSession def getSparkSessionInstance(sparkConf): - if ('sparkSessionSingletonInstance' not in globals()): - globals()['sparkSessionSingletonInstance'] = SparkSession\ - .builder\ - .config(conf=sparkConf)\ + if ("sparkSessionSingletonInstance" not in globals()): + globals()["sparkSessionSingletonInstance"] = SparkSession \ + .builder \ + .config(conf=sparkConf) \ .getOrCreate() - return globals()['sparkSessionSingletonInstance'] + return globals()["sparkSessionSingletonInstance"] ... @@ -1829,11 +1828,11 @@ This behavior is made simple by using `StreamingContext.getOrCreate`. This is us {% highlight python %} # Function to create and setup a new StreamingContext def functionToCreateContext(): - sc = SparkContext(...) # new context - ssc = new StreamingContext(...) - lines = ssc.socketTextStream(...) # create DStreams + sc = SparkContext(...) # new context + ssc = StreamingContext(...) + lines = ssc.socketTextStream(...) # create DStreams ... 
- ssc.checkpoint(checkpointDirectory) # set checkpoint directory + ssc.checkpoint(checkpointDirectory) # set checkpoint directory return ssc # Get StreamingContext from checkpoint data or create a new one diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 8a88e06ebde56064b648b2de326b088104b578ae..cdc3975d7cb7c4f837fc48056469a4f4c65b140c 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -59,9 +59,9 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split -spark = SparkSession\ - .builder()\ - .appName("StructuredNetworkWordCount")\ +spark = SparkSession \ + .builder() \ + .appName("StructuredNetworkWordCount") \ .getOrCreate() {% endhighlight %} @@ -124,22 +124,22 @@ This `lines` DataFrame represents an unbounded table containing the streaming te {% highlight python %} # Create DataFrame representing the stream of input lines from connection to localhost:9999 -lines = spark\ - .readStream\ - .format('socket')\ - .option('host', 'localhost')\ - .option('port', 9999)\ +lines = spark \ + .readStream \ + .format("socket") \ + .option("host", "localhost") \ + .option("port", 9999) \ .load() # Split the lines into words words = lines.select( explode( - split(lines.value, ' ') - ).alias('word') + split(lines.value, " ") + ).alias("word") ) # Generate running word count -wordCounts = words.groupBy('word').count() +wordCounts = words.groupBy("word").count() {% endhighlight %} This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream. @@ -180,10 +180,10 @@ query.awaitTermination(); {% highlight python %} # Start running the query that prints the running counts to the console -query = wordCounts\ - .writeStream\ - .outputMode('complete')\ - .format('console')\ +query = wordCounts \ + .writeStream \ + .outputMode("complete") \ + .format("console") \ .start() query.awaitTermination() @@ -488,7 +488,7 @@ spark = SparkSession. ... # Read text from socket socketDF = spark \ - .readStream() \ + .readStream() \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ @@ -504,7 +504,7 @@ csvDF = spark \ .readStream() \ .option("sep", ";") \ .schema(userSchema) \ - .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory") + .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory") {% endhighlight %} </div> @@ -596,8 +596,7 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API <div data-lang="python" markdown="1"> {% highlight python %} - -df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType } +df = ... 
# streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType } # Select the devices which have signal more than 10 df.select("device").where("signal > 10") @@ -653,11 +652,11 @@ Dataset<Row> windowedCounts = words.groupBy( </div> <div data-lang="python" markdown="1"> {% highlight python %} -words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } +words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( - window(words.timestamp, '10 minutes', '5 minutes'), + window(words.timestamp, "10 minutes", "5 minutes"), words.word ).count() {% endhighlight %} @@ -704,7 +703,7 @@ streamingDf.join(staticDf, "type", "right_join"); // right outer join with a st {% highlight python %} staticDf = spark.read. ... streamingDf = spark.readStream. ... -streamingDf.join(staticDf, "type") # inner equi-join with a static DF +streamingDf.join(staticDf, "type") # inner equi-join with a static DF streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF {% endhighlight %} @@ -907,25 +906,25 @@ spark.sql("select * from aggregates").show(); // interactively query in-memory noAggDF = deviceDataDf.select("device").where("signal > 10") # Print new data to console -noAggDF\ - .writeStream()\ - .format("console")\ +noAggDF \ + .writeStream() \ + .format("console") \ .start() # Write new data to Parquet files -noAggDF\ - .writeStream()\ - .parquet("path/to/destination/directory")\ +noAggDF \ + .writeStream() \ + .parquet("path/to/destination/directory") \ .start() # ========== DF with aggregation ========== aggDF = df.groupBy("device").count() # Print updated aggregations to console -aggDF\ - .writeStream()\ - .outputMode("complete")\ - .format("console")\ +aggDF \ + .writeStream() \ + .outputMode("complete") \ + .format("console") \ .start() # Have all the aggregates in an in memory table. The query name will be the table name @@ -1072,11 +1071,11 @@ spark.streams().awaitAnyTermination(); // block until any one of them terminat {% highlight python %} spark = ... # spark session -spark.streams().active # get the list of currently active streaming queries +spark.streams().active # get the list of currently active streaming queries -spark.streams().get(id) # get a query object by its unique id +spark.streams().get(id) # get a query object by its unique id -spark.streams().awaitAnyTermination() # block until any one of them terminates +spark.streams().awaitAnyTermination() # block until any one of them terminates {% endhighlight %} </div> @@ -1116,11 +1115,11 @@ aggDF <div data-lang="python" markdown="1"> {% highlight python %} -aggDF\ - .writeStream()\ - .outputMode("complete")\ - .option("checkpointLocation", "path/to/HDFS/dir")\ - .format("memory")\ +aggDF \ + .writeStream() \ + .outputMode("complete") \ + .option("checkpointLocation", "path/to/HDFS/dir") \ + .format("memory") \ .start() {% endhighlight %}
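
One caveat for the Python snippets above: in PySpark, `builder`, `readStream`, `writeStream`, and `streams` are attributes/properties rather than methods, so the trailing parentheses that appear in some of these examples (`builder()`, `readStream()`, `writeStream()`, `streams()`) may raise a `TypeError` at runtime. As a minimal sketch of the same word-count pipeline with the parentheses dropped (assuming a socket source is listening on `localhost:9999`):

{% highlight python %}
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Read lines streamed over a socket.
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split each line into words and keep a running count per word.
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()

# Print the complete set of counts to the console after every trigger.
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
{% endhighlight %}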