-
- Downloads
[SPARK-18891][SQL] Support for Scala Map collection types
## What changes were proposed in this pull request? Add support for arbitrary Scala `Map` types in deserialization as well as a generic implicit encoder. Used the builder approach as in #16541 to construct any provided `Map` type upon deserialization. Please note that this PR also adds (ignored) tests for issue [SPARK-19104 CompileException with Map and Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it. Added support for Java Maps in codegen code (encoders will be added in a different PR) with the following default implementations for interfaces/abstract classes: * `java.util.Map`, `java.util.AbstractMap` => `java.util.HashMap` * `java.util.SortedMap`, `java.util.NavigableMap` => `java.util.TreeMap` * `java.util.concurrent.ConcurrentMap` => `java.util.concurrent.ConcurrentHashMap` * `java.util.concurrent.ConcurrentNavigableMap` => `java.util.concurrent.ConcurrentSkipListMap` Resulting codegen for `Seq(Map(1 -> 2)).toDS().map(identity).queryExecution.debug.codegen`: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjectsToMap_loopIsNull1; /* 010 */ private int CollectObjectsToMap_loopValue0; /* 011 */ private boolean CollectObjectsToMap_loopIsNull3; /* 012 */ private int CollectObjectsToMap_loopValue2; /* 013 */ private UnsafeRow deserializetoobject_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 016 */ private scala.collection.immutable.Map mapelements_argValue; /* 017 */ private UnsafeRow mapelements_result; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 020 */ private UnsafeRow serializefromobject_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1; /* 025 */ /* 026 */ public GeneratedIterator(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ wholestagecodegen_init_0(); /* 034 */ wholestagecodegen_init_1(); /* 035 */ /* 036 */ } /* 037 */ /* 038 */ private void wholestagecodegen_init_0() { /* 039 */ inputadapter_input = inputs[0]; /* 040 */ /* 041 */ deserializetoobject_result = new UnsafeRow(1); /* 042 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 043 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 044 */ /* 045 */ mapelements_result = new UnsafeRow(1); /* 046 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 047 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 048 */ serializefromobject_result = new UnsafeRow(1); /* 049 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 050 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 051 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 052 */ /* 053 */ } /* 054 */ /* 055 */ private void wholestagecodegen_init_1() { /* 056 */ this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 057 */ /* 058 */ } /* 059 */ /* 060 */ protected void processNext() throws java.io.IOException { /* 061 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 062 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 063 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 064 */ MapData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getMap(0)); /* 065 */ /* 066 */ boolean deserializetoobject_isNull1 = true; /* 067 */ ArrayData deserializetoobject_value1 = null; /* 068 */ if (!inputadapter_isNull) { /* 069 */ deserializetoobject_isNull1 = false; /* 070 */ if (!deserializetoobject_isNull1) { /* 071 */ Object deserializetoobject_funcResult = null; /* 072 */ deserializetoobject_funcResult = inputadapter_value.keyArray(); /* 073 */ if (deserializetoobject_funcResult == null) { /* 074 */ deserializetoobject_isNull1 = true; /* 075 */ } else { /* 076 */ deserializetoobject_value1 = (ArrayData) deserializetoobject_funcResult; /* 077 */ } /* 078 */ /* 079 */ } /* 080 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 081 */ } /* 082 */ /* 083 */ boolean deserializetoobject_isNull3 = true; /* 084 */ ArrayData deserializetoobject_value3 = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull3 = false; /* 087 */ if (!deserializetoobject_isNull3) { /* 088 */ Object deserializetoobject_funcResult1 = null; /* 089 */ deserializetoobject_funcResult1 = inputadapter_value.valueArray(); /* 090 */ if (deserializetoobject_funcResult1 == null) { /* 091 */ deserializetoobject_isNull3 = true; /* 092 */ } else { /* 093 */ deserializetoobject_value3 = (ArrayData) deserializetoobject_funcResult1; /* 094 */ } /* 095 */ /* 096 */ } /* 097 */ deserializetoobject_isNull3 = deserializetoobject_value3 == null; /* 098 */ } /* 099 */ scala.collection.immutable.Map deserializetoobject_value = null; /* 100 */ /* 101 */ if ((deserializetoobject_isNull1 && !deserializetoobject_isNull3) || /* 102 */ (!deserializetoobject_isNull1 && deserializetoobject_isNull3)) { /* 103 */ throw new RuntimeException("Invalid state: Inconsistent nullability of key-value"); /* 104 */ } /* 105 */ /* 106 */ if (!deserializetoobject_isNull1) { /* 107 */ if (deserializetoobject_value1.numElements() != deserializetoobject_value3.numElements()) { /* 108 */ throw new RuntimeException("Invalid state: Inconsistent lengths of key-value arrays"); /* 109 */ } /* 110 */ int deserializetoobject_dataLength = deserializetoobject_value1.numElements(); /* 111 */ /* 112 */ scala.collection.mutable.Builder CollectObjectsToMap_builderValue5 = scala.collection.immutable.Map$.MODULE$.newBuilder(); /* 113 */ CollectObjectsToMap_builderValue5.sizeHint(deserializetoobject_dataLength); /* 114 */ /* 115 */ int deserializetoobject_loopIndex = 0; /* 116 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 117 */ CollectObjectsToMap_loopValue0 = (int) (deserializetoobject_value1.getInt(deserializetoobject_loopIndex)); /* 118 */ CollectObjectsToMap_loopValue2 = (int) (deserializetoobject_value3.getInt(deserializetoobject_loopIndex)); /* 119 */ CollectObjectsToMap_loopIsNull1 = deserializetoobject_value1.isNullAt(deserializetoobject_loopIndex); /* 120 */ CollectObjectsToMap_loopIsNull3 = deserializetoobject_value3.isNullAt(deserializetoobject_loopIndex); /* 121 */ /* 122 */ if (CollectObjectsToMap_loopIsNull1) { /* 123 */ throw new RuntimeException("Found null in map key!"); /* 124 */ } /* 125 */ /* 126 */ scala.Tuple2 CollectObjectsToMap_loopValue4; /* 127 */ /* 128 */ if (CollectObjectsToMap_loopIsNull3) { /* 129 */ CollectObjectsToMap_loopValue4 = new scala.Tuple2(CollectObjectsToMap_loopValue0, null); /* 130 */ } else { /* 131 */ CollectObjectsToMap_loopValue4 = new scala.Tuple2(CollectObjectsToMap_loopValue0, CollectObjectsToMap_loopValue2); /* 132 */ } /* 133 */ /* 134 */ CollectObjectsToMap_builderValue5.$plus$eq(CollectObjectsToMap_loopValue4); /* 135 */ /* 136 */ deserializetoobject_loopIndex += 1; /* 137 */ } /* 138 */ /* 139 */ deserializetoobject_value = (scala.collection.immutable.Map) CollectObjectsToMap_builderValue5.result(); /* 140 */ } /* 141 */ /* 142 */ boolean mapelements_isNull = true; /* 143 */ scala.collection.immutable.Map mapelements_value = null; /* 144 */ if (!false) { /* 145 */ mapelements_argValue = deserializetoobject_value; /* 146 */ /* 147 */ mapelements_isNull = false; /* 148 */ if (!mapelements_isNull) { /* 149 */ Object mapelements_funcResult = null; /* 150 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 151 */ if (mapelements_funcResult == null) { /* 152 */ mapelements_isNull = true; /* 153 */ } else { /* 154 */ mapelements_value = (scala.collection.immutable.Map) mapelements_funcResult; /* 155 */ } /* 156 */ /* 157 */ } /* 158 */ mapelements_isNull = mapelements_value == null; /* 159 */ } /* 160 */ /* 161 */ MapData serializefromobject_value = null; /* 162 */ if (!mapelements_isNull) { /* 163 */ final int serializefromobject_length = mapelements_value.size(); /* 164 */ final Object[] serializefromobject_convertedKeys = new Object[serializefromobject_length]; /* 165 */ final Object[] serializefromobject_convertedValues = new Object[serializefromobject_length]; /* 166 */ int serializefromobject_index = 0; /* 167 */ final scala.collection.Iterator serializefromobject_entries = mapelements_value.iterator(); /* 168 */ while(serializefromobject_entries.hasNext()) { /* 169 */ final scala.Tuple2 serializefromobject_entry = (scala.Tuple2) serializefromobject_entries.next(); /* 170 */ int ExternalMapToCatalyst_key1 = (Integer) serializefromobject_entry._1(); /* 171 */ int ExternalMapToCatalyst_value1 = (Integer) serializefromobject_entry._2(); /* 172 */ /* 173 */ boolean ExternalMapToCatalyst_value_isNull1 = false; /* 174 */ /* 175 */ if (false) { /* 176 */ throw new RuntimeException("Cannot use null as map key!"); /* 177 */ } else { /* 178 */ serializefromobject_convertedKeys[serializefromobject_index] = (Integer) ExternalMapToCatalyst_key1; /* 179 */ } /* 180 */ /* 181 */ if (false) { /* 182 */ serializefromobject_convertedValues[serializefromobject_index] = null; /* 183 */ } else { /* 184 */ serializefromobject_convertedValues[serializefromobject_index] = (Integer) ExternalMapToCatalyst_value1; /* 185 */ } /* 186 */ /* 187 */ serializefromobject_index++; /* 188 */ } /* 189 */ /* 190 */ serializefromobject_value = new org.apache.spark.sql.catalyst.util.ArrayBasedMapData(new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedKeys), new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedValues)); /* 191 */ } /* 192 */ serializefromobject_holder.reset(); /* 193 */ /* 194 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 195 */ /* 196 */ if (mapelements_isNull) { /* 197 */ serializefromobject_rowWriter.setNullAt(0); /* 198 */ } else { /* 199 */ // Remember the current cursor so that we can calculate how many bytes are /* 200 */ // written later. /* 201 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 202 */ /* 203 */ if (serializefromobject_value instanceof UnsafeMapData) { /* 204 */ final int serializefromobject_sizeInBytes = ((UnsafeMapData) serializefromobject_value).getSizeInBytes(); /* 205 */ // grow the global buffer before writing data. /* 206 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 207 */ ((UnsafeMapData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 208 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 209 */ /* 210 */ } else { /* 211 */ final ArrayData serializefromobject_keys = serializefromobject_value.keyArray(); /* 212 */ final ArrayData serializefromobject_values = serializefromobject_value.valueArray(); /* 213 */ /* 214 */ // preserve 8 bytes to write the key array numBytes later. /* 215 */ serializefromobject_holder.grow(8); /* 216 */ serializefromobject_holder.cursor += 8; /* 217 */ /* 218 */ // Remember the current cursor so that we can write numBytes of key array later. /* 219 */ final int serializefromobject_tmpCursor1 = serializefromobject_holder.cursor; /* 220 */ /* 221 */ if (serializefromobject_keys instanceof UnsafeArrayData) { /* 222 */ final int serializefromobject_sizeInBytes1 = ((UnsafeArrayData) serializefromobject_keys).getSizeInBytes(); /* 223 */ // grow the global buffer before writing data. /* 224 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes1); /* 225 */ ((UnsafeArrayData) serializefromobject_keys).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 226 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes1; /* 227 */ /* 228 */ } else { /* 229 */ final int serializefromobject_numElements = serializefromobject_keys.numElements(); /* 230 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 231 */ /* 232 */ for (int serializefromobject_index1 = 0; serializefromobject_index1 < serializefromobject_numElements; serializefromobject_index1++) { /* 233 */ if (serializefromobject_keys.isNullAt(serializefromobject_index1)) { /* 234 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index1); /* 235 */ } else { /* 236 */ final int serializefromobject_element = serializefromobject_keys.getInt(serializefromobject_index1); /* 237 */ serializefromobject_arrayWriter.write(serializefromobject_index1, serializefromobject_element); /* 238 */ } /* 239 */ } /* 240 */ } /* 241 */ /* 242 */ // Write the numBytes of key array into the first 8 bytes. /* 243 */ Platform.putLong(serializefromobject_holder.buffer, serializefromobject_tmpCursor1 - 8, serializefromobject_holder.cursor - serializefromobject_tmpCursor1); /* 244 */ /* 245 */ if (serializefromobject_values instanceof UnsafeArrayData) { /* 246 */ final int serializefromobject_sizeInBytes2 = ((UnsafeArrayData) serializefromobject_values).getSizeInBytes(); /* 247 */ // grow the global buffer before writing data. /* 248 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes2); /* 249 */ ((UnsafeArrayData) serializefromobject_values).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 250 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes2; /* 251 */ /* 252 */ } else { /* 253 */ final int serializefromobject_numElements1 = serializefromobject_values.numElements(); /* 254 */ serializefromobject_arrayWriter1.initialize(serializefromobject_holder, serializefromobject_numElements1, 4); /* 255 */ /* 256 */ for (int serializefromobject_index2 = 0; serializefromobject_index2 < serializefromobject_numElements1; serializefromobject_index2++) { /* 257 */ if (serializefromobject_values.isNullAt(serializefromobject_index2)) { /* 258 */ serializefromobject_arrayWriter1.setNullInt(serializefromobject_index2); /* 259 */ } else { /* 260 */ final int serializefromobject_element1 = serializefromobject_values.getInt(serializefromobject_index2); /* 261 */ serializefromobject_arrayWriter1.write(serializefromobject_index2, serializefromobject_element1); /* 262 */ } /* 263 */ } /* 264 */ } /* 265 */ /* 266 */ } /* 267 */ /* 268 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 269 */ } /* 270 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 271 */ append(serializefromobject_result); /* 272 */ if (shouldStop()) return; /* 273 */ } /* 274 */ } /* 275 */ } ``` Codegen for `java.util.Map`: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjectsToMap_loopIsNull1; /* 010 */ private int CollectObjectsToMap_loopValue0; /* 011 */ private boolean CollectObjectsToMap_loopIsNull3; /* 012 */ private int CollectObjectsToMap_loopValue2; /* 013 */ private UnsafeRow deserializetoobject_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 016 */ private java.util.HashMap mapelements_argValue; /* 017 */ private UnsafeRow mapelements_result; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 020 */ private UnsafeRow serializefromobject_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1; /* 025 */ /* 026 */ public GeneratedIterator(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ wholestagecodegen_init_0(); /* 034 */ wholestagecodegen_init_1(); /* 035 */ /* 036 */ } /* 037 */ /* 038 */ private void wholestagecodegen_init_0() { /* 039 */ inputadapter_input = inputs[0]; /* 040 */ /* 041 */ deserializetoobject_result = new UnsafeRow(1); /* 042 */ this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 043 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 044 */ /* 045 */ mapelements_result = new UnsafeRow(1); /* 046 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 047 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 048 */ serializefromobject_result = new UnsafeRow(1); /* 049 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 050 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 051 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 052 */ /* 053 */ } /* 054 */ /* 055 */ private void wholestagecodegen_init_1() { /* 056 */ this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 057 */ /* 058 */ } /* 059 */ /* 060 */ protected void processNext() throws java.io.IOException { /* 061 */ while (inputadapter_input.hasNext() && !stopEarly()) { /* 062 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 063 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0); /* 064 */ MapData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getMap(0)); /* 065 */ /* 066 */ boolean deserializetoobject_isNull1 = true; /* 067 */ ArrayData deserializetoobject_value1 = null; /* 068 */ if (!inputadapter_isNull) { /* 069 */ deserializetoobject_isNull1 = false; /* 070 */ if (!deserializetoobject_isNull1) { /* 071 */ Object deserializetoobject_funcResult = null; /* 072 */ deserializetoobject_funcResult = inputadapter_value.keyArray(); /* 073 */ if (deserializetoobject_funcResult == null) { /* 074 */ deserializetoobject_isNull1 = true; /* 075 */ } else { /* 076 */ deserializetoobject_value1 = (ArrayData) deserializetoobject_funcResult; /* 077 */ } /* 078 */ /* 079 */ } /* 080 */ deserializetoobject_isNull1 = deserializetoobject_value1 == null; /* 081 */ } /* 082 */ /* 083 */ boolean deserializetoobject_isNull3 = true; /* 084 */ ArrayData deserializetoobject_value3 = null; /* 085 */ if (!inputadapter_isNull) { /* 086 */ deserializetoobject_isNull3 = false; /* 087 */ if (!deserializetoobject_isNull3) { /* 088 */ Object deserializetoobject_funcResult1 = null; /* 089 */ deserializetoobject_funcResult1 = inputadapter_value.valueArray(); /* 090 */ if (deserializetoobject_funcResult1 == null) { /* 091 */ deserializetoobject_isNull3 = true; /* 092 */ } else { /* 093 */ deserializetoobject_value3 = (ArrayData) deserializetoobject_funcResult1; /* 094 */ } /* 095 */ /* 096 */ } /* 097 */ deserializetoobject_isNull3 = deserializetoobject_value3 == null; /* 098 */ } /* 099 */ java.util.HashMap deserializetoobject_value = null; /* 100 */ /* 101 */ if ((deserializetoobject_isNull1 && !deserializetoobject_isNull3) || /* 102 */ (!deserializetoobject_isNull1 && deserializetoobject_isNull3)) { /* 103 */ throw new RuntimeException("Invalid state: Inconsistent nullability of key-value"); /* 104 */ } /* 105 */ /* 106 */ if (!deserializetoobject_isNull1) { /* 107 */ if (deserializetoobject_value1.numElements() != deserializetoobject_value3.numElements()) { /* 108 */ throw new RuntimeException("Invalid state: Inconsistent lengths of key-value arrays"); /* 109 */ } /* 110 */ int deserializetoobject_dataLength = deserializetoobject_value1.numElements(); /* 111 */ java.util.Map CollectObjectsToMap_builderValue5 = new java.util.HashMap(deserializetoobject_dataLength); /* 112 */ /* 113 */ int deserializetoobject_loopIndex = 0; /* 114 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) { /* 115 */ CollectObjectsToMap_loopValue0 = (int) (deserializetoobject_value1.getInt(deserializetoobject_loopIndex)); /* 116 */ CollectObjectsToMap_loopValue2 = (int) (deserializetoobject_value3.getInt(deserializetoobject_loopIndex)); /* 117 */ CollectObjectsToMap_loopIsNull1 = deserializetoobject_value1.isNullAt(deserializetoobject_loopIndex); /* 118 */ CollectObjectsToMap_loopIsNull3 = deserializetoobject_value3.isNullAt(deserializetoobject_loopIndex); /* 119 */ /* 120 */ if (CollectObjectsToMap_loopIsNull1) { /* 121 */ throw new RuntimeException("Found null in map key!"); /* 122 */ } /* 123 */ /* 124 */ CollectObjectsToMap_builderValue5.put(CollectObjectsToMap_loopValue0, CollectObjectsToMap_loopValue2); /* 125 */ /* 126 */ deserializetoobject_loopIndex += 1; /* 127 */ } /* 128 */ /* 129 */ deserializetoobject_value = (java.util.HashMap) CollectObjectsToMap_builderValue5; /* 130 */ } /* 131 */ /* 132 */ boolean mapelements_isNull = true; /* 133 */ java.util.HashMap mapelements_value = null; /* 134 */ if (!false) { /* 135 */ mapelements_argValue = deserializetoobject_value; /* 136 */ /* 137 */ mapelements_isNull = false; /* 138 */ if (!mapelements_isNull) { /* 139 */ Object mapelements_funcResult = null; /* 140 */ mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue); /* 141 */ if (mapelements_funcResult == null) { /* 142 */ mapelements_isNull = true; /* 143 */ } else { /* 144 */ mapelements_value = (java.util.HashMap) mapelements_funcResult; /* 145 */ } /* 146 */ /* 147 */ } /* 148 */ mapelements_isNull = mapelements_value == null; /* 149 */ } /* 150 */ /* 151 */ MapData serializefromobject_value = null; /* 152 */ if (!mapelements_isNull) { /* 153 */ final int serializefromobject_length = mapelements_value.size(); /* 154 */ final Object[] serializefromobject_convertedKeys = new Object[serializefromobject_length]; /* 155 */ final Object[] serializefromobject_convertedValues = new Object[serializefromobject_length]; /* 156 */ int serializefromobject_index = 0; /* 157 */ final java.util.Iterator serializefromobject_entries = mapelements_value.entrySet().iterator(); /* 158 */ while(serializefromobject_entries.hasNext()) { /* 159 */ final java.util.Map$Entry serializefromobject_entry = (java.util.Map$Entry) serializefromobject_entries.next(); /* 160 */ int ExternalMapToCatalyst_key1 = (Integer) serializefromobject_entry.getKey(); /* 161 */ int ExternalMapToCatalyst_value1 = (Integer) serializefromobject_entry.getValue(); /* 162 */ /* 163 */ boolean ExternalMapToCatalyst_value_isNull1 = false; /* 164 */ /* 165 */ if (false) { /* 166 */ throw new RuntimeException("Cannot use null as map key!"); /* 167 */ } else { /* 168 */ serializefromobject_convertedKeys[serializefromobject_index] = (Integer) ExternalMapToCatalyst_key1; /* 169 */ } /* 170 */ /* 171 */ if (false) { /* 172 */ serializefromobject_convertedValues[serializefromobject_index] = null; /* 173 */ } else { /* 174 */ serializefromobject_convertedValues[serializefromobject_index] = (Integer) ExternalMapToCatalyst_value1; /* 175 */ } /* 176 */ /* 177 */ serializefromobject_index++; /* 178 */ } /* 179 */ /* 180 */ serializefromobject_value = new org.apache.spark.sql.catalyst.util.ArrayBasedMapData(new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedKeys), new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedValues)); /* 181 */ } /* 182 */ serializefromobject_holder.reset(); /* 183 */ /* 184 */ serializefromobject_rowWriter.zeroOutNullBytes(); /* 185 */ /* 186 */ if (mapelements_isNull) { /* 187 */ serializefromobject_rowWriter.setNullAt(0); /* 188 */ } else { /* 189 */ // Remember the current cursor so that we can calculate how many bytes are /* 190 */ // written later. /* 191 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor; /* 192 */ /* 193 */ if (serializefromobject_value instanceof UnsafeMapData) { /* 194 */ final int serializefromobject_sizeInBytes = ((UnsafeMapData) serializefromobject_value).getSizeInBytes(); /* 195 */ // grow the global buffer before writing data. /* 196 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes); /* 197 */ ((UnsafeMapData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 198 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes; /* 199 */ /* 200 */ } else { /* 201 */ final ArrayData serializefromobject_keys = serializefromobject_value.keyArray(); /* 202 */ final ArrayData serializefromobject_values = serializefromobject_value.valueArray(); /* 203 */ /* 204 */ // preserve 8 bytes to write the key array numBytes later. /* 205 */ serializefromobject_holder.grow(8); /* 206 */ serializefromobject_holder.cursor += 8; /* 207 */ /* 208 */ // Remember the current cursor so that we can write numBytes of key array later. /* 209 */ final int serializefromobject_tmpCursor1 = serializefromobject_holder.cursor; /* 210 */ /* 211 */ if (serializefromobject_keys instanceof UnsafeArrayData) { /* 212 */ final int serializefromobject_sizeInBytes1 = ((UnsafeArrayData) serializefromobject_keys).getSizeInBytes(); /* 213 */ // grow the global buffer before writing data. /* 214 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes1); /* 215 */ ((UnsafeArrayData) serializefromobject_keys).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 216 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes1; /* 217 */ /* 218 */ } else { /* 219 */ final int serializefromobject_numElements = serializefromobject_keys.numElements(); /* 220 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4); /* 221 */ /* 222 */ for (int serializefromobject_index1 = 0; serializefromobject_index1 < serializefromobject_numElements; serializefromobject_index1++) { /* 223 */ if (serializefromobject_keys.isNullAt(serializefromobject_index1)) { /* 224 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index1); /* 225 */ } else { /* 226 */ final int serializefromobject_element = serializefromobject_keys.getInt(serializefromobject_index1); /* 227 */ serializefromobject_arrayWriter.write(serializefromobject_index1, serializefromobject_element); /* 228 */ } /* 229 */ } /* 230 */ } /* 231 */ /* 232 */ // Write the numBytes of key array into the first 8 bytes. /* 233 */ Platform.putLong(serializefromobject_holder.buffer, serializefromobject_tmpCursor1 - 8, serializefromobject_holder.cursor - serializefromobject_tmpCursor1); /* 234 */ /* 235 */ if (serializefromobject_values instanceof UnsafeArrayData) { /* 236 */ final int serializefromobject_sizeInBytes2 = ((UnsafeArrayData) serializefromobject_values).getSizeInBytes(); /* 237 */ // grow the global buffer before writing data. /* 238 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes2); /* 239 */ ((UnsafeArrayData) serializefromobject_values).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor); /* 240 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes2; /* 241 */ /* 242 */ } else { /* 243 */ final int serializefromobject_numElements1 = serializefromobject_values.numElements(); /* 244 */ serializefromobject_arrayWriter1.initialize(serializefromobject_holder, serializefromobject_numElements1, 4); /* 245 */ /* 246 */ for (int serializefromobject_index2 = 0; serializefromobject_index2 < serializefromobject_numElements1; serializefromobject_index2++) { /* 247 */ if (serializefromobject_values.isNullAt(serializefromobject_index2)) { /* 248 */ serializefromobject_arrayWriter1.setNullInt(serializefromobject_index2); /* 249 */ } else { /* 250 */ final int serializefromobject_element1 = serializefromobject_values.getInt(serializefromobject_index2); /* 251 */ serializefromobject_arrayWriter1.write(serializefromobject_index2, serializefromobject_element1); /* 252 */ } /* 253 */ } /* 254 */ } /* 255 */ /* 256 */ } /* 257 */ /* 258 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor); /* 259 */ } /* 260 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize()); /* 261 */ append(serializefromobject_result); /* 262 */ if (shouldStop()) return; /* 263 */ } /* 264 */ } /* 265 */ } ``` ## How was this patch tested? ``` build/mvn -DskipTests clean package && dev/run-tests ``` Additionally in Spark shell: ``` scala> Seq(collection.mutable.HashMap(1 -> 2, 2 -> 3)).toDS().map(_ += (3 -> 4)).collect() res0: Array[scala.collection.mutable.HashMap[Int,Int]] = Array(Map(2 -> 3, 1 -> 2, 3 -> 4)) ``` Author: Michal Senkyr <mike.senkyr@gmail.com> Author: Michal Šenkýř <mike.senkyr@gmail.com> Closes #16986 from michalsenkyr/dataset-map-builder.
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 7 additions, 26 deletions...scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala 168 additions, 1 deletion...ache/spark/sql/catalyst/expressions/objects/objects.scala
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala 25 additions, 0 deletions.../org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
- sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala 5 additions, 0 deletions...re/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
- sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala 86 additions, 0 deletions...st/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala
Loading
Please register or sign in to comment