Commit eb4b46f8
Authored 11 years ago by Ankur Dave

Improve docs for GraphOps

Parent: 9454fa1f
graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+25
-53
25 additions, 53 deletions
@@ -8,57 +8,47 @@ import org.apache.spark.SparkException
 /**
- * `GraphOps` contains additional functionality (syntatic sugar) for
- * the graph type and is implicitly constructed for each Graph object.
- * All operations in `GraphOps` are expressed in terms of the
- * efficient GraphX API.
+ * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
+ * efficient GraphX API. This class is implicitly constructed for each Graph object.
  *
  * @tparam VD the vertex attribute type
  * @tparam ED the edge attribute type
- *
  */
 class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
-  /**
-   * Compute the number of edges in the graph.
-   */
+  /** The number of edges in the graph. */
   lazy val numEdges: Long = graph.edges.count()
-  /**
-   * Compute the number of vertices in the graph.
-   */
+  /** The number of vertices in the graph. */
   lazy val numVertices: Long = graph.vertices.count()
   /**
-   * Compute the in-degree of each vertex in the Graph returning an
-   * RDD.
-   * @note Vertices with no in edges are not returned in the resulting RDD.
+   * The in-degree of each vertex in the graph.
+   * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
   lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
   /**
-   * Compute the out-degree of each vertex in the Graph returning an RDD.
-   * @note Vertices with no out edges are not returned in the resulting RDD.
+   * The out-degree of each vertex in the graph.
+   * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
   lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
   /**
-   * Compute the degrees of each vertex in the Graph returning an RDD.
-   * @note Vertices with no edges are not returned in the resulting
-   * RDD.
+   * The degree of each vertex in the graph.
+   * @note Vertices with no edges are not returned in the resulting RDD.
    */
   lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Both)
   /**
-   * Compute the neighboring vertex degrees.
+   * Computes the neighboring vertex degrees.
    *
-   * @param edgeDirection the direction along which to collect
-   * neighboring vertex attributes.
+   * @param edgeDirection the direction along which to collect neighboring vertex attributes
    */
   private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
     if (edgeDirection == EdgeDirection.In) {
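Note on the degree docs in the hunk above: the @note lines state that vertices with no matching edges are left out of the result rather than reported with a degree of 0. The following is a minimal sketch of that behaviour using plain Scala collections, not GraphX. The object `InDegreeSketch`, the helpers `inDegrees` and `withZeros`, and the local `VertexID` alias are illustrative assumptions and are not part of this commit.

```scala
// Sketch only: models a graph as a list of (srcId, dstId) pairs.
// Nothing here is GraphX API; all names are hypothetical.
object InDegreeSketch {
  type VertexID = Long

  // Count incoming edges per destination vertex. A vertex that never
  // appears as a destination gets no entry at all.
  def inDegrees(edges: Seq[(VertexID, VertexID)]): Map[VertexID, Int] =
    edges.groupBy(_._2).map { case (dst, es) => dst -> es.size }

  // Callers that need an entry for every vertex have to fill in zeros themselves.
  def withZeros(vertices: Set[VertexID], degrees: Map[VertexID, Int]): Map[VertexID, Int] =
    vertices.map(v => v -> degrees.getOrElse(v, 0)).toMap

  def main(args: Array[String]): Unit = {
    val vertices = Set(1L, 2L, 3L)
    val edges = Seq((1L, 2L), (3L, 2L), (2L, 3L))
    val in = inDegrees(edges)
    println(in.contains(1L))          // vertex 1 has no in-edges, so it is absent
    println(withZeros(vertices, in))  // every vertex present, vertex 1 mapped to 0
  }
}
```

With GraphX itself, the analogous step would be joining the degree result back against the full vertex set; the sketch only shows why that join is needed.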
@@ -70,32 +60,20 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
     }
   }
   /**
-   * This function is used to compute a statistic for the neighborhood
-   * of each vertex and returns a value for all vertices (including
-   * those without neighbors).
-   *
-   * @note Because the a default value is provided all vertices will
-   * have a corresponding entry in the returned RDD.
+   * Computes a statistic for the neighborhood of each vertex.
    *
-   * @param mapFunc the function applied to each edge adjacent to each
-   * vertex. The mapFunc can optionally return None in which case it
-   * does not contribute to the final sum.
-   * @param reduceFunc the function used to merge the results of each
-   * map operation.
-   * @param default the default value to use for each vertex if it has
-   * no neighbors or the map function repeatedly evaluates to none
-   * @param direction the direction of edges to consider (e.g., In,
-   * Out, Both).
-   * @tparam VD2 The returned type of the aggregation operation.
+   * @param mapFunc the function applied to each edge adjacent to each vertex. The mapFunc can
+   * optionally return `None`, in which case it does not contribute to the final sum.
+   * @param reduceFunc the function used to merge the results of each map operation
+   * @param direction the direction of edges to consider (e.g., In, Out, Both).
+   * @tparam A the aggregation type
    *
-   * @return A Spark.RDD containing tuples of vertex identifiers and
-   * their resulting value. There will be exactly one entry for ever
-   * vertex in the original graph.
+   * @return an RDD containing tuples of vertex identifiers and
+   * their resulting value. Vertices with no neighbors will not appear in the RDD.
    *
    * @example We can use this function to compute the average follower
-   * age for each user
+   * age for each user:
    *
    * {{{
    * val graph: Graph[Int,Int] = loadGraph()
@@ -107,16 +85,12 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * EdgeDirection.In)
    * .mapValues{ case (sum,followers) => sum.toDouble / followers}
    * }}}
-   *
-   * @todo Should this return a graph with the new vertex values?
-   *
    */
   def aggregateNeighbors[A: ClassTag](
       mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
       reduceFunc: (A, A) => A,
       dir: EdgeDirection)
     : VertexRDD[A] = {
     // Define a new map function over edge triplets
     val mf = (et: EdgeTriplet[VD, ED]) => {
       // Compute the message to the dst vertex
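Note on the revised `aggregateNeighbors` docs above: the new text says that `mapFunc` may return `None` and that vertices with no neighbors will not appear in the result. The following is a rough sketch of that map-then-reduce contract over plain Scala collections. It does not reproduce the GraphX implementation; `Triplet`, `Direction`, and the object name are hypothetical stand-ins introduced only for this illustration.

```scala
// Sketch only: a collection-based model of the documented contract.
// None of these types are GraphX types; all names are hypothetical.
object AggregateNeighborsSketch {
  type VertexID = Long

  // Simplified stand-in for an edge triplet: both endpoints plus their attributes.
  final case class Triplet[VD](srcId: VertexID, srcAttr: VD, dstId: VertexID, dstAttr: VD)

  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object Both extends Direction

  def aggregateNeighbors[VD, A](
      triplets: Seq[Triplet[VD]],
      mapFunc: (VertexID, Triplet[VD]) => Option[A],
      reduceFunc: (A, A) => A,
      dir: Direction): Map[VertexID, A] = {
    // Map step: each edge may emit a message to its dst and/or src vertex.
    val messages: Seq[(VertexID, A)] = triplets.flatMap { t =>
      val toDst = if (dir == In || dir == Both) mapFunc(t.dstId, t).map(t.dstId -> _) else None
      val toSrc = if (dir == Out || dir == Both) mapFunc(t.srcId, t).map(t.srcId -> _) else None
      toDst.toList ++ toSrc.toList
    }
    // Reduce step: merge messages per vertex. Vertices that received
    // no message have no entry in the result.
    messages.groupBy(_._1).map { case (vid, ms) => vid -> ms.map(_._2).reduce(reduceFunc) }
  }

  def main(args: Array[String]): Unit = {
    // Edge src -> dst means "src follows dst"; vertex attributes are ages.
    val triplets = Seq(Triplet(1L, 20, 3L, 40), Triplet(2L, 30, 3L, 40))
    val sums = aggregateNeighbors[Int, (Int, Int)](
      triplets,
      (_, t) => Some((t.srcAttr, 1)),
      (a, b) => (a._1 + b._1, a._2 + b._2),
      In)
    val avg = sums.map { case (vid, (sum, n)) => vid -> sum.toDouble / n }
    // avg has a single entry, for vertex 3; vertices 1 and 2 have no followers and are absent.
    println(avg)
  }
}
```

This mirrors the average-follower-age example from the Scaladoc: the per-vertex value is a (sum, count) pair, and the division happens after aggregation.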
@@ -143,15 +117,14 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
   /**
-   * Return the Ids of the neighboring vertices.
+   * Collect the neighbor vertex ids for each vertex.
    *
    * @param edgeDirection the direction along which to collect
    * neighboring vertices
    *
-   * @return the vertex set of neighboring ids for each vertex.
+   * @return the set of neighboring ids for each vertex
    */
   def collectNeighborIds(edgeDirection: EdgeDirection):
       VertexRDD[Array[VertexID]] = {
     val nbrs =
       if (edgeDirection == EdgeDirection.Both) {
         graph.mapReduceTriplets[Array[VertexID]](
@@ -185,8 +158,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
    * @param edgeDirection the direction along which to collect
    * neighboring vertices
    *
-   * @return the vertex set of neighboring vertex attributes for each
-   * vertex.
+   * @return the vertex set of neighboring vertex attributes for each vertex
    */
   def collectNeighbors(edgeDirection: EdgeDirection):
       VertexRDD[Array[(VertexID, VD)]] = {