diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 994e18676ec494f620095ae390c08ed8dab83c85..a5778876d49015777d117bd3eecf3aa3c3d4a33d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
-   * timeout, or throw a SparkException if this fails even after the default number of retries.
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
+   * loop of [[RpcEndpoint]].
+
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, throw an exception if this fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
+    val future = ask[T](message, timeout)
+    timeout.awaitResult(future)
+  }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, throw a SparkException if this fails even after the default number of retries.
    * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
    * method retries, the message handling in the receiver side should be idempotent.
    *
@@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
 
   /**
-   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
    * specified timeout, throw a SparkException if this fails even after the specified number of
    * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
    * retries, the message handling in the receiver side should be idempotent.
@@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
+  @deprecated("use 'askSync' instead.", "2.2.0")
   def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index eca7c79465c6b0fc805cbc315b5c36941bb47f79..722024b8a6d57b5a9f1eac9d3fb40592b1f6ea3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
     val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
-    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
+    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }