diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index e0ba9403ba75b8103e64f8134c58154ca8479ac0..2c1b9518a3d16fec628df58ecbabea9d5c34c52d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
    * }}}
    *
    */
-  def mapTriplets[ED2: ClassTag](
-      map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
     mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
   }
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
new file mode 100644
index 0000000000000000000000000000000000000000..377ae849f045c910659be57bf1cbf6c35a44e305
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeActiveness.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.impl;
+
+/**
+ * Criteria for filtering edges based on activeness. For internal use only.
+ */
+public enum EdgeActiveness {
+  /** Neither the source vertex nor the destination vertex need be active. */
+  Neither,
+  /** The source vertex must be active. */
+  SrcOnly,
+  /** The destination vertex must be active. */
+  DstOnly,
+  /** Both vertices must be active. */
+  Both,
+  /** At least one vertex must be active. */
+  Either
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 78d8ac24b5271b9bc9102d2c16dc830c204817e6..373af754483746ff04f645c7aa1da5edf87abda1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -64,6 +64,7 @@ class EdgePartition[
     activeSet: Option[VertexSet])
   extends Serializable {
 
+  /** No-arg constructor for serialization. */
   private def this() = this(null, null, null, null, null, null, null, null)
 
   /** Return a new `EdgePartition` with the specified edge data. */
@@ -375,12 +376,7 @@ class EdgePartition[
    * @param sendMsg generates messages to neighboring vertices of an edge
    * @param mergeMsg the combiner applied to messages destined to the same vertex
    * @param tripletFields which triplet fields `sendMsg` uses
-   * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
-   *   active set
-   * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
-   *   the active set
-   * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
-   *   considered
+   * @param activeness criteria for filtering edges based on activeness
    *
    * @return iterator aggregated messages keyed by the receiving vertex id
    */
@@ -388,9 +384,7 @@ class EdgePartition[
       sendMsg: EdgeContext[VD, ED, A] => Unit,
       mergeMsg: (A, A) => A,
       tripletFields: TripletFields,
-      srcMustBeActive: Boolean,
-      dstMustBeActive: Boolean,
-      maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+      activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
     val aggregates = new Array[A](vertexAttrs.length)
     val bitset = new BitSet(vertexAttrs.length)
 
@@ -401,10 +395,13 @@ class EdgePartition[
       val srcId = local2global(localSrcId)
       val localDstId = localDstIds(i)
       val dstId = local2global(localDstId)
-      val srcIsActive = !srcMustBeActive || isActive(srcId)
-      val dstIsActive = !dstMustBeActive || isActive(dstId)
       val edgeIsActive =
-        if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+        if (activeness == EdgeActiveness.Neither) true
+        else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
+        else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+        else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
+        else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
+        else throw new Exception("unreachable")
       if (edgeIsActive) {
         val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
         val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
@@ -424,12 +421,7 @@ class EdgePartition[
    * @param sendMsg generates messages to neighboring vertices of an edge
    * @param mergeMsg the combiner applied to messages destined to the same vertex
    * @param tripletFields which triplet fields `sendMsg` uses
-   * @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
-   *   active set
-   * @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
-   *   the active set
-   * @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
-   *   considered
+   * @param activeness criteria for filtering edges based on activeness
    *
    * @return iterator aggregated messages keyed by the receiving vertex id
    */
@@ -437,9 +429,7 @@ class EdgePartition[
       sendMsg: EdgeContext[VD, ED, A] => Unit,
       mergeMsg: (A, A) => A,
       tripletFields: TripletFields,
-      srcMustBeActive: Boolean,
-      dstMustBeActive: Boolean,
-      maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
+      activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
     val aggregates = new Array[A](vertexAttrs.length)
     val bitset = new BitSet(vertexAttrs.length)
 
@@ -448,8 +438,16 @@ class EdgePartition[
       val clusterSrcId = cluster._1
       val clusterPos = cluster._2
       val clusterLocalSrcId = localSrcIds(clusterPos)
-      val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
-      if (srcIsActive || maySatisfyEither) {
+
+      val scanCluster =
+        if (activeness == EdgeActiveness.Neither) true
+        else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId)
+        else if (activeness == EdgeActiveness.DstOnly) true
+        else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId)
+        else if (activeness == EdgeActiveness.Either) true
+        else throw new Exception("unreachable")
+
+      if (scanCluster) {
         var pos = clusterPos
         val srcAttr =
           if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
@@ -457,9 +455,13 @@ class EdgePartition[
         while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
           val localDstId = localDstIds(pos)
           val dstId = local2global(localDstId)
-          val dstIsActive = !dstMustBeActive || isActive(dstId)
           val edgeIsActive =
-            if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
+            if (activeness == EdgeActiveness.Neither) true
+            else if (activeness == EdgeActiveness.SrcOnly) true
+            else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
+            else if (activeness == EdgeActiveness.Both) isActive(dstId)
+            else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId)
+            else throw new Exception("unreachable")
           if (edgeIsActive) {
             val dstAttr =
               if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index a1fe158b7b490d49b33cc31c2097983e49ec737c..2b4636a6c6ddfd8e8e8068b62ba725517be4a85e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
           case Some(EdgeDirection.Both) =>
             if (activeFraction < 0.8) {
               edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
-                true, true, false)
+                EdgeActiveness.Both)
             } else {
               edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-                true, true, false)
+                EdgeActiveness.Both)
             }
           case Some(EdgeDirection.Either) =>
             // TODO: Because we only have a clustered index on the source vertex ID, we can't filter
             // the index here. Instead we have to scan all edges and then do the filter.
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              true, true, true)
+              EdgeActiveness.Either)
           case Some(EdgeDirection.Out) =>
             if (activeFraction < 0.8) {
               edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
-                true, false, false)
+                EdgeActiveness.SrcOnly)
             } else {
               edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-                true, false, false)
+                EdgeActiveness.SrcOnly)
             }
           case Some(EdgeDirection.In) =>
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              false, true, false)
+              EdgeActiveness.DstOnly)
           case _ => // None
             edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
-              false, false, false)
+              EdgeActiveness.Neither)
         }
     }).setName("GraphImpl.aggregateMessages - preAgg")