[SPARK-18546][CORE] Fix merging shuffle spills when using encryption.
The problem exists because it's not possible to simply concatenate encrypted partition data from different spill files. Currently each partition in each spill file has its own initialization vector (IV) to set up encryption, while the final merged file should contain a single IV for each merged partition; otherwise iterating over the records becomes really hard.

To fix that, UnsafeShuffleWriter now decrypts the partitions when merging, so that the merged file contains a single IV at the start of each partition's data.

Because that cannot be done using the fast transferTo path, UnsafeShuffleWriter falls back to file streams for merging when encryption is enabled. It may be possible to use a hybrid approach with encryption, using an intermediate direct buffer when reading from files and encrypting the data, but that's better left for a separate patch.

As part of the change I made DiskBlockObjectWriter take a SerializerManager instead of a "wrap stream" closure, since that makes it easier to test the code without having to mock SerializerManager functionality.

Tested with newly added unit tests (UnsafeShuffleWriterSuite for the write side and ExternalAppendOnlyMapSuite for integration), and by running some apps that failed without the fix.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15982 from vanzin/SPARK-18546.
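The IV problem described above can be illustrated with a small standalone sketch. This is not the code from the patch: it uses plain `javax.crypto` with AES/CTR, and the class name `EncryptedSpillMergeSketch`, the helper names, and the "IV followed by ciphertext" segment layout are all assumptions made for illustration. It shows that naively concatenating two independently encrypted segments does not decrypt to the concatenated plaintext, while decrypting each segment and re-encrypting under one fresh IV does.

```java
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
import java.util.List;

// Hypothetical illustration only; none of these names come from Spark.
public class EncryptedSpillMergeSketch {
  static final int IV_LEN = 16; // AES block size
  static final SecureRandom RNG = new SecureRandom();

  // Assumed layout of one encrypted segment: [16-byte IV][ciphertext].
  static byte[] encryptSegment(SecretKeySpec key, byte[] plain) throws Exception {
    byte[] iv = new byte[IV_LEN];
    RNG.nextBytes(iv);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(iv);
    try (OutputStream enc = new CipherOutputStream(out, newCipher(Cipher.ENCRYPT_MODE, key, iv))) {
      enc.write(plain);
    }
    return out.toByteArray();
  }

  // Reads the IV from the head of the segment and decrypts everything after it.
  static byte[] decryptSegment(SecretKeySpec key, byte[] segment) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = decryptingStream(key, segment)) {
      copy(in, out);
    }
    return out.toByteArray();
  }

  // Merge by decrypting each spill segment and re-encrypting under ONE new IV.
  static byte[] mergeSegments(SecretKeySpec key, List<byte[]> spills) throws Exception {
    byte[] iv = new byte[IV_LEN];
    RNG.nextBytes(iv);
    ByteArrayOutputStream merged = new ByteArrayOutputStream();
    merged.write(iv);
    try (OutputStream enc = new CipherOutputStream(merged, newCipher(Cipher.ENCRYPT_MODE, key, iv))) {
      for (byte[] spill : spills) {
        try (InputStream in = decryptingStream(key, spill)) {
          copy(in, enc);
        }
      }
    }
    return merged.toByteArray();
  }

  // The broken approach: byte-level concatenation of encrypted segments.
  static byte[] naiveConcat(List<byte[]> spills) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] spill : spills) {
      out.write(spill, 0, spill.length);
    }
    return out.toByteArray();
  }

  static InputStream decryptingStream(SecretKeySpec key, byte[] segment) throws Exception {
    byte[] iv = java.util.Arrays.copyOfRange(segment, 0, IV_LEN);
    InputStream body = new ByteArrayInputStream(segment, IV_LEN, segment.length - IV_LEN);
    return new CipherInputStream(body, newCipher(Cipher.DECRYPT_MODE, key, iv));
  }

  static Cipher newCipher(int mode, SecretKeySpec key, byte[] iv) throws Exception {
    Cipher c = Cipher.getInstance("AES/CTR/NoPadding");
    c.init(mode, key, new IvParameterSpec(iv));
    return c;
  }

  static void copy(InputStream in, OutputStream out) throws Exception {
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      out.write(buf, 0, n);
    }
  }
}
```

In the naive case the reader treats the second segment's IV as ciphertext and continues with the first segment's key stream, so everything after the first segment decrypts to garbage. Streaming through a decrypt-then-encrypt pipeline avoids this, at the cost of not being able to hand raw file regions to `transferTo`.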
Changed files:
- core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java (28 additions, 20 deletions)
- core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala (4 additions, 2 deletions)
- core/src/main/scala/org/apache/spark/storage/BlockManager.scala (2 additions, 3 deletions)
- core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala (3 additions, 3 deletions)
- core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java (71 additions, 29 deletions)
- core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java (1 addition, 10 deletions)
- core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java (7 additions, 14 deletions)
- core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala (3 additions, 2 deletions)
- core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala (19 additions, 35 deletions)
- core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala (7 additions, 1 deletion)