- Mar 30, 2016
-
-
Marcelo Vanzin authored
Move the logic to find Spark jars to CommandBuilderUtils and make it available for YARN code, so that it's possible to easily launch Spark on YARN from a build directory. Tested by running SparkPi from the build directory on YARN. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #11970 from vanzin/SPARK-13955.
-
Wenchen Fan authored
[SPARK-14268][SQL] rename toRowExpressions and fromRowExpression to serializer and deserializer in ExpressionEncoder ## What changes were proposed in this pull request? In `ExpressionEncoder`, we use `constructorFor` to build `fromRowExpression` as the `deserializer` in `ObjectOperator`. It's kind of confusing, we should make the name consistent. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #12058 from cloud-fan/rename.
-
Wenchen Fan authored
## What changes were proposed in this pull request? This PR implements buildReader for text data source and enable it in the new data source code path. ## How was this patch tested? Existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #11934 from cloud-fan/text.
-
- Mar 29, 2016
-
-
Shixiong Zhu authored
## What changes were proposed in this pull request? It would be very helpful for network performance investigation if we log the time spent on connecting and resolving host. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12046 from zsxwing/connection-time.
-
gatorsmile authored
#### What changes were proposed in this pull request? This PR is to implement the following four Database-related DDL commands: - `CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name` - `DROP DATABASE [IF EXISTS] database_name [RESTRICT|CASCADE]` - `DESCRIBE DATABASE [EXTENDED] db_name` - `ALTER (DATABASE|SCHEMA) database_name SET DBPROPERTIES (property_name=property_value, ...)` Another PR will be submitted to handle the unsupported commands. In the Database-related DDL commands, we will issue an error exception for `ALTER (DATABASE|SCHEMA) database_name SET OWNER [USER|ROLE] user_or_role`. cc yhuai andrewor14 rxin Could you review the changes? Is it in the right direction? Thanks! #### How was this patch tested? Added a few test cases in `command/DDLSuite.scala` for testing DDL command execution in `SQLContext`. Since `HiveContext` also shares the same implementation, the existing test cases in `\hive` also verifies the correctness of these commands. Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12009 from gatorsmile/dbDDL.
-
tedyu authored
## What changes were proposed in this pull request? For MemoryMode.OFF_HEAP, Unsafe.getInt etc. are used with no restriction. However, the Oracle implementation uses these methods only if the class variable unaligned (commented as "Cached unaligned-access capability") is true, which seems to be calculated whether the architecture is i386, x86, amd64, or x86_64. I think we should perform similar check for the use of Unsafe. Reference: https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/internal/PlatformDependent0.java#L112 ## How was this patch tested? Unit test suite Author: tedyu <yuzhihong@gmail.com> Closes #11943 from tedyu/master.
-
Sameer Agarwal authored
## What changes were proposed in this pull request? Builds on https://github.com/apache/spark/pull/12022 and (a) appends "..." to truncated comment strings and (b) fixes indentation in lines after the commented strings if they happen to have a `(`, `{`, `)` or `}` ## How was this patch tested? Manually examined the generated code. Author: Sameer Agarwal <sameer@databricks.com> Closes #12044 from sameeragarwal/comment.
-
Davies Liu authored
## What changes were proposed in this pull request? This PR brings the support for chained Python UDFs, for example ```sql select udf1(udf2(a)) select udf1(udf2(a) + 3) select udf1(udf2(a) + udf3(b)) ``` Also directly chained unary Python UDFs are put in single batch of Python UDFs, others may require multiple batches. For example, ```python >>> sqlContext.sql("select double(double(1))").explain() == Physical Plan == WholeStageCodegen : +- Project [pythonUDF#10 AS double(double(1))#9] : +- INPUT +- !BatchPythonEvaluation double(double(1)), [pythonUDF#10] +- Scan OneRowRelation[] >>> sqlContext.sql("select double(double(1) + double(2))").explain() == Physical Plan == WholeStageCodegen : +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16] : +- INPUT +- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19] +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18] +- !BatchPythonEvaluation double(1), [pythonUDF#17] +- Scan OneRowRelation[] ``` TODO: will support multiple unrelated Python UDFs in one batch (another PR). ## How was this patch tested? Added new unit tests for chained UDFs. Author: Davies Liu <davies@databricks.com> Closes #12014 from davies/py_udfs.
-
Eric Liang authored
## What changes were proposed in this pull request? This adds `debugCodegen` to the debug package for query execution. ## How was this patch tested? Unit and manual testing. Output example: ``` scala> import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.debug._ scala> sqlContext.range(100).groupBy("id").count().orderBy("id").debugCodegen() Found 3 WholeStageCodegen subtrees. == Subtree 1 / 3 == WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) /* 007 */ +- Range 0, 1, 1, 100, [id#0L] /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean agg_initAgg; /* 012 */ private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 016 */ private org.apache.spark.sql.execution.metric.LongSQLMetric range_numOutputRows; /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue range_metricValue; /* 018 */ private boolean range_initRange; /* 019 */ private long range_partitionEnd; /* 020 */ private long range_number; /* 021 */ private boolean range_overflow; /* 022 */ private scala.collection.Iterator range_input; /* 023 */ private UnsafeRow range_result; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder range_holder; /* 025 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter range_rowWriter; /* 026 */ private UnsafeRow agg_result; /* 027 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 028 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 029 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowJoiner agg_unsafeRowJoiner; /* 030 */ private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows; /* 031 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue; /* 032 */ /* 033 */ public GeneratedIterator(Object[] references) { /* 034 */ this.references = references; /* 035 */ } /* 036 */ /* 037 */ public void init(scala.collection.Iterator inputs[]) { /* 038 */ agg_initAgg = false; /* 039 */ this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0]; /* 040 */ agg_hashMap = agg_plan.createHashMap(); /* 041 */ /* 042 */ this.range_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 043 */ range_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) range_numOutputRows.localValue(); /* 044 */ range_initRange = false; /* 045 */ range_partitionEnd = 0L; /* 046 */ range_number = 0L; /* 047 */ range_overflow = false; /* 048 */ range_input = inputs[0]; /* 049 */ range_result = new UnsafeRow(1); /* 050 */ this.range_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(range_result, 0); /* 051 */ this.range_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_holder, 1); /* 052 */ agg_result = new UnsafeRow(1); /* 053 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 054 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 055 */ agg_unsafeRowJoiner = agg_plan.createUnsafeJoiner(); /* 056 */ this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2]; /* 057 */ wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue(); /* 058 */ } /* 059 */ /* 060 */ private void agg_doAggregateWithKeys() throws java.io.IOException { /* 061 */ /*** PRODUCE: Range 0, 1, 1, 100, [id#0L] */ /* 062 */ /* 063 */ // initialize Range /* 064 */ if (!range_initRange) { /* 065 */ range_initRange = true; /* 066 */ if (range_input.hasNext()) { /* 067 */ initRange(((InternalRow) range_input.next()).getInt(0)); /* 068 */ } else { /* 069 */ return; /* 070 */ } /* 071 */ } /* 072 */ /* 073 */ while (!range_overflow && range_number < range_partitionEnd) { /* 074 */ long range_value = range_number; /* 075 */ range_number += 1L; /* 076 */ if (range_number < range_value ^ 1L < 0) { /* 077 */ range_overflow = true; /* 078 */ } /* 079 */ /* 080 */ /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */ /* 081 */ /* 082 */ // generate grouping key /* 083 */ agg_rowWriter.write(0, range_value); /* 084 */ /* hash(input[0, bigint], 42) */ /* 085 */ int agg_value1 = 42; /* 086 */ /* 087 */ agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(range_value, agg_value1); /* 088 */ UnsafeRow agg_aggBuffer = null; /* 089 */ if (true) { /* 090 */ // try to get the buffer from hash map /* 091 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 092 */ } /* 093 */ if (agg_aggBuffer == null) { /* 094 */ if (agg_sorter == null) { /* 095 */ agg_sorter = agg_hashMap.destructAndCreateExternalSorter(); /* 096 */ } else { /* 097 */ agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter()); /* 098 */ } /* 099 */ /* 100 */ // the hash map had be spilled, it should have enough memory now, /* 101 */ // try to allocate buffer again. /* 102 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 103 */ if (agg_aggBuffer == null) { /* 104 */ // failed to allocate the first page /* 105 */ throw new OutOfMemoryError("No enough memory for aggregation"); /* 106 */ } /* 107 */ } /* 108 */ /* 109 */ // evaluate aggregate function /* 110 */ /* (input[0, bigint] + 1) */ /* 111 */ /* input[0, bigint] */ /* 112 */ long agg_value4 = agg_aggBuffer.getLong(0); /* 113 */ /* 114 */ long agg_value3 = -1L; /* 115 */ agg_value3 = agg_value4 + 1L; /* 116 */ // update aggregate buffer /* 117 */ agg_aggBuffer.setLong(0, agg_value3); /* 118 */ /* 119 */ if (shouldStop()) return; /* 120 */ } /* 121 */ /* 122 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter); /* 123 */ } /* 124 */ /* 125 */ private void initRange(int idx) { /* 126 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 127 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L); /* 128 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(100L); /* 129 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 130 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 131 */ /* 132 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 133 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 134 */ range_number = Long.MAX_VALUE; /* 135 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 136 */ range_number = Long.MIN_VALUE; /* 137 */ } else { /* 138 */ range_number = st.longValue(); /* 139 */ } /* 140 */ /* 141 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 142 */ .multiply(step).add(start); /* 143 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 144 */ range_partitionEnd = Long.MAX_VALUE; /* 145 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 146 */ range_partitionEnd = Long.MIN_VALUE; /* 147 */ } else { /* 148 */ range_partitionEnd = end.longValue(); /* 149 */ } /* 150 */ /* 151 */ range_metricValue.add((range_partitionEnd - range_number) / 1L); /* 152 */ } /* 153 */ /* 154 */ protected void processNext() throws java.io.IOException { /* 155 */ /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) */ /* 156 */ /* 157 */ if (!agg_initAgg) { /* 158 */ agg_initAgg = true; /* 159 */ agg_doAggregateWithKeys(); /* 160 */ } /* 161 */ /* 162 */ // output the result /* 163 */ while (agg_mapIter.next()) { /* 164 */ wholestagecodegen_metricValue.add(1); /* 165 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey(); /* 166 */ UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue(); /* 167 */ /* 168 */ UnsafeRow agg_resultRow = agg_unsafeRowJoiner.join(agg_aggKey, agg_aggBuffer1); /* 169 */ /* 170 */ /*** CONSUME: WholeStageCodegen */ /* 171 */ /* 172 */ append(agg_resultRow); /* 173 */ /* 174 */ if (shouldStop()) return; /* 175 */ } /* 176 */ /* 177 */ agg_mapIter.close(); /* 178 */ if (agg_sorter == null) { /* 179 */ agg_hashMap.free(); /* 180 */ } /* 181 */ } /* 182 */ } == Subtree 2 / 3 == WholeStageCodegen : +- Sort [id#0L ASC], true, 0 : +- INPUT +- Exchange rangepartitioning(id#0L ASC, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) : +- INPUT +- Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * Sort [id#0L ASC], true, 0 /* 007 */ +- INPUT /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean sort_needToSort; /* 012 */ private org.apache.spark.sql.execution.Sort sort_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter; /* 014 */ private org.apache.spark.executor.TaskMetrics sort_metrics; /* 015 */ private scala.collection.Iterator<UnsafeRow> sort_sortedIter; /* 016 */ private scala.collection.Iterator inputadapter_input; /* 017 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_dataSize; /* 018 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue; /* 019 */ private org.apache.spark.sql.execution.metric.LongSQLMetric sort_spillSize; /* 020 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue sort_metricValue1; /* 021 */ /* 022 */ public GeneratedIterator(Object[] references) { /* 023 */ this.references = references; /* 024 */ } /* 025 */ /* 026 */ public void init(scala.collection.Iterator inputs[]) { /* 027 */ sort_needToSort = true; /* 028 */ this.sort_plan = (org.apache.spark.sql.execution.Sort) references[0]; /* 029 */ sort_sorter = sort_plan.createSorter(); /* 030 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics(); /* 031 */ /* 032 */ inputadapter_input = inputs[0]; /* 033 */ this.sort_dataSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 034 */ sort_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_dataSize.localValue(); /* 035 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[2]; /* 036 */ sort_metricValue1 = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) sort_spillSize.localValue(); /* 037 */ } /* 038 */ /* 039 */ private void sort_addToSorter() throws java.io.IOException { /* 040 */ /*** PRODUCE: INPUT */ /* 041 */ /* 042 */ while (inputadapter_input.hasNext()) { /* 043 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 044 */ /*** CONSUME: Sort [id#0L ASC], true, 0 */ /* 045 */ /* 046 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row); /* 047 */ if (shouldStop()) return; /* 048 */ } /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ /*** PRODUCE: Sort [id#0L ASC], true, 0 */ /* 054 */ if (sort_needToSort) { /* 055 */ sort_addToSorter(); /* 056 */ Long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled(); /* 057 */ sort_sortedIter = sort_sorter.sort(); /* 058 */ sort_metricValue.add(sort_sorter.getPeakMemoryUsage()); /* 059 */ sort_metricValue1.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore); /* 060 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage()); /* 061 */ sort_needToSort = false; /* 062 */ } /* 063 */ /* 064 */ while (sort_sortedIter.hasNext()) { /* 065 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next(); /* 066 */ /* 067 */ /*** CONSUME: WholeStageCodegen */ /* 068 */ /* 069 */ append(sort_outputRow); /* 070 */ /* 071 */ if (shouldStop()) return; /* 072 */ } /* 073 */ } /* 074 */ } == Subtree 3 / 3 == WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) : +- INPUT +- Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#0L,count#9L]) : +- Range 0, 1, 1, 100, [id#0L] Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ /** Codegened pipeline for: /* 006 */ * TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) /* 007 */ +- INPUT /* 008 */ */ /* 009 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 010 */ private Object[] references; /* 011 */ private boolean agg_initAgg; /* 012 */ private org.apache.spark.sql.execution.aggregate.TungstenAggregate agg_plan; /* 013 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 014 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 016 */ private scala.collection.Iterator inputadapter_input; /* 017 */ private UnsafeRow agg_result; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter; /* 020 */ private UnsafeRow agg_result1; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1; /* 023 */ private org.apache.spark.sql.execution.metric.LongSQLMetric wholestagecodegen_numOutputRows; /* 024 */ private org.apache.spark.sql.execution.metric.LongSQLMetricValue wholestagecodegen_metricValue; /* 025 */ /* 026 */ public GeneratedIterator(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(scala.collection.Iterator inputs[]) { /* 031 */ agg_initAgg = false; /* 032 */ this.agg_plan = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0]; /* 033 */ agg_hashMap = agg_plan.createHashMap(); /* 034 */ /* 035 */ inputadapter_input = inputs[0]; /* 036 */ agg_result = new UnsafeRow(1); /* 037 */ this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0); /* 038 */ this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1); /* 039 */ agg_result1 = new UnsafeRow(2); /* 040 */ this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 0); /* 041 */ this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 2); /* 042 */ this.wholestagecodegen_numOutputRows = (org.apache.spark.sql.execution.metric.LongSQLMetric) references[1]; /* 043 */ wholestagecodegen_metricValue = (org.apache.spark.sql.execution.metric.LongSQLMetricValue) wholestagecodegen_numOutputRows.localValue(); /* 044 */ } /* 045 */ /* 046 */ private void agg_doAggregateWithKeys() throws java.io.IOException { /* 047 */ /*** PRODUCE: INPUT */ /* 048 */ /* 049 */ while (inputadapter_input.hasNext()) { /* 050 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next(); /* 051 */ /*** CONSUME: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */ /* 052 */ /* input[0, bigint] */ /* 053 */ long inputadapter_value = inputadapter_row.getLong(0); /* 054 */ /* input[1, bigint] */ /* 055 */ long inputadapter_value1 = inputadapter_row.getLong(1); /* 056 */ /* 057 */ // generate grouping key /* 058 */ agg_rowWriter.write(0, inputadapter_value); /* 059 */ /* hash(input[0, bigint], 42) */ /* 060 */ int agg_value1 = 42; /* 061 */ /* 062 */ agg_value1 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashLong(inputadapter_value, agg_value1); /* 063 */ UnsafeRow agg_aggBuffer = null; /* 064 */ if (true) { /* 065 */ // try to get the buffer from hash map /* 066 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 067 */ } /* 068 */ if (agg_aggBuffer == null) { /* 069 */ if (agg_sorter == null) { /* 070 */ agg_sorter = agg_hashMap.destructAndCreateExternalSorter(); /* 071 */ } else { /* 072 */ agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter()); /* 073 */ } /* 074 */ /* 075 */ // the hash map had be spilled, it should have enough memory now, /* 076 */ // try to allocate buffer again. /* 077 */ agg_aggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value1); /* 078 */ if (agg_aggBuffer == null) { /* 079 */ // failed to allocate the first page /* 080 */ throw new OutOfMemoryError("No enough memory for aggregation"); /* 081 */ } /* 082 */ } /* 083 */ /* 084 */ // evaluate aggregate function /* 085 */ /* (input[0, bigint] + input[2, bigint]) */ /* 086 */ /* input[0, bigint] */ /* 087 */ long agg_value4 = agg_aggBuffer.getLong(0); /* 088 */ /* 089 */ long agg_value3 = -1L; /* 090 */ agg_value3 = agg_value4 + inputadapter_value1; /* 091 */ // update aggregate buffer /* 092 */ agg_aggBuffer.setLong(0, agg_value3); /* 093 */ if (shouldStop()) return; /* 094 */ } /* 095 */ /* 096 */ agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter); /* 097 */ } /* 098 */ /* 099 */ protected void processNext() throws java.io.IOException { /* 100 */ /*** PRODUCE: TungstenAggregate(key=[id#0L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#0L,count#4L]) */ /* 101 */ /* 102 */ if (!agg_initAgg) { /* 103 */ agg_initAgg = true; /* 104 */ agg_doAggregateWithKeys(); /* 105 */ } /* 106 */ /* 107 */ // output the result /* 108 */ while (agg_mapIter.next()) { /* 109 */ wholestagecodegen_metricValue.add(1); /* 110 */ UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey(); /* 111 */ UnsafeRow agg_aggBuffer1 = (UnsafeRow) agg_mapIter.getValue(); /* 112 */ /* 113 */ /* input[0, bigint] */ /* 114 */ long agg_value6 = agg_aggKey.getLong(0); /* 115 */ /* input[0, bigint] */ /* 116 */ long agg_value7 = agg_aggBuffer1.getLong(0); /* 117 */ /* 118 */ /*** CONSUME: WholeStageCodegen */ /* 119 */ /* 120 */ agg_rowWriter1.write(0, agg_value6); /* 121 */ /* 122 */ agg_rowWriter1.write(1, agg_value7); /* 123 */ append(agg_result1); /* 124 */ /* 125 */ if (shouldStop()) return; /* 126 */ } /* 127 */ /* 128 */ agg_mapIter.close(); /* 129 */ if (agg_sorter == null) { /* 130 */ agg_hashMap.free(); /* 131 */ } /* 132 */ } /* 133 */ } ``` rxin Author: Eric Liang <ekl@databricks.com> Closes #12025 from ericl/spark-14227.
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? This PR is a simple fix for an exception message to print `string[]` content correctly. ```java String[] colPath = requestedSchema.getPaths().get(i); ... - throw new IOException("Required column is missing in data file. Col: " + colPath); + throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath)); ``` ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12041 from dongjoon-hyun/fix_exception_message_with_string_array.
-
Dongjoon Hyun authored
## What changes were proposed in this pull request? This PR fixes two trivial typos: 'does not **much**' --> 'does not **match**'. ## How was this patch tested? Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12042 from dongjoon-hyun/fix_typo_by_replacing_much_with_match.
-
Jakob Odersky authored
Add a new api endpoint `/api/v1/version` to retrieve various version info. This PR only adds support for finding the current spark version, however other version info such as jvm or scala versions can easily be added. Author: Jakob Odersky <jodersky@gmail.com> Closes #10760 from jodersky/SPARK-10570.
-
Carson Wang authored
[SPARK-14232][WEBUI] Fix event timeline display issue when an executor is removed with a multiple line reason. ## What changes were proposed in this pull request? The event timeline doesn't show on job page if an executor is removed with a multiple line reason. This PR replaces all new line characters in the reason string with spaces.  ## How was this patch tested? Verified on the Web UI. Author: Carson Wang <carson.wang@intel.com> Closes #12029 from carsonwang/eventTimeline.
-
Yuhao Yang authored
## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-14154 I just read the code for KolmogorovSmirnovTest and find it could be much simplified following the original definition. Send a PR for discussion ## How was this patch tested? unit test Author: Yuhao Yang <hhbyyh@gmail.com> Closes #11954 from hhbyyh/ksoptimize.
-
Cheng Lian authored
## What changes were proposed in this pull request? Renames SQL option `spark.sql.parquet.fileScan` since now all `HadoopFsRelation` based data sources are being migrated to `FileScanRDD` code path. ## How was this patch tested? None. Author: Cheng Lian <lian@databricks.com> Closes #12003 from liancheng/spark-14208-option-renaming.
-
Bryan Cutler authored
## What changes were proposed in this pull request? Adding binary toggle parameter to ml.feature.HashingTF, as well as mllib.feature.HashingTF since the former wraps this functionality. This parameter, if true, will set non-zero valued term counts to 1 to transform term count features to binary values that are well suited for discrete probability models. ## How was this patch tested? Added unit tests for ML and MLlib Author: Bryan Cutler <cutlerb@gmail.com> Closes #11832 from BryanCutler/binary-param-HashingTF-SPARK-13963.
-
Wenchen Fan authored
## What changes were proposed in this pull request? This PR implements buildReader for json data source and enable it in the new data source code path. ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes #11960 from cloud-fan/json.
-
wm624@hotmail.com authored
Add property to MLWritable.write method, so we can use .write instead of .write() Add a new test to ml/test.py to check whether the write is a property. ./python/run-tests --python-executables=python2.7 --modules=pyspark-ml Will test against the following Python executables: ['python2.7'] Will test the following Python modules: ['pyspark-ml'] Finished test(python2.7): pyspark.ml.evaluation (11s) Finished test(python2.7): pyspark.ml.clustering (16s) Finished test(python2.7): pyspark.ml.classification (24s) Finished test(python2.7): pyspark.ml.recommendation (24s) Finished test(python2.7): pyspark.ml.feature (39s) Finished test(python2.7): pyspark.ml.regression (26s) Finished test(python2.7): pyspark.ml.tuning (15s) Finished test(python2.7): pyspark.ml.tests (30s) Tests passed in 55 seconds Author: wm624@hotmail.com <wm624@hotmail.com> Closes #11945 from wangmiao1981/fix_property.
-
sethah authored
## What changes were proposed in this pull request? Now that GBTs have been moved to ML, they can use the implementation of feature importance for random forests. This patch simply adds a `featureImportances` attribute to `GBTClassifier` and `GBTRegressor` and adds tests for each. GBT feature importances here simply average the feature importances for each tree in its ensemble. This follows the implementation from scikit-learn. This method is also suggested by J Friedman in [this paper](https://statweb.stanford.edu/~jhf/ftp/trebst.pdf). ## How was this patch tested? Unit tests were added to `GBTClassifierSuite` and `GBTRegressorSuite` to validate feature importances. Author: sethah <seth.hendrickson16@gmail.com> Closes #11961 from sethah/SPARK-11730.
-
- Mar 28, 2016
-
-
Sun Rui authored
## What changes were proposed in this pull request? Refactor RRDD by separating the common logic interacting with the R worker to a new class RRunner, which can be used to evaluate R UDFs. Now RRDD relies on RRuner for RDD computation and RRDD could be reomved if we want to remove RDD API in SparkR later. ## How was this patch tested? dev/lint-r SparkR unit tests Author: Sun Rui <rui.sun@intel.com> Closes #12024 from sun-rui/SPARK-12792_new.
-
Nong Li authored
## What changes were proposed in this pull request? This adds a metric to parquet scans that measures the time in just the scan phase. This is only possible when the scan returns ColumnarBatches, otherwise the overhead is too high. This combined with the pipeline metric lets us easily see what percent of the time was in the scan. Author: Nong Li <nong@databricks.com> Closes #12007 from nongli/spark-14210.
-
Nong Li authored
## What changes were proposed in this pull request? This improves the Filter codegen for NULLs by deferring loading the values for IsNotNull. Instead of generating code like: boolean isNull = ... int value = ... if (isNull) continue; we will generate: boolean isNull = ... if (isNull) continue; int value = ... This is useful since retrieving the values can be non-trivial (they can be dictionary encoded among other things). This currently only works when the attribute comes from the column batch but could be extended to other cases in the future. ## How was this patch tested? On tpcds q55, this fixes the regression from introducing the IsNotNull predicates. ``` TPCDS Snappy: Best/Avg Time(ms) Rate(M/s) Per Row(ns) -------------------------------------------------------------------------------- q55 4564 / 5036 25.2 39.6 q55 4064 / 4340 28.3 35.3 ``` Author: Nong Li <nong@databricks.com> Closes #11792 from nongli/spark-13981.
-
Herman van Hovell authored
### What changes were proposed in this pull request? This PR migrates all HiveQl parsing to the new ANTLR4 parser. This PR is build on top of https://github.com/apache/spark/pull/12011, and we should wait with merging until that one is in (hence the WIP tag). As soon as this PR is merged we can start removing much of the old parser infrastructure. ### How was this patch tested? Exisiting Hive unit tests. cc rxin andrewor14 yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12015 from hvanhovell/SPARK-14213.
-
Wenchen Fan authored
## What changes were proposed in this pull request? After DataFrame and Dataset are merged, the trait `Queryable` becomes unnecessary as it has only one implementation. We should remove it. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #12001 from cloud-fan/df-ds.
-
Dongjoon Hyun authored
[SPARK-14219][GRAPHX] Fix `pickRandomVertex` not to fall into infinite loops for graphs with one vertex ## What changes were proposed in this pull request? Currently, `GraphOps.pickRandomVertex()` falls into infinite loops for graphs having only one vertex. This PR fixes it by modifying the following termination-checking condition. ```scala - if (selectedVertices.count > 1) { + if (selectedVertices.count > 0) { ``` ## How was this patch tested? Pass the Jenkins tests (including new test case). Author: Dongjoon Hyun <dongjoon@apache.org> Closes #12018 from dongjoon-hyun/SPARK-14219.
-
jerryshao authored
## What changes were proposed in this pull request? This is a follow-up fix of #9963, in #9963 we handle this stale states clean-up work only for dynamic allocation enabled scenario. Here we should also clean the states in `CoarseGrainedSchedulerBackend` for dynamic allocation disabled scenario. Please review, CC andrewor14 lianhuiwang , thanks a lot. ## How was this patch tested? Run the unit test locally, also with integration test manually. Author: jerryshao <sshao@hortonworks.com> Closes #11366 from jerryshao/SPARK-13447.
-
jeanlyn authored
## What changes were proposed in this pull request? We have a streaming job using `FlumePollInputStream` always driver OOM after few days, here is some driver heap dump before OOM ``` num #instances #bytes class name ---------------------------------------------- 1: 13845916 553836640 org.apache.spark.storage.BlockStatus 2: 14020324 336487776 org.apache.spark.storage.StreamBlockId 3: 13883881 333213144 scala.collection.mutable.DefaultEntry 4: 8907 89043952 [Lscala.collection.mutable.HashEntry; 5: 62360 65107352 [B 6: 163368 24453904 [Ljava.lang.Object; 7: 293651 20342664 [C ... ``` `BlockStatus` and `StreamBlockId` keep on growing, and the driver OOM in the end. After investigated, i found the `executorIdToStorageStatus` in `StorageStatusListener` seems never remove the blocks from `StorageStatus`. In order to fix the issue, i try to use `onBlockUpdated` replace `onTaskEnd ` , so we can update the block informations(add blocks, drop the block from memory to disk and delete the blocks) in time. ## How was this patch tested? Existing unit tests and manual tests Author: jeanlyn <jeanlyn92@gmail.com> Closes #11779 from jeanlyn/fix_driver_oom.
-
Andrew Or authored
## What changes were proposed in this pull request? Before: We just pass all role commands to Hive even though it doesn't work. After: We throw an `AnalysisException` that looks like this: ``` scala> sql("CREATE ROLE x") org.apache.spark.sql.AnalysisException: Unsupported Hive operation: CREATE ROLE; at org.apache.spark.sql.hive.HiveQl$$anonfun$parsePlan$1.apply(HiveQl.scala:213) at org.apache.spark.sql.hive.HiveQl$$anonfun$parsePlan$1.apply(HiveQl.scala:208) at org.apache.spark.sql.catalyst.parser.CatalystQl.safeParse(CatalystQl.scala:49) at org.apache.spark.sql.hive.HiveQl.parsePlan(HiveQl.scala:208) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:198) ``` ## How was this patch tested? `HiveQuerySuite` Author: Andrew Or <andrew@databricks.com> Closes #11948 from andrewor14/ddl-role-management.
-
Andrew Or authored
## What changes were proposed in this pull request? Session catalog was added in #11750. However, it doesn't really support temporary functions properly; right now we only store the metadata in the form of `CatalogFunction`, but this doesn't make sense for temporary functions because there is no class name. This patch moves the `FunctionRegistry` into the `SessionCatalog`. With this, the user can call `catalog.createTempFunction` and `catalog.lookupFunction` to use the function they registered previously. This is currently still dead code, however. ## How was this patch tested? `SessionCatalogSuite`. Author: Andrew Or <andrew@databricks.com> Closes #11972 from andrewor14/temp-functions.
-
Shixiong Zhu authored
## What changes were proposed in this pull request? Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it. ## How was this patch tested? Unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #11971 from zsxwing/uninterrupt.
-
Reynold Xin authored
## What changes were proposed in this pull request? UserDefinedType is a developer API in Spark 1.x. With very high probability we will create a new API for user-defined type that also works well with column batches as well as encoders (datasets). In Spark 2.0, let's make `UserDefinedType` `private[spark]` first. ## How was this patch tested? Existing unit tests. Author: Reynold Xin <rxin@databricks.com> Closes #11955 from rxin/SPARK-14155.
-
Andrew Or authored
## What changes were proposed in this pull request? This patch addresses the remaining comments left in #11750 and #11918 after they are merged. For a full list of changes in this patch, just trace the commits. ## How was this patch tested? `SessionCatalogSuite` and `CatalogTestCases` Author: Andrew Or <andrew@databricks.com> Closes #12006 from andrewor14/session-catalog-followup.
-
Shixiong Zhu authored
## What changes were proposed in this pull request? Call `executor.stop` in a new thread to eliminate deadlock. ## How was this patch tested? Existing unit tests Author: Shixiong Zhu <shixiong@databricks.com> Closes #12012 from zsxwing/SPARK-14180.
-
Herman van Hovell authored
#### What changes were proposed in this pull request? This PR adds all the current Spark SQL DDL commands to the new ANTLR 4 based SQL parser. I have found a few inconsistencies in the current commands: - Function has an alias field. This is actually the class name of the function. - Partition specifications should contain nulls in some commands, and contain `None`s in others. - `AlterTableSkewedLocation`: Should defines which columns have skewed values, and should allow us to define storage for each skewed combination of values. We currently only allow one value per field. - `AlterTableSetFileFormat`: Should only have one file format, it currently supports both. I have implemented all these comments like they were, and I propose to improve them in follow-up PRs. #### How was this patch tested? The existing DDLCommandSuite. cc rxin andrewor14 yhuai Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #12011 from hvanhovell/SPARK-14086.
-
Xusen Yin authored
https://issues.apache.org/jira/browse/SPARK-11893 jkbradley In order to share read/write with `TrainValidationSplit`, I move the `SharedReadWrite` out of `CrossValidator` into a new trait `SharedReadWrite` in the tunning package. To reduce the repeated tests, I move the complex tests from `CrossValidatorSuite` to `SharedReadWriteSuite`, and create a fake validator called `MyValidator` to test the shared code. With `SharedReadWrite`, potential newly added `Validator` can share the read/write common part, and only need to implement their extra params save/load. Author: Xusen Yin <yinxusen@gmail.com> Author: Joseph K. Bradley <joseph@databricks.com> Closes #9971 from yinxusen/SPARK-11893.
-
zero323 authored
## What changes were proposed in this pull request? This PR replaces list comprehension in python_full_outer_join.dispatch with a generator expression. ## How was this patch tested? PySpark-Core, PySpark-MLlib test suites against Python 2.7, 3.5. Author: zero323 <matthew.szymkiewicz@gmail.com> Closes #11998 from zero323/pyspark-join-generator-expr.
-
nfraison authored
## What changes were proposed in this pull request? This patch will ensure that we trim all path set in yarn.nodemanager.local-dirs and that the the scheme is well removed so the level db can be created. ## How was this patch tested? manual tests. Author: nfraison <nfraison@yahoo.fr> Closes #11475 from ashangit/level_db_creation_issue.
-
Yin Huai authored
Seems https://github.com/apache/spark/commit/600c0b69cab4767e8e5a6f4284777d8b9d4bd40e is missing the antlr4 maven plugin. This pr adds it. Author: Yin Huai <yhuai@databricks.com> Closes #12010 from yhuai/mavenAntlr4.
-
Davies Liu authored
## What changes were proposed in this pull request? Currently, for the key that can not fit within a long, we build a hash map for UnsafeHashedRelation, it's converted to BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly to have better memory efficiency. In order to do that, BytesToBytesMap should support multiple (K,V) pair with the same K, Location.putNewKey() is renamed to Location.append(), which could append multiple values for the same key (same Location). `Location.newValue()` is added to find the next value for the same key. ## How was this patch tested? Existing tests. Added benchmark for broadcast hash join with duplicated keys. Author: Davies Liu <davies@databricks.com> Closes #11870 from davies/map2.
-
Herman van Hovell authored
### What changes were proposed in this pull request? The current ANTLR3 parser is quite complex to maintain and suffers from code blow-ups. This PR introduces a new parser that is based on ANTLR4. This parser is based on the [Presto's SQL parser](https://github.com/facebook/presto/blob/master/presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4). The current implementation can parse and create Catalyst and SQL plans. Large parts of the HiveQl DDL and some of the DML functionality is currently missing, the plan is to add this in follow-up PRs. This PR is a work in progress, and work needs to be done in the following area's: - [x] Error handling should be improved. - [x] Documentation should be improved. - [x] Multi-Insert needs to be tested. - [ ] Naming and package locations. ### How was this patch tested? Catalyst and SQL unit tests. Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #11557 from hvanhovell/ngParser.
-