-
- Downloads
[SPARK-13353][SQL] fast serialization for collecting DataFrame/Dataset
## What changes were proposed in this pull request? When we call DataFrame/Dataset.collect(), Java serializer (or Kryo Serializer) will be used to serialize the UnsafeRows in executor, then deserialize them into UnsafeRows in driver. Java serializer (and Kyro serializer) are slow on millions rows, because they try to find out the same rows, but usually there is no same rows. This PR will serialize the UnsafeRows as byte array by packing them together, then Java serializer (or Kyro serializer) serialize the bytes very fast (there are fewer blocks and byte array are not compared by content). The UnsafeRow format is highly compressible, the serialized bytes are also compressed (configurable by spark.io.compression.codec). ## How was this patch tested? Existing unit tests. Add a benchmark for collect, before this patch: ``` Intel(R) Core(TM) i7-4558U CPU 2.80GHz collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- collect 1 million 3991 / 4311 0.3 3805.7 1.0X collect 2 millions 10083 / 10637 0.1 9616.0 0.4X collect 4 millions 29551 / 30072 0.0 28182.3 0.1X ``` ``` Intel(R) Core(TM) i7-4558U CPU 2.80GHz collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- collect 1 million 775 / 1170 1.4 738.9 1.0X collect 2 millions 1153 / 1758 0.9 1099.3 0.7X collect 4 millions 4451 / 5124 0.2 4244.9 0.2X ``` We can see about 5-7X speedup. Author: Davies Liu <davies@databricks.com> Closes #11664 from davies/serialize_row.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 2 additions, 2 deletionssql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 44 additions, 3 deletions...main/scala/org/apache/spark/sql/execution/SparkPlan.scala
- sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala 3 additions, 1 deletion...est/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala 25 additions, 0 deletions...ache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
Loading
Please register or sign in to comment