Commit 5fcd2a61, authored 11 years ago by Ankur Dave
Finish cleaning up Graph docs
parent 4c114a75
Showing 1 changed file with 82 additions and 98 deletions
graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -15,8 +15,8 @@ import org.apache.spark.storage.StorageLevel
  * RDDs, the graph is a functional data-structure in which mutating
  * operations return new graphs.
  *
- * @note [[org.apache.spark.graphx.GraphOps]] contains additional convenience operations.
- * [[org.apache.spark.graphx.algorithms.Algorithms]] contains graph algorithms; to access these,
+ * @note [[GraphOps]] contains additional convenience operations.
+ * [[algorithms.Algorithms]] contains graph algorithms; to access these,
  * import `org.apache.spark.graphx.algorithms._`.
  *
  * @tparam VD the vertex attribute type
...
...
@@ -25,32 +25,31 @@ import org.apache.spark.storage.StorageLevel
 abstract class Graph[VD: ClassTag, ED: ClassTag] {

   /**
-   * Get the vertices and their data.
+   * An RDD containing the vertices and their associated attributes.
    *
    * @note vertex ids are unique.
-   * @return An RDD containing the vertices in this graph
+   * @return an RDD containing the vertices in this graph
    */
   val vertices: VertexRDD[VD]

   /**
-   * Get the Edges and their data as an RDD. The entries in the RDD
-   * contain just the source id and target id along with the edge
-   * data.
+   * An RDD containing the edges and their associated attributes. The entries in the RDD contain
+   * just the source id and target id along with the edge data.
    *
-   * @return An RDD containing the edges in this graph
+   * @return an RDD containing the edges in this graph
    *
-   * @see [[org.apache.spark.graphx.Edge]] for the edge type.
-   * @see `triplets` to get an RDD which contains all the edges
+   * @see [[Edge]] for the edge type.
+   * @see [[triplets]] to get an RDD which contains all the edges
    * along with their vertex data.
    *
    */
   val edges: EdgeRDD[ED]

   /**
-   * Get the edges with the vertex data associated with the adjacent
-   * pair of vertices.
+   * An RDD containing the edge triplets, which are edges along with the vertex data associated with
+   * the adjacent vertices.
    *
-   * @return An RDD containing edge triplets.
+   * @return an RDD containing edge triplets
    *
    * @example This operation might be used to evaluate a graph
    * coloring where we would like to check that both vertices are a
...
...
@@ -68,7 +67,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   val triplets: RDD[EdgeTriplet[VD, ED]]

   /**
-   * Cache the vertices and edges associated with this graph.
+   * Caches the vertices and edges associated with this graph at the specified storage level.
    *
    * @param newLevel the level at which to cache the graph.
...
...
@@ -78,14 +77,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

   /**
-   * Return a graph that is cached when first created. This is used to
+   * Caches the vertices and edges associated with this graph. This is used to
    * pin a graph in memory enabling multiple queries to reuse the same
    * construction process.
    */
   def cache(): Graph[VD, ED]

   /**
-   * Uncache only the vertices of this graph, leaving the edges alone. This is useful in iterative
+   * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
    * algorithms that modify the vertex attributes but reuse the edges. This method can be used to
    * uncache the vertex attributes of previous iterations once they are no longer needed, improving
    * GC performance.
...
...
@@ -93,23 +92,22 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

   /**
-   * Repartition the edges in the graph according to `partitionStrategy`.
+   * Repartitions the edges in the graph according to `partitionStrategy`.
    */
   def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

   /**
-   * Compute statistics describing the graph representation.
+   * Computes statistics describing the graph representation.
    */
   def statistics: Map[String, Any]

   /**
-   * Construct a new graph where each vertex value has been
-   * transformed by the map function.
+   * Transforms each vertex attribute in the graph using the map function.
    *
    * @note The new graph has the same structure. As a consequence the underlying index structures
    * can be reused.
    *
-   * @param map the function from a vertex object to a new vertex value.
+   * @param map the function from a vertex object to a new vertex value
    *
    * @tparam VD2 the new vertex data type
    *
...
...
@@ -125,10 +123,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]

   /**
-   * Construct a new graph where the value of each edge is
-   * transformed by the map operation. This function is not passed
-   * the vertex value for the vertices adjacent to the edge. If
-   * vertex values are desired, use the `mapTriplets` method.
+   * Transforms each edge attribute in the graph using the map function. The map function is not
+   * passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
+   * use [[mapTriplets]].
    *
    * @note This graph is not changed and that the new graph has the
    * same structure. As a consequence the underlying index structures
...
...
@@ -147,19 +144,19 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   }

   /**
-   * Construct a new graph transforming the value of each edge using the user defined iterator
-   * transform. The iterator transform is given an iterator over edges within a logical partition
-   * as well as the partition's ID, and it should return a new iterator over the new values of each
-   * edge. The new iterator's elements must correspond one-to-one with the old iterator's
-   * elements. If adjacent vertex values are desired, use the `mapTriplets` method.
+   * Transforms each edge attribute using the map function, passing it a whole partition at a
+   * time. The map function is given an iterator over edges within a logical partition as well as
+   * the partition's ID, and it should return a new iterator over the new values of each edge. The
+   * new iterator's elements must correspond one-to-one with the old iterator's elements. If
+   * adjacent vertex values are desired, use [[mapTriplets]].
    *
    * @note This does not change the structure of the
    * graph or modify the values of this graph. As a consequence
    * the underlying index structures can be reused.
    *
-   * @param map the function which takes a partition id and an iterator
-   * over all the edges in the partition and must return an iterator over
-   * the new values for each edge in the order of the input iterator.
+   * @param map a function that takes a partition id and an iterator
+   * over all the edges in the partition, and must return an iterator over
+   * the new values for each edge in the order of the input iterator
    *
    * @tparam ED2 the new edge data type
    *
*
...
...
@@ -168,11 +165,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
       map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]

   /**
-   * Construct a new graph where the value of each edge is
-   * transformed by the map operation. This function passes vertex
-   * values for the adjacent vertices to the map function. If
-   * adjacent vertex values are not required, consider using the
-   * `mapEdges` method instead.
+   * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
+   * as well. If adjacent vertex values are not required, consider using [[mapEdges]] instead.
    *
    * @note This does not change the structure of the
    * graph or modify the values of this graph. As a consequence
...
...
@@ -196,21 +190,17 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   }

   /**
-   * Construct a new graph transforming the value of each edge using
-   * the user defined iterator transform. The iterator transform is
-   * given an iterator over edge triplets within a logical partition
-   * and should yield a new iterator over the new values of each edge
-   * in the order in which they are provided to the iterator transform
-   * If adjacent vertex values are not required, consider using the
-   * mapEdges function instead.
-   *
-   * @note This that this does not change the structure of the
+   * Transforms each edge attribute a partition at a time using the map function, passing it the
+   * adjacent vertex attributes as well. The map function is given an iterator over edge triplets
+   * within a logical partition and should yield a new iterator over the new values of each edge in
+   * the order in which they are provided. If adjacent vertex values are not required, consider
+   * using [[mapEdges]] instead.
+   *
+   * @note This does not change the structure of the
    * graph or modify the values of this graph. As a consequence
    * the underlying index structures can be reused.
    *
-   * @param map the function which takes a partition id and an iterator
-   * over all the edges in the partition and must return an iterator over
-   * the new values for each edge in the order of the input iterator.
+   * @param map the iterator transform
    *
    * @tparam ED2 the new edge data type
    *
*
...
...
@@ -220,86 +210,82 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
     Graph[VD, ED2]

   /**
-   * Construct a new graph with all the edges reversed. If this graph
-   * contains an edge from a to b then the returned graph contains an
-   * edge from b to a.
+   * Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
+   * graph contains an edge from b to a.
    */
   def reverse: Graph[VD, ED]

   /**
-   * This function takes a vertex and edge predicate and constructs
-   * the subgraph that consists of vertices and edges that satisfy the
-   * predict. The resulting graph contains the vertices and edges
-   * that satisfy:
+   * Restricts the graph to only the vertices and edges satisfying the predicates. The resulting
+   * subgraph satisifies
    *
    * {{{
    * V' = {v : for all v in V where vpred(v)}
    * E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
    * }}}
    *
-   * @param epred the edge predicate which takes a triplet and
+   * @param epred the edge predicate, which takes a triplet and
    * evaluates to true if the edge is to remain in the subgraph. Note
-   * that only edges in which both vertices satisfy the vertex
+   * that only edges where both vertices satisfy the vertex
    * predicate are considered.
    *
-   * @param vpred the vertex predicate which takes a vertex object and
+   * @param vpred the vertex predicate, which takes a vertex object and
    * evaluates to true if the vertex is to be included in the subgraph
    *
    * @return the subgraph containing only the vertices and edges that
-   * satisfy the predicates.
+   * satisfy the predicates
    */
   def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
     vpred: (VertexID, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]

   /**
-   * Subgraph of this graph with only vertices and edges from the other graph.
+   * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
+   * attributes from this graph.
    * @param other the graph to project this graph onto
-   * @return a graph with vertices and edges that exists in both the current graph and other,
-   * with vertex and edge data from the current graph.
+   * @return a graph with vertices and edges that exist in both the current graph and `other`,
+   * with vertex and edge data from the current graph
    */
   def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

   /**
-   * This function merges multiple edges between two vertices into a single Edge. For correct
-   * results, the graph must have been partitioned using partitionBy.
+   * Merges multiple edges between two vertices into a single edge. For correct results, the graph
+   * must have been partitioned using [[partitionBy]].
    *
    * @tparam ED2 the type of the resulting edge data after grouping.
    *
-   * @param f the user supplied commutative associative function to merge edge attributes for
+   * @param f the user-supplied commutative associative function to merge edge attributes for
    * duplicate edges.
    *
-   * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair.
+   * @return The resulting graph with a single edge for each (source, dest) vertex pair.
    */
   def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

   /**
-   * The mapReduceTriplets function is used to compute statistics
-   * about the neighboring edges and vertices of each vertex. The
-   * user supplied `mapFunc` function is invoked on each edge of the
-   * graph generating 0 or more "messages" to be "sent" to either
-   * vertex in the edge. The `reduceFunc` is then used to combine the
-   * output of the map phase destined to each vertex.
+   * Computes statistics about the neighboring edges and vertices of each vertex. The user supplied
+   * `mapFunc` function is invoked on each edge of the graph, generating 0 or more "messages" to be
+   * "sent" to either vertex in the edge. The `reduceFunc` is then used to combine the output of
+   * the map phase destined to each vertex.
    *
    * @tparam A the type of "message" to be sent to each vertex
    *
    * @param mapFunc the user defined map function which returns 0 or
-   * more messages to neighboring vertices.
+   * more messages to neighboring vertices
    *
    * @param reduceFunc the user defined reduce function which should
    * be commutative and assosciative and is used to combine the output
-   * of the map phase.
+   * of the map phase
    *
-   * @param activeSet optionally, a set of "active" vertices and a direction of edges to consider
+   * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
    * when running `mapFunc`. For example, if the direction is Out, `mapFunc` will only be run on
-   * edges originating from vertices in the active set. `activeSet` must have the same index as the
-   * graph's vertices.
+   * edges originating from vertices in the active set. The active set must have the same index as
+   * the graph's vertices.
    *
-   * @example We can use this function to compute the inDegree of each
+   * @example We can use this function to compute the in-degree of each
    * vertex
    * {{{
    * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
    * val inDeg: RDD[(VertexID, Int)] =
-   *   mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _)
+   *   mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
    * }}}
    *
    * @note By expressing computation at the edge level we achieve
...
...
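Review note: the map-then-reduce behaviour described in the `mapReduceTriplets` doc above can be illustrated without Spark. The following is a minimal sketch on plain Scala collections, not the GraphX implementation; `Triplet` and `mapReduceTripletsLocal` are hypothetical names introduced only for this illustration, and the three-edge graph is made-up sample data.

```scala
// Plain-collections sketch of the map/reduce-over-triplets idea; no Spark required.
// `Triplet` and `mapReduceTripletsLocal` are hypothetical names, not GraphX API.
object MapReduceTripletsSketch {
  type VertexID = Long

  // An edge together with the attributes of its two endpoints.
  final case class Triplet[VD, ED](
      srcId: VertexID, srcAttr: VD, dstId: VertexID, dstAttr: VD, attr: ED)

  // Run mapFunc on every triplet, then combine all messages addressed
  // to the same vertex with reduceFunc.
  def mapReduceTripletsLocal[VD, ED, A](
      triplets: Seq[Triplet[VD, ED]],
      mapFunc: Triplet[VD, ED] => Iterator[(VertexID, A)],
      reduceFunc: (A, A) => A): Map[VertexID, A] =
    triplets
      .flatMap(t => mapFunc(t).toList)
      .groupBy(_._1)
      .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(reduceFunc) }

  // Sample graph with edges 1 -> 2, 1 -> 3 and 2 -> 3 (no attributes).
  val triplets: Seq[Triplet[Unit, Unit]] = Seq(
    Triplet(1L, (), 2L, (), ()),
    Triplet(1L, (), 3L, (), ()),
    Triplet(2L, (), 3L, (), ()))

  // In-degree: each edge sends the message 1 to its destination vertex.
  val inDeg: Map[VertexID, Int] =
    mapReduceTripletsLocal[Unit, Unit, Int](triplets, t => Iterator((t.dstId, 1)), _ + _)
}
```

Vertices that receive no message (vertex 1 here) simply have no entry in the result, which is why the reduce function only ever sees messages destined to the same vertex.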
@@ -316,10 +302,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
     : VertexRDD[A]

   /**
-   * Join the vertices with an RDD and then apply a function from the
-   * the vertex and RDD entry to a new vertex value and type. The
-   * input table should contain at most one entry for each vertex. If
-   * no entry is provided the map function is invoked passing none.
+   * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
+   * input table should contain at most one entry for each vertex. If no entry in `table` is
+   * provided for a particular vertex in the graph, the map function receives `None`.
    *
    * @tparam U the type of entry in the table of updates
    * @tparam VD2 the new vertex value type
...
...
@@ -331,12 +316,11 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * values. The map function is invoked for all vertices, even those
    * that do not have a corresponding entry in the table.
    *
-   * @example This function is used to update the vertices with new
-   * values based on external data. For example we could add the out
-   * degree to each vertex record
+   * @example This function is used to update the vertices with new values based on external data.
+   * For example we could add the out-degree to each vertex record:
    *
    * {{{
-   * val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
+   * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
    * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
    * val graph = rawGraph.outerJoinVertices(outDeg) {
    *   (vid, data, optDeg) => optDeg.getOrElse(0)
...
...
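Review note: the `None` behaviour documented for `outerJoinVertices` above is that of a left outer join keyed by vertex id. A minimal sketch on plain Scala `Map`s follows, assuming in-memory data rather than RDDs; `outerJoinVerticesLocal` and the sample values are hypothetical and are not part of GraphX.

```scala
// Plain-collections sketch of the outer-join semantics; no Spark required.
// `outerJoinVerticesLocal` is a hypothetical helper, not GraphX API.
object OuterJoinSketch {
  type VertexID = Long

  // Every vertex is kept; mapFunc sees Some(entry) when the table has
  // an entry for that vertex id and None otherwise.
  def outerJoinVerticesLocal[VD, U, VD2](
      vertices: Map[VertexID, VD],
      table: Map[VertexID, U])(
      mapFunc: (VertexID, VD, Option[U]) => VD2): Map[VertexID, VD2] =
    vertices.map { case (vid, attr) => vid -> mapFunc(vid, attr, table.get(vid)) }

  val vertices: Map[VertexID, String] = Map(1L -> "a", 2L -> "b", 3L -> "c")

  // Vertex 3 has no outgoing edges in this sample, so the table has no entry for it.
  val outDeg: Map[VertexID, Int] = Map(1L -> 2, 2L -> 1)

  // Replace each vertex attribute with its out-degree, defaulting to 0.
  val withDegrees: Map[VertexID, Int] =
    outerJoinVerticesLocal(vertices, outDeg)((vid, data, optDeg) => optDeg.getOrElse(0))
}
```

Using a `Map` for the table enforces the "at most one entry for each vertex" requirement by construction; with an RDD of pairs that constraint is the caller's responsibility.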
@@ -364,13 +348,13 @@ object Graph {
   /**
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
-   * @param rawEdges a collection of edges in (src,dst) form.
+   * @param rawEdges a collection of edges in (src, dst) form
    * @param uniqueEdges if multiple identical edges are found they are combined and the edge
    * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
-   * uniqueEdges, a [[PartitionStrategy]] must be provided.
+   * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
    *
    * @return a graph with edge attributes containing either the count of duplicate edges or 1
-   * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
+   * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
    */
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexID, VertexID)],
...
...
@@ -400,10 +384,10 @@ object Graph {
   }

   /**
-   * Construct a graph from a collection attributed vertices and
-   * edges. Duplicate vertices are picked arbitrarily and
+   * Construct a graph from a collection of vertices and
+   * edges with attributes. Duplicate vertices are picked arbitrarily and
    * vertices found in the edge collection but not in the input
-   * vertices are the default attribute.
+   * vertices are assigned the default attribute.
    *
    * @tparam VD the vertex attribute type
    * @tparam ED the edge attribute type
...
...
@@ -412,7 +396,7 @@ object Graph {
    * @param defaultVertexAttr the default vertex attribute to use for
    * vertices that are mentioned in edges but not in vertices
    * @param partitionStrategy the partition strategy to use when
-   * partitioning the edges.
+   * partitioning the edges
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexID, VD)],
...
...
@@ -422,10 +406,10 @@ object Graph {
   }

   /**
-   * The implicit graphToGraphOPs function extracts the GraphOps member from a graph.
+   * Implicitly extracts the [[GraphOps]] member from a graph.
    *
    * To improve modularity the Graph type only contains a small set of basic operations. All the
-   * convenience operations are defined in the GraphOps class which may be shared across multiple
+   * convenience operations are defined in the [[GraphOps]] class which may be shared across multiple
    * graph implementations.
    */
   implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
...
...
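Review note: the set definitions `V'` and `E'` in the `subgraph` doc comment of this commit can be checked on a small in-memory graph. The sketch below assumes plain Scala collections and is not the GraphX implementation; `subgraphLocal` and this local `Edge` class are hypothetical, and as a simplification the edge predicate here receives only the edge, whereas the documented `epred` receives a triplet that also carries the endpoint attributes.

```scala
// Plain-collections sketch of the subgraph semantics; no Spark required.
// `subgraphLocal` and this `Edge` class are hypothetical, not GraphX API.
object SubgraphSketch {
  type VertexID = Long

  final case class Edge[ED](srcId: VertexID, dstId: VertexID, attr: ED)

  // V' keeps the vertices passing vpred; E' keeps the edges that pass epred
  // and whose two endpoints are both in V'.
  def subgraphLocal[VD, ED](
      vertices: Map[VertexID, VD],
      edges: Seq[Edge[ED]])(
      epred: Edge[ED] => Boolean,
      vpred: (VertexID, VD) => Boolean): (Map[VertexID, VD], Seq[Edge[ED]]) = {
    val keptVertices = vertices.filter { case (vid, attr) => vpred(vid, attr) }
    val keptEdges = edges.filter { e =>
      keptVertices.contains(e.srcId) && keptVertices.contains(e.dstId) && epred(e)
    }
    (keptVertices, keptEdges)
  }

  val vertices: Map[VertexID, Int] = Map(1L -> 10, 2L -> 20, 3L -> 30)
  val edges: Seq[Edge[String]] =
    Seq(Edge(1L, 2L, "x"), Edge(2L, 3L, "y"), Edge(3L, 2L, "z"))

  // Keep vertices with attribute >= 20 and edges whose label is not "z".
  val (keptVertices, keptEdges) =
    subgraphLocal(vertices, edges)(
      epred = e => e.attr != "z",
      vpred = (vid, attr) => attr >= 20)
}
```

In this sample, edge 1 -> 2 is dropped because vertex 1 fails the vertex predicate, and edge 3 -> 2 is dropped by the edge predicate even though both of its endpoints survive.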