diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index dd8f5bacb9f6ec1e94d6aff85e1b1207114d1a53..490d45d12b8e3c49ac3496f7aef722d8de638f9a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag -import com.google.common.collect.ConcurrentHashMultiset +import com.google.common.collect.{ConcurrentHashMultiset, ImmutableMultiset} import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging @@ -340,7 +340,7 @@ private[storage] class BlockInfoManager extends Logging { val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]() val readLocks = synchronized { - readLocksByTask.remove(taskAttemptId).get + readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]()) } val writeLocks = synchronized { writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)