Commit 0b7d4966 authored by Shixiong Zhu

[SPARK-14316][SQL] StateStoreCoordinator should extend ThreadSafeRpcEndpoint

## What changes were proposed in this pull request?

`RpcEndpoint` is not thread safe: the RPC dispatcher may process multiple messages for the same endpoint at the same time. `StateStoreCoordinator` keeps mutable state, so it should extend `ThreadSafeRpcEndpoint` instead.
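To make the hazard concrete, here is a standalone sketch (an editorial illustration, not Spark code; the object name, thread count, and key ranges are made up) of why an unsynchronized `scala.collection.mutable.HashMap`, like the coordinator's `instances` map, must not be updated by concurrently processed messages:

```scala
import scala.collection.mutable

// Illustration only: two "message handlers" mutate an unsynchronized
// mutable.HashMap concurrently, which is what a plain RpcEndpoint allows.
object UnsynchronizedMapRace {
  def main(args: Array[String]): Unit = {
    val instances = new mutable.HashMap[Int, String]

    val handlers = (1 to 2).map { t =>
      new Thread(() => {
        // Each handler records 10000 "active instances" without a lock.
        // Keys are disjoint across threads, so no entry should be lost.
        for (i <- 0 until 10000) instances.put(t * 100000 + i, s"exec$t")
      })
    }
    handlers.foreach(_.start())
    handlers.foreach(_.join())

    // Had the handlers run one at a time (as ThreadSafeRpcEndpoint
    // guarantees), the size would be exactly 20000. Concurrent
    // unsynchronized writes typically lose entries or throw mid-resize.
    println(s"size = ${instances.size} (expected 20000)")
  }
}
```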

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12100 from zsxwing/fix-StateStoreCoordinator.
parent e41acb75
**StateStoreCoordinator.scala**

```diff
@@ -21,7 +21,7 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.util.RpcUtils
@@ -112,7 +112,7 @@ private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef)
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {
```
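For context: `ThreadSafeRpcEndpoint` (also in `org.apache.spark.rpc`) is a marker trait extending `RpcEndpoint`. It declares no new members, but Spark's RPC dispatcher recognizes it and guarantees that messages are delivered to the endpoint one at a time, each fully processed before the next begins. That guarantee is what makes the unsynchronized `instances` map above safe.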
**StateStoreRDDSuite.scala**

```diff
@@ -124,11 +124,9 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterAll
     coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 0), "host1", "exec1")
     coordinatorRef.reportActiveInstance(StateStoreId(path, opId, 1), "host2", "exec2")
 
-    eventually(timeout(10 seconds)) {
-      assert(
-        coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
-          Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
-    }
+    assert(
+      coordinatorRef.getLocation(StateStoreId(path, opId, 0)) ===
+        Some(ExecutorCacheTaskLocation("host1", "exec1").toString))
 
     val rdd = makeRDD(sc, Seq("a", "b", "a")).mapPartitionWithStateStore(
       increment, path, opId, storeVersion = 0, keySchema, valueSchema)
```
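The test change follows from the endpoint change: with serialized, in-order message processing, by the time the `getLocation` ask is answered, the earlier `reportActiveInstance` messages have presumably already been handled, so the `eventually(timeout(10 seconds))` retry loop that compensated for the old race is no longer needed.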