Commit 76077bf9, authored 11 years ago by Andre Schumacher
Implementing SPARK-838: Add DoubleRDDFunctions methods to PySpark
Parent: 53b1c306
Changes: 2 changed files, with 168 additions and 1 deletion

    python/pyspark/rdd.py            +59  −1
    python/pyspark/statcounter.py    +109 −0
python/pyspark/rdd.py  (+59 −1)
@@ -31,6 +31,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
    read_from_pickle_file
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from py4j.java_collections import ListConverter, MapConverter
@@ -357,6 +358,63 @@ class RDD(object):
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which corrects for bias in
        estimating the standard deviation by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects for bias in
        estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        ...
@@ -777,7 +835,7 @@ def _test():
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-   (failure_count, test_count) = doctest.testmod(globs=globs)
+   (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
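Note how the new methods compose: stats() builds one StatCounter per partition via mapPartitions and folds them together with mergeStats(), so the data is traversed only once, and mean(), variance(), stdev(), sampleStdev(), and sampleVariance() all delegate to it. The _test() change turns on doctest.ELLIPSIS so that truncated expected outputs like 0.666... in the docstrings above match. As a rough illustration (not part of the commit), a Python 2-era PySpark session using the new methods might look like this:

    # Hypothetical usage sketch of the methods added above; assumes a
    # local Python 2-era PySpark, mirroring the setup in _test().
    from pyspark import SparkContext

    sc = SparkContext('local[4]', 'StatsExample')
    rdd = sc.parallelize([1.0, 2.0, 3.0, 4.0])

    stats = rdd.stats()           # single pass; returns a StatCounter
    print stats.mean()            # 2.5
    print stats.variance()        # 1.25       (divides by N)
    print rdd.sampleVariance()    # 1.666...   (divides by N-1)
    sc.stop()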
python/pyspark/statcounter.py  (new file, mode 100644)  (+109 −0)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file is ported from spark/util/StatCounter.scala
import copy
import math

class StatCounter(object):

    def __init__(self, values=[]):
        self.n = 0L    # Running count of our values
        self.mu = 0.0  # Running mean of our values
        self.m2 = 0.0  # Running variance numerator (sum of (x - mean)^2)

        for v in values:
            self.merge(v)

    # Add a value into this StatCounter, updating the internal statistics.
    def merge(self, value):
        delta = value - self.mu
        self.n += 1
        self.mu += delta / self.n
        self.m2 += delta * (value - self.mu)
        return self

    # Merge another StatCounter into this one, adding up the internal statistics.
    def mergeStats(self, other):
        if not isinstance(other, StatCounter):
            raise Exception("Can only merge Statcounters!")

        if other is self:  # reference equality holds
            self.merge(copy.deepcopy(other))  # Avoid overwriting fields in a weird order
        else:
            if self.n == 0:
                self.mu = other.mu
                self.m2 = other.m2
                self.n = other.n
            elif other.n != 0:
                delta = other.mu - self.mu
                if other.n * 10 < self.n:
                    self.mu = self.mu + (delta * other.n) / (self.n + other.n)
                elif self.n * 10 < other.n:
                    self.mu = other.mu - (delta * self.n) / (self.n + other.n)
                else:
                    self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)

                self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
                self.n += other.n
        return self

    # Clone this StatCounter
    def copy(self):
        return copy.deepcopy(self)

    def count(self):
        return self.n

    def mean(self):
        return self.mu

    def sum(self):
        return self.n * self.mu

    # Return the variance of the values.
    def variance(self):
        if self.n == 0:
            return float('nan')
        else:
            return self.m2 / self.n

    #
    # Return the sample variance, which corrects for bias in estimating the variance by dividing
    # by N-1 instead of N.
    #
    def sampleVariance(self):
        if self.n <= 1:
            return float('nan')
        else:
            return self.m2 / (self.n - 1)

    # Return the standard deviation of the values.
    def stdev(self):
        return math.sqrt(self.variance())

    #
    # Return the sample standard deviation of the values, which corrects for bias in estimating the
    # variance by dividing by N-1 instead of N.
    #
    def sampleStdev(self):
        return math.sqrt(self.sampleVariance())

    def __repr__(self):
        return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev())
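merge() is the standard Welford-style online update (numerically stabler than accumulating a raw sum of squares), and mergeStats() is its pairwise combination, which is what lets RDD.stats() reduce per-partition counters in any order. A small, hypothetical sanity check of that merge property (not part of the commit; plain Python 2, no cluster needed):

    # Hypothetical check: merging two partial StatCounters should match
    # one StatCounter fed the concatenated values.
    from pyspark.statcounter import StatCounter

    left = [1.0, 2.0, 3.0]
    right = [10.0, 20.0]

    combined = StatCounter(left).mergeStats(StatCounter(right))
    direct = StatCounter(left + right)

    assert combined.count() == direct.count() == 5
    assert abs(combined.mean() - direct.mean()) < 1e-12        # both 7.2
    assert abs(combined.variance() - direct.variance()) < 1e-12
    print combined    # (count: 5, mean: 7.2, stdev: 7.13...)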