  1. Apr 19, 2017
  2. Apr 18, 2017
    • zero323's avatar
      [SPARK-20208][R][DOCS] Document R fpGrowth support · 702d85af
      zero323 authored
      ## What changes were proposed in this pull request?
      
      Document  fpGrowth in:
      
      - vignettes
      - programming guide
      - code example
      
      ## How was this patch tested?
      
      Manual tests.
      
      Author: zero323 <zero323@users.noreply.github.com>
      
      Closes #17557 from zero323/SPARK-20208.
      702d85af
    • Kazuaki Ishizaki's avatar
      [SPARK-20254][SQL] Remove unnecessary data conversion for Dataset with primitive array · e468a96c
      Kazuaki Ishizaki authored
      ## What changes were proposed in this pull request?
      
      This PR eliminates unnecessary data conversion, which was introduced by SPARK-19716, for Datasets with primitive arrays in the generated Java code.
      When we run the following example program, we currently get the Java code shown under "Without this PR". In this code, lines 56-82 are unnecessary, since the primitive array in ArrayData can be converted into a Java primitive array by using the ``toDoubleArray()`` method; ``GenericArrayData`` is not required.
      
      ```scala
      val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
      ds.count
      ds.map(e => e).show
      ```
      
      Without this PR
      ```
      == Parsed Logical Plan ==
      'SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- 'MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- 'DeserializeToObject unresolveddeserializer(unresolvedmapobjects(<function1>, getcolumnbyordinal(0, ArrayType(DoubleType,false)), None).toDoubleArray), obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Analyzed Logical Plan ==
      value: array<double>
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
            +- SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
               +- ExternalRDD [obj#1]
      
      == Optimized Logical Plan ==
      SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- MapElements <function1>, class [D, [StructField(value,ArrayType(DoubleType,false),true)], obj#24: [D
         +- DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
            +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                     +- Scan ExternalRDDScan[obj#1]
      
      == Physical Plan ==
      *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#25]
      +- *MapElements <function1>, obj#24: [D
         +- *DeserializeToObject mapobjects(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, assertnotnull(lambdavariable(MapObjects_loopValue5, MapObjects_loopIsNull5, DoubleType, true), - array element class: "scala.Double", - root class: "scala.Array"), value#2, None, MapObjects_builderValue5).toDoubleArray, obj#23: [D
            +- InMemoryTableScan [value#2]
                  +- InMemoryRelation [value#2], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *SerializeFromObject [staticinvoke(class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData, ArrayType(DoubleType,false), fromPrimitiveArray, input[0, [D, true], true) AS value#2]
                           +- Scan ExternalRDDScan[obj#1]
      ```
      
      ```java
      /* 050 */   protected void processNext() throws java.io.IOException {
      /* 051 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 052 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 053 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      /* 054 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
      /* 055 */
      /* 056 */       ArrayData deserializetoobject_value1 = null;
      /* 057 */
      /* 058 */       if (!inputadapter_isNull) {
      /* 059 */         int deserializetoobject_dataLength = inputadapter_value.numElements();
      /* 060 */
      /* 061 */         Double[] deserializetoobject_convertedArray = null;
      /* 062 */         deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
      /* 063 */
      /* 064 */         int deserializetoobject_loopIndex = 0;
      /* 065 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
      /* 066 */           MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
      /* 067 */           MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
      /* 068 */
      /* 069 */           if (MapObjects_loopIsNull2) {
      /* 070 */             throw new RuntimeException(((java.lang.String) references[0]));
      /* 071 */           }
      /* 072 */           if (false) {
      /* 073 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
      /* 074 */           } else {
      /* 075 */             deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
      /* 076 */           }
      /* 077 */
      /* 078 */           deserializetoobject_loopIndex += 1;
      /* 079 */         }
      /* 080 */
      /* 081 */         deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
      /* 082 */       }
      /* 083 */       boolean deserializetoobject_isNull = true;
      /* 084 */       double[] deserializetoobject_value = null;
      /* 085 */       if (!inputadapter_isNull) {
      /* 086 */         deserializetoobject_isNull = false;
      /* 087 */         if (!deserializetoobject_isNull) {
      /* 088 */           Object deserializetoobject_funcResult = null;
      /* 089 */           deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
      /* 090 */           if (deserializetoobject_funcResult == null) {
      /* 091 */             deserializetoobject_isNull = true;
      /* 092 */           } else {
      /* 093 */             deserializetoobject_value = (double[]) deserializetoobject_funcResult;
      /* 094 */           }
      /* 095 */
      /* 096 */         }
      /* 097 */         deserializetoobject_isNull = deserializetoobject_value == null;
      /* 098 */       }
      /* 099 */
      /* 100 */       boolean mapelements_isNull = true;
      /* 101 */       double[] mapelements_value = null;
      /* 102 */       if (!false) {
      /* 103 */         mapelements_resultIsNull = false;
      /* 104 */
      /* 105 */         if (!mapelements_resultIsNull) {
      /* 106 */           mapelements_resultIsNull = deserializetoobject_isNull;
      /* 107 */           mapelements_argValue = deserializetoobject_value;
      /* 108 */         }
      /* 109 */
      /* 110 */         mapelements_isNull = mapelements_resultIsNull;
      /* 111 */         if (!mapelements_isNull) {
      /* 112 */           Object mapelements_funcResult = null;
      /* 113 */           mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
      /* 114 */           if (mapelements_funcResult == null) {
      /* 115 */             mapelements_isNull = true;
      /* 116 */           } else {
      /* 117 */             mapelements_value = (double[]) mapelements_funcResult;
      /* 118 */           }
      /* 119 */
      /* 120 */         }
      /* 121 */         mapelements_isNull = mapelements_value == null;
      /* 122 */       }
      /* 123 */
      /* 124 */       serializefromobject_resultIsNull = false;
      /* 125 */
      /* 126 */       if (!serializefromobject_resultIsNull) {
      /* 127 */         serializefromobject_resultIsNull = mapelements_isNull;
      /* 128 */         serializefromobject_argValue = mapelements_value;
      /* 129 */       }
      /* 130 */
      /* 131 */       boolean serializefromobject_isNull = serializefromobject_resultIsNull;
      /* 132 */       final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
      /* 133 */       serializefromobject_isNull = serializefromobject_value == null;
      /* 134 */       serializefromobject_holder.reset();
      /* 135 */
      /* 136 */       serializefromobject_rowWriter.zeroOutNullBytes();
      /* 137 */
      /* 138 */       if (serializefromobject_isNull) {
      /* 139 */         serializefromobject_rowWriter.setNullAt(0);
      /* 140 */       } else {
      /* 141 */         // Remember the current cursor so that we can calculate how many bytes are
      /* 142 */         // written later.
      /* 143 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
      /* 144 */
      /* 145 */         if (serializefromobject_value instanceof UnsafeArrayData) {
      /* 146 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
      /* 147 */           // grow the global buffer before writing data.
      /* 148 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
      /* 149 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
      /* 150 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
      /* 151 */
      /* 152 */         } else {
      /* 153 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
      /* 154 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
      /* 155 */
      /* 156 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
      /* 157 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
      /* 158 */               serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
      /* 159 */             } else {
      /* 160 */               final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
      /* 161 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
      /* 162 */             }
      /* 163 */           }
      /* 164 */         }
      /* 165 */
      /* 166 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
      /* 167 */       }
      /* 168 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
      /* 169 */       append(serializefromobject_result);
      /* 170 */       if (shouldStop()) return;
      /* 171 */     }
      /* 172 */   }
      ```
      
      With this PR (lines 56-82 in the above code are eliminated)
      ```java
      /* 047 */   protected void processNext() throws java.io.IOException {
      /* 048 */     while (inputadapter_input.hasNext() && !stopEarly()) {
      /* 049 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      /* 050 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      /* 051 */       ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
      /* 052 */
      /* 053 */       boolean deserializetoobject_isNull = true;
      /* 054 */       double[] deserializetoobject_value = null;
      /* 055 */       if (!inputadapter_isNull) {
      /* 056 */         deserializetoobject_isNull = false;
      /* 057 */         if (!deserializetoobject_isNull) {
      /* 058 */           Object deserializetoobject_funcResult = null;
      /* 059 */           deserializetoobject_funcResult = inputadapter_value.toDoubleArray();
      /* 060 */           if (deserializetoobject_funcResult == null) {
      /* 061 */             deserializetoobject_isNull = true;
      /* 062 */           } else {
      /* 063 */             deserializetoobject_value = (double[]) deserializetoobject_funcResult;
      /* 064 */           }
      /* 065 */
      /* 066 */         }
      /* 067 */         deserializetoobject_isNull = deserializetoobject_value == null;
      /* 068 */       }
      /* 069 */
      /* 070 */       boolean mapelements_isNull = true;
      /* 071 */       double[] mapelements_value = null;
      /* 072 */       if (!false) {
      /* 073 */         mapelements_resultIsNull = false;
      /* 074 */
      /* 075 */         if (!mapelements_resultIsNull) {
      /* 076 */           mapelements_resultIsNull = deserializetoobject_isNull;
      /* 077 */           mapelements_argValue = deserializetoobject_value;
      /* 078 */         }
      /* 079 */
      /* 080 */         mapelements_isNull = mapelements_resultIsNull;
      /* 081 */         if (!mapelements_isNull) {
      /* 082 */           Object mapelements_funcResult = null;
      /* 083 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
      /* 084 */           if (mapelements_funcResult == null) {
      /* 085 */             mapelements_isNull = true;
      /* 086 */           } else {
      /* 087 */             mapelements_value = (double[]) mapelements_funcResult;
      /* 088 */           }
      /* 089 */
      /* 090 */         }
      /* 091 */         mapelements_isNull = mapelements_value == null;
      /* 092 */       }
      /* 093 */
      /* 094 */       serializefromobject_resultIsNull = false;
      /* 095 */
      /* 096 */       if (!serializefromobject_resultIsNull) {
      /* 097 */         serializefromobject_resultIsNull = mapelements_isNull;
      /* 098 */         serializefromobject_argValue = mapelements_value;
      /* 099 */       }
      /* 100 */
      /* 101 */       boolean serializefromobject_isNull = serializefromobject_resultIsNull;
      /* 102 */       final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
      /* 103 */       serializefromobject_isNull = serializefromobject_value == null;
      /* 104 */       serializefromobject_holder.reset();
      /* 105 */
      /* 106 */       serializefromobject_rowWriter.zeroOutNullBytes();
      /* 107 */
      /* 108 */       if (serializefromobject_isNull) {
      /* 109 */         serializefromobject_rowWriter.setNullAt(0);
      /* 110 */       } else {
      /* 111 */         // Remember the current cursor so that we can calculate how many bytes are
      /* 112 */         // written later.
      /* 113 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
      /* 114 */
      /* 115 */         if (serializefromobject_value instanceof UnsafeArrayData) {
      /* 116 */           final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
      /* 117 */           // grow the global buffer before writing data.
      /* 118 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
      /* 119 */           ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
      /* 120 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
      /* 121 */
      /* 122 */         } else {
      /* 123 */           final int serializefromobject_numElements = serializefromobject_value.numElements();
      /* 124 */           serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
      /* 125 */
      /* 126 */           for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
      /* 127 */             if (serializefromobject_value.isNullAt(serializefromobject_index)) {
      /* 128 */               serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
      /* 129 */             } else {
      /* 130 */               final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
      /* 131 */               serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
      /* 132 */             }
      /* 133 */           }
      /* 134 */         }
      /* 135 */
      /* 136 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
      /* 137 */       }
      /* 138 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
      /* 139 */       append(serializefromobject_result);
      /* 140 */       if (shouldStop()) return;
      /* 141 */     }
      /* 142 */   }
      ```
      
      ## How was this patch tested?
      
      Add test suites into `DatasetPrimitiveSuite`
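      For illustration, a hedged sketch of the kind of coverage this adds (not the exact suite code; it assumes the suite's usual shared-session implicits):
      
      ```scala
      // Caching plus an identity map over a Dataset of primitive double arrays must
      // round-trip the values unchanged (and, with this PR, without GenericArrayData).
      test("dataset of primitive double arrays") {
        import testImplicits._                     // assumed: the suite's shared implicits
        val ds = Seq(Array(1.1, 2.2)).toDS().cache()
        assert(ds.count() == 1)
        val mapped = ds.map(identity).collect().head
        assert(mapped.sameElements(Array(1.1, 2.2)))
      }
      ```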
      
      Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
      
      Closes #17568 from kiszk/SPARK-20254.
      e468a96c
    • Tathagata Das's avatar
      [SPARK-20377][SS] Fix JavaStructuredSessionization example · 74aa0df8
      Tathagata Das authored
      ## What changes were proposed in this pull request?
      
      Extra accessors in the Java bean class cause incorrect encoder generation, which corrupted the state when using timeouts.
      
      ## How was this patch tested?
      manually ran the example
      
      Author: Tathagata Das <tathagata.das1565@gmail.com>
      
      Closes #17676 from tdas/SPARK-20377.
      74aa0df8
    • Kyle Kelley's avatar
      [SPARK-20360][PYTHON] reprs for interpreters · f654b39a
      Kyle Kelley authored
      ## What changes were proposed in this pull request?
      
      Establishes a very minimal `_repr_html_` for PySpark's `SparkContext`.
      
      ## How was this patch tested?
      
      nteract:
      
      ![screen shot 2017-04-17 at 3 41 29 pm](https://cloud.githubusercontent.com/assets/836375/25107701/d57090ba-2385-11e7-8147-74bc2c50a41b.png)
      
      Jupyter:
      
      ![screen shot 2017-04-17 at 3 53 19 pm](https://cloud.githubusercontent.com/assets/836375/25107725/05bf1fe8-2386-11e7-93e1-07a20c917dde.png)
      
      Hydrogen:
      
      ![screen shot 2017-04-17 at 3 49 55 pm](https://cloud.githubusercontent.com/assets/836375/25107664/a75e1ddc-2385-11e7-8477-258661833007.png)
      
      Author: Kyle Kelley <rgbkrk@gmail.com>
      
      Closes #17662 from rgbkrk/repr.
      f654b39a
    • 郭小龙 10207633's avatar
      [SPARK-20354][CORE][REST-API] When I request access to the 'http:... · 1f81dda3
      郭小龙 10207633 authored
      [SPARK-20354][CORE][REST-API] When I request access to the 'http: //ip:port/api/v1/applications' link, return 'sparkUser' is empty in REST API.
      
      ## What changes were proposed in this pull request?
      
      When I request the 'http: //ip:port/api/v1/applications' link, I get the JSON below. I need the specific value of the 'sparkUser' field, because my Spark big-data management platform filters on it to determine which user submitted each application, for administration and querying. Currently the field comes back empty, so this cannot be done: the REST API does not tell me who submitted a given application.
      
      **current return json:**
      [ {
        "id" : "app-20170417152053-0000",
        "name" : "KafkaWordCount",
        "attempts" : [ {
          "startTime" : "2017-04-17T07:20:51.395GMT",
          "endTime" : "1969-12-31T23:59:59.999GMT",
          "lastUpdated" : "2017-04-17T07:20:51.395GMT",
          "duration" : 0,
          **"sparkUser" : "",**
          "completed" : false,
          "endTimeEpoch" : -1,
          "startTimeEpoch" : 1492413651395,
          "lastUpdatedEpoch" : 1492413651395
        } ]
      } ]
      
      **When I fix this question, return json:**
      [ {
        "id" : "app-20170417154201-0000",
        "name" : "KafkaWordCount",
        "attempts" : [ {
          "startTime" : "2017-04-17T07:41:57.335GMT",
          "endTime" : "1969-12-31T23:59:59.999GMT",
          "lastUpdated" : "2017-04-17T07:41:57.335GMT",
          "duration" : 0,
          **"sparkUser" : "mr",**
          "completed" : false,
          "startTimeEpoch" : 1492414917335,
          "endTimeEpoch" : -1,
          "lastUpdatedEpoch" : 1492414917335
        } ]
      } ]
      
      ## How was this patch tested?
      
      manual tests
      
      Please review http://spark.apache.org/contributing.html before opening a pull request.
      
      Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
      Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
      Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>
      
      Closes #17656 from guoxiaolongzte/SPARK-20354.
      1f81dda3
    • wangzhenhua's avatar
      [SPARK-20366][SQL] Fix recursive join reordering: inside joins are not reordered · 321b4f03
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      If a plan has multi-level successive joins, e.g.:
      ```
               Join
               /   \
           Union   t5
            /   \
          Join  t4
          /   \
        Join  t3
        /  \
       t1   t2
      ```
      Currently we fail to reorder the inside joins, i.e. t1, t2, t3.
      
      In join reorder, we use `OrderedJoin` to indicate a join has been ordered, such that when transforming down the plan, these joins don't need to be reordered again.
      
      But there's a problem in the definition of `OrderedJoin`:
      The real join node is a parameter, but not a child. This breaks the transform procedure, because `mapChildren` applies the transform function only to children; a plan held as a parameter, which should have been a child, is never revisited.
      
      In this patch, we change `OrderedJoin` to a class having the same structure as a join node.
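      A hedged sketch of the shape of the change (simplified names and signatures, not the exact Catalyst code):
      
      ```scala
      import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
      import org.apache.spark.sql.catalyst.plans.JoinType
      import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, Join, LogicalPlan}
      
      // Before (problematic): the ordered join is held as a constructor parameter,
      // not a child, so the tree transform never descends into it and any joins
      // nested inside are never reordered.
      case class OrderedJoinMarker(ordered: Join) extends LogicalPlan {
        override def children: Seq[LogicalPlan] = Nil        // `ordered` is invisible to transform
        override def output: Seq[Attribute] = ordered.output
      }
      
      // After: mirror the join node's structure, so left/right are genuine children
      // that transformDown/mapChildren walk as usual.
      case class OrderedJoin(
          left: LogicalPlan,
          right: LogicalPlan,
          joinType: JoinType,
          condition: Option[Expression]) extends BinaryNode {
        override def output: Seq[Attribute] = left.output ++ right.output
      }
      ```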
      
      ## How was this patch tested?
      
      Add a corresponding test case.
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      
      Closes #17668 from wzhfy/recursiveReorder.
      321b4f03
    • hyukjinkwon's avatar
      [SPARK-20343][BUILD] Force Avro 1.7.7 in sbt build to resolve build failure in... · d4f10cbb
      hyukjinkwon authored
      [SPARK-20343][BUILD] Force Avro 1.7.7 in sbt build to resolve build failure in SBT Hadoop 2.6 master on Jenkins
      
      ## What changes were proposed in this pull request?
      
      This PR proposes to force Avro's version to 1.7.7 in core to resolve the build failure as below:
      
      ```
      [error] /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala:123: value createDatumWriter is not a member of org.apache.avro.generic.GenericData
      [error]     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
      [error]
      ```
      
      https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/2770/consoleFull
      
      Note that this is a hack and should be removed in the future.
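      For reference, a hedged sketch of the general sbt mechanism for pinning a transitive dependency (the actual change lives in Spark's build definition and is not necessarily in this exact form):
      
      ```scala
      // sbt (0.13-style) setting: force Avro to a known-good version regardless of
      // what other dependencies pull in transitively.
      dependencyOverrides += "org.apache.avro" % "avro" % "1.7.7"
      ```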
      
      ## How was this patch tested?
      
      I only tested that this actually overrides the dependency.
      
      I tried many ways but I was unable to reproduce this locally. Sean also tried the same way, but he was unable to reproduce it either.
      
      Please refer to the comments in https://github.com/apache/spark/pull/17477#issuecomment-294094092
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17651 from HyukjinKwon/SPARK-20343-sbt.
      d4f10cbb
    • Robert Stupp's avatar
      [SPARK-20344][SCHEDULER] Duplicate call in FairSchedulableBuilder.addTaskSetManager · 07fd94e0
      Robert Stupp authored
      ## What changes were proposed in this pull request?
      
      Eliminate the duplicate call to `Pool.getSchedulableByName()` in `FairSchedulableBuilder.addTaskSetManager`
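      A hedged sketch of the refactoring pattern (illustrative only, member and constant names follow the real class loosely): look the pool up once and reuse the result instead of resolving the name a second time.
      
      ```scala
      // Sketch: one getSchedulableByName lookup, reused for both the "exists" check
      // and the subsequent addSchedulable call.
      override def addTaskSetManager(manager: Schedulable, properties: Properties): Unit = {
        val poolName = Option(properties)
          .map(_.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME))
          .getOrElse(DEFAULT_POOL_NAME)
        var parentPool = rootPool.getSchedulableByName(poolName)   // looked up exactly once
        if (parentPool == null) {
          // Pool not defined in the fair-scheduler XML: create a default one on the fly.
          parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
          rootPool.addSchedulable(parentPool)
        }
        parentPool.addSchedulable(manager)
      }
      ```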
      
      ## How was this patch tested?
      
      ./dev/run-tests
      
      Author: Robert Stupp <snazy@snazy.de>
      
      Closes #17647 from snazy/20344-dup-call-master.
      07fd94e0
    • Felix Cheung's avatar
      [SPARK-17647][SQL][FOLLOWUP][MINOR] fix typo · b0a1e93e
      Felix Cheung authored
      ## What changes were proposed in this pull request?
      
      fix typo
      
      ## How was this patch tested?
      
      manual
      
      Author: Felix Cheung <felixcheung_m@hotmail.com>
      
      Closes #17663 from felixcheung/likedoctypo.
      b0a1e93e
  3. Apr 17, 2017
    • Jacek Laskowski's avatar
      [TEST][MINOR] Replace repartitionBy with distribute in CollapseRepartitionSuite · 33ea908a
      Jacek Laskowski authored
      ## What changes were proposed in this pull request?
      
      Replace non-existent `repartitionBy` with `distribute` in `CollapseRepartitionSuite`.
      
      ## How was this patch tested?
      
      local build and `catalyst/testOnly *CollapseRepartitionSuite`
      
      Author: Jacek Laskowski <jacek@japila.pl>
      
      Closes #17657 from jaceklaskowski/CollapseRepartitionSuite.
      33ea908a
    • Andrew Ash's avatar
      Typo fix: distitrbuted -> distributed · 0075562d
      Andrew Ash authored
      ## What changes were proposed in this pull request?
      
      Typo fix: distitrbuted -> distributed
      
      ## How was this patch tested?
      
      Existing tests
      
      Author: Andrew Ash <andrew@andrewash.com>
      
      Closes #17664 from ash211/patch-1.
      0075562d
    • Jakob Odersky's avatar
      [SPARK-17647][SQL] Fix backslash escaping in 'LIKE' patterns. · e5fee3e4
      Jakob Odersky authored
      ## What changes were proposed in this pull request?
      
      This patch fixes a bug in the way LIKE patterns are translated to Java regexes. The bug causes any character following an escaped backslash to be escaped, i.e. there is double-escaping.
      A concrete example is the following pattern: `'%\\%'`. The expected Java regex that this pattern should correspond to (according to the behavior described below) is `'.*\\.*'`; however, the current translation produces `'.*\\%'` instead.
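      To make the intended translation concrete, here is a hedged, self-contained sketch of an escape-aware LIKE-to-regex conversion (illustrative only, not Spark's actual implementation; it quotes literal characters rather than emitting them raw, which is semantically equivalent):
      
      ```scala
      import java.util.regex.Pattern
      
      // Walk the pattern one character at a time so that a backslash escapes exactly
      // the next character and is never itself re-escaped (no double-escaping).
      def likePatternToRegex(pattern: String): String = {
        val out = new StringBuilder
        var i = 0
        while (i < pattern.length) {
          pattern.charAt(i) match {
            case '\\' if i + 1 < pattern.length =>
              out.append(Pattern.quote(pattern.charAt(i + 1).toString)); i += 2
            case '_' => out.append("."); i += 1
            case '%' => out.append(".*"); i += 1
            case c   => out.append(Pattern.quote(c.toString)); i += 1
          }
        }
        out.toString
      }
      
      // e.g. likePatternToRegex("""%\\%""") yields a regex matching any string that
      // contains a literal backslash, as described above.
      ```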
      
      ---
      
      Update: in light of the discussion that ensued, we should explicitly define the expected behaviour of LIKE expressions, especially in certain edge cases. With the help of gatorsmile, we put together a list of different RDBMS and their variations with respect to certain standard features.
      
      | RDBMS\Features | Wildcards | Default escape [1] | Case sensitivity |
      | --- | --- | --- | --- |
      | [MS SQL Server](https://msdn.microsoft.com/en-us/library/ms179859.aspx) | _, %, [], [^] | none | no |
      | [Oracle](https://docs.oracle.com/cd/B12037_01/server.101/b10759/conditions016.htm) | _, % | none | yes |
      | [DB2 z/OS](http://www.ibm.com/support/knowledgecenter/SSEPEK_11.0.0/sqlref/src/tpc/db2z_likepredicate.html) | _, % | none | yes |
      | [MySQL](http://dev.mysql.com/doc/refman/5.7/en/string-comparison-functions.html) | _, % | none | no |
      | [PostgreSQL](https://www.postgresql.org/docs/9.0/static/functions-matching.html) | _, % | \ | yes |
      | [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) | _, % | none | yes |
      | Current Spark | _, % | \ | yes |
      
      [1] Default escape character: most systems do not have a default escape character; instead, the user can specify one by calling a like expression with an escape argument: [A] LIKE [B] ESCAPE [C]. This syntax is currently not supported by Spark; however, I would volunteer to implement this feature in a separate ticket.
      
      The specifications are often quite terse and certain scenarios are undocumented, so here is a list of scenarios that I am uncertain about and would appreciate any input. Specifically I am looking for feedback on whether or not Spark's current behavior should be changed.
      1. [x] Ending a pattern with the escape sequence, e.g. `like 'a\'`.
   PostgreSQL gives an error: 'LIKE pattern must not end with escape character', which I personally find logical. Currently, Spark allows "non-terminated" escapes and simply ignores them as part of the pattern.
         According to [DB2's documentation](http://www.ibm.com/support/knowledgecenter/SSEPGG_9.7.0/com.ibm.db2.luw.messages.sql.doc/doc/msql00130n.html), ending a pattern in an escape character is invalid.
         _Proposed new behaviour in Spark: throw AnalysisException_
      2. [x] Empty input, e.g. `'' like ''`
         Postgres and DB2 will match empty input only if the pattern is empty as well, any other combination of empty input will not match. Spark currently follows this rule.
      3. [x] Escape before a non-special character, e.g. `'a' like '\a'`.
         Escaping a non-wildcard character is not really documented but PostgreSQL just treats it verbatim, which I also find the least surprising behavior. Spark does the same.
         According to [DB2's documentation](http://www.ibm.com/support/knowledgecenter/SSEPGG_9.7.0/com.ibm.db2.luw.messages.sql.doc/doc/msql00130n.html), it is invalid to follow an escape character with anything other than an escape character, an underscore or a percent sign.
         _Proposed new behaviour in Spark: throw AnalysisException_
      
      The current specification is also described in the operator's source code in this patch.
      ## How was this patch tested?
      
      Extra case in regex unit tests.
      
      Author: Jakob Odersky <jakob@odersky.com>
      
      This patch had conflicts when merged, resolved by
      Committer: Reynold Xin <rxin@databricks.com>
      
      Closes #15398 from jodersky/SPARK-17647.
      e5fee3e4
    • Xiao Li's avatar
      [SPARK-20349][SQL] ListFunctions returns duplicate functions after using persistent functions · 01ff0350
      Xiao Li authored
      ### What changes were proposed in this pull request?
      The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle it.
      
      It would be better if the `SessionCatalog` API could de-duplicate the records, instead of having each API caller do it. In `FunctionRegistry`, our functions are identified by the unquoted string. Thus, this PR tries to parse it using our parser interface and then de-duplicate the names.
      
      ### How was this patch tested?
      Added test cases.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17646 from gatorsmile/showFunctions.
      01ff0350
    • hyukjinkwon's avatar
      [SPARK-19828][R][FOLLOWUP] Rename asJsonArray to as.json.array in from_json function in R · 24f09b39
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      This was suggested to be `as.json.array` in the first place in the PR for SPARK-19828, but we could not do this as the lint check emits an error for multiple dots in variable names.
      
      After SPARK-20278, we are now able to use `multiple.dots.in.names`. `asJsonArray` in the `from_json` function can still be changed, as 2.2 is not released yet.
      
      So, this PR proposes to rename `asJsonArray` to `as.json.array`.
      
      ## How was this patch tested?
      
      Jenkins tests, local tests with `./R/run-tests.sh` and manual `./dev/lint-r`. Existing tests should cover this.
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17653 from HyukjinKwon/SPARK-19828-followup.
      24f09b39
  4. Apr 16, 2017
    • hyukjinkwon's avatar
      [SPARK-20278][R] Disable 'multiple_dots_linter' lint rule that is against project's code style · 86d251c5
      hyukjinkwon authored
      ## What changes were proposed in this pull request?
      
      Currently, multi-dot separated variables in R are not allowed. For example,
      
      ```diff
       setMethod("from_json", signature(x = "Column", schema = "structType"),
      -          function(x, schema, asJsonArray = FALSE, ...) {
      +          function(x, schema, as.json.array = FALSE, ...) {
                   if (asJsonArray) {
                     jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
                                            "createArrayType",
      ```
      
      produces an error as below:
      
      ```
      R/functions.R:2462:31: style: Words within variable and function names should be separated by '_' rather than '.'.
                function(x, schema, as.json.array = FALSE, ...) {
                                    ^~~~~~~~~~~~~
      ```
      
      This seems to go against https://google.github.io/styleguide/Rguide.xml#identifiers, which says
      
      > The preferred form for variable names is all lower case letters and words separated with dots
      
      This appears to be because lintr (https://github.com/jimhester/lintr) by default follows http://r-pkgs.had.co.nz/style.html, as written in its README.md, and a few of its rules deviate from Google's guide as "a few tweaks".
      
      Per [SPARK-6813](https://issues.apache.org/jira/browse/SPARK-6813), we follow Google's R Style Guide with a few exceptions https://google.github.io/styleguide/Rguide.xml. This is also merged into Spark's website - https://github.com/apache/spark-website/pull/43
      
      Also, it looks like we have no restriction on function names, and this rule also appears to affect function names, as written in the README.md.
      
      > `multiple_dots_linter`: check that function and variable names are separated by _ rather than ..
      
      ## How was this patch tested?
      
      Manually tested `./dev/lint-r` with the manual change below in `R/functions.R`:
      
      ```diff
       setMethod("from_json", signature(x = "Column", schema = "structType"),
      -          function(x, schema, asJsonArray = FALSE, ...) {
      +          function(x, schema, as.json.array = FALSE, ...) {
                   if (asJsonArray) {
                     jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
                                            "createArrayType",
      ```
      
      **Before**
      
      ```R
      R/functions.R:2462:31: style: Words within variable and function names should be separated by '_' rather than '.'.
                function(x, schema, as.json.array = FALSE, ...) {
                                    ^~~~~~~~~~~~~
      ```
      
      **After**
      
      ```
      lintr checks passed.
      ```
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17590 from HyukjinKwon/disable-dot-in-name.
      86d251c5
    • hyukjinkwon's avatar
      [SPARK-20343][BUILD] Add avro dependency in core POM to resolve build failure... · ad935f52
      hyukjinkwon authored
      [SPARK-20343][BUILD] Add avro dependency in core POM to resolve build failure in SBT Hadoop 2.6 master on Jenkins
      
      ## What changes were proposed in this pull request?
      
      This PR proposes to add
      
      ```
            <dependency>
              <groupId>org.apache.avro</groupId>
              <artifactId>avro</artifactId>
            </dependency>
      ```
      
      in core POM to see if it resolves the build failure as below:
      
      ```
      [error] /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala:123: value createDatumWriter is not a member of org.apache.avro.generic.GenericData
      [error]     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
      [error]
      ```
      
      https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/2770/consoleFull
      
      ## How was this patch tested?
      
      I tried many ways but I was unable to reproduce this locally. Sean also tried the same way, but he was unable to reproduce it either.
      
      Please refer to the comments in https://github.com/apache/spark/pull/17477#issuecomment-294094092
      
      Author: hyukjinkwon <gurwls223@gmail.com>
      
      Closes #17642 from HyukjinKwon/SPARK-20343.
      ad935f52
    • Ji Yan's avatar
      [SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into... · a888fed3
      Ji Yan authored
      [SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into docker when running on mesos with docker containerizer
      
      ## What changes were proposed in this pull request?
      
      Allow passing arbitrary parameters to Docker when launching Spark executors on Mesos with the Docker containerizer (cc tnachen)
      
      ## How was this patch tested?
      
      Manually built and tested with passed-in parameters
      
      Author: Ji Yan <jiyan@Jis-MacBook-Air.local>
      
      Closes #17109 from yanji84/ji/allow_set_docker_user.
      a888fed3
  5. Apr 15, 2017
    • Xiao Li's avatar
      [SPARK-20335][SQL] Children expressions of Hive UDF impacts the determinism of Hive UDF · e090f3c0
      Xiao Li authored
      ### What changes were proposed in this pull request?
      ```java
        /**
         * Certain optimizations should not be applied if UDF is not deterministic.
         * Deterministic UDF returns same result each time it is invoked with a
         * particular input. This determinism just needs to hold within the context of
         * a query.
         *
         * return true if the UDF is deterministic
         */
        boolean deterministic() default true;
      ```
      
      Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when Hive UDF's children are non-deterministic, Hive UDF is also non-deterministic.
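      A hedged sketch of the rule this implies for the Catalyst expression wrapping a Hive UDF (member names illustrative, not the exact Spark code):
      
      ```scala
      // Deterministic only if the UDF itself is annotated deterministic AND every
      // child expression is deterministic.
      override val deterministic: Boolean =
        isUDFDeterministic && children.forall(_.deterministic)
      ```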
      
      ### How was this patch tested?
      Added test cases.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17635 from gatorsmile/udfDeterministic.
      e090f3c0
    • Wenchen Fan's avatar
      [SPARK-19716][SQL][FOLLOW-UP] UnresolvedMapObjects should always be serializable · 35e5ae4f
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      In https://github.com/apache/spark/pull/17398 we introduced `UnresolvedMapObjects` as a placeholder of `MapObjects`. Unfortunately `UnresolvedMapObjects` is not serializable as its `function` may reference Scala `Type` which is not serializable.
      
      Ideally this is fine, as we will never serialize and send unresolved expressions to executors. However, users may accidentally do this, e.g. by mistakenly referencing an encoder instance when implementing `Aggregator`, so we should fix it so that it is just a performance issue (more network traffic) and does not fail the query.
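      A hedged, illustrative sketch of the accidental pattern described above (hypothetical names; the point is only that an encoder held as a field of the `Aggregator` instance gets serialized along with it):
      
      ```scala
      import org.apache.spark.sql.{Encoder, Encoders}
      import org.apache.spark.sql.expressions.Aggregator
      
      case class Event(id: Long, tags: Seq[String])
      
      object CountTags extends Aggregator[Event, Long, Long] {
        // The mistake being described: keeping an encoder as a field means its
        // expression tree (which may contain unresolved pieces) is shipped with the closure.
        val eventEncoder: Encoder[Event] = Encoders.product[Event]
      
        def zero: Long = 0L
        def reduce(buf: Long, e: Event): Long = buf + e.tags.size
        def merge(a: Long, b: Long): Long = a + b
        def finish(r: Long): Long = r
        def bufferEncoder: Encoder[Long] = Encoders.scalaLong
        def outputEncoder: Encoder[Long] = Encoders.scalaLong
      }
      ```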
      
      ## How was this patch tested?
      
      N/A
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17639 from cloud-fan/minor.
      35e5ae4f
    • ouyangxiaochen's avatar
      [SPARK-20316][SQL] Val and Var should strictly follow the Scala syntax · 98b41ecb
      ouyangxiaochen authored
      ## What changes were proposed in this pull request?
      
      val and var should strictly follow the Scala syntax
      
      ## How was this patch tested?
      
      manual test and existing test cases
      
      Author: ouyangxiaochen <ou.yangxiaochen@zte.com.cn>
      
      Closes #17628 from ouyangxiaochen/spark-413.
      98b41ecb
  6. Apr 14, 2017
    • wangzhenhua's avatar
      [SPARK-20318][SQL] Use Catalyst type for min/max in ColumnStat for ease of estimation · fb036c44
      wangzhenhua authored
      ## What changes were proposed in this pull request?
      
      Currently when estimating predicates like col > literal or col = literal, we will update min or max in column stats based on literal value. However, literal value is of Catalyst type (internal type), while min/max is of external type. Then for the next predicate, we again need to do type conversion to compare and update column stats. This is awkward and causes many unnecessary conversions in estimation.
      
      To solve this, we use Catalyst type for min/max in `ColumnStat`. Note that the persistent format in metastore is still of external type, so there's no inconsistency for statistics in metastore.
      
      This pr also fixes a bug for boolean type in `IN` condition.
      
      ## How was this patch tested?
      
      The changes for ColumnStat are covered by existing tests.
      For bug fix, a new test for boolean type in IN condition is added
      
      Author: wangzhenhua <wangzhenhua@huawei.com>
      
      Closes #17630 from wzhfy/refactorColumnStat.
      fb036c44
  7. Apr 13, 2017
    • Steve Loughran's avatar
      [SPARK-20038][SQL] FileFormatWriter.ExecuteWriteTask.releaseResources()... · 7536e284
      Steve Loughran authored
      [SPARK-20038][SQL] FileFormatWriter.ExecuteWriteTask.releaseResources() implementations to be re-entrant
      
      ## What changes were proposed in this pull request?
      
      Have the `FileFormatWriter.ExecuteWriteTask.releaseResources()` implementations set `currentWriter = null` in a finally clause. This guarantees that if the first attempt to close `currentWriter` throws an exception, the second `releaseResources()` call made during the task-cancel process will not trigger a second attempt to close the stream.
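      A hedged sketch of the pattern (simplified; the real implementations differ per write task):
      
      ```scala
      // Null out the reference even if close() throws, so a second releaseResources()
      // call during task cancellation becomes a no-op instead of a double-close.
      override def releaseResources(): Unit = {
        if (currentWriter != null) {
          try {
            currentWriter.close()
          } finally {
            currentWriter = null
          }
        }
      }
      ```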
      
      ## How was this patch tested?
      
      Tricky. I've been fixing the underlying cause when I saw the problem [HADOOP-14204](https://issues.apache.org/jira/browse/HADOOP-14204), but SPARK-10109 shows I'm not the first to have seen this. I can't replicate it locally any more, my code no longer being broken.
      
      code review, however, should be straightforward
      
      Author: Steve Loughran <stevel@hortonworks.com>
      
      Closes #17364 from steveloughran/stevel/SPARK-20038-close.
      7536e284
    • David Gingrich's avatar
      [SPARK-20232][PYTHON] Improve combineByKey docs · 8ddf0d2a
      David Gingrich authored
      ## What changes were proposed in this pull request?
      
      Improve combineByKey documentation:
      
      * Add note on memory allocation
      * Change example code to use different mergeValue and mergeCombiners (see the sketch below)
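      For reference, a hedged Scala sketch of a `combineByKey` call where `mergeValue` and `mergeCombiners` genuinely differ (the PR itself updates the Python docs; `sc` is assumed to be an existing SparkContext):
      
      ```scala
      // Per-key (sum, count), from which an average can be derived; the three
      // functions passed to combineByKey are intentionally distinct.
      val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5)))
      val sumCount = pairs.combineByKey(
        (v: Int) => (v, 1),                                            // createCombiner
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue
        (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners
      )
      val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
      ```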
      
      ## How was this patch tested?
      
      Doctest.
      
      ## Legal
      
      This is my original work and I license the work to the project under the project’s open source license.
      
      Author: David Gingrich <david@textio.com>
      
      Closes #17545 from dgingrich/topic-spark-20232-combinebykey-docs.
      8ddf0d2a
    • Ioana Delaney's avatar
      [SPARK-20233][SQL] Apply star-join filter heuristics to dynamic programming join enumeration · fbe4216e
      Ioana Delaney authored
      ## What changes were proposed in this pull request?
      
      Implements star-join filter to reduce the search space for dynamic programming join enumeration. Consider the following join graph:
      
      ```
      T1       D1 - T2 - T3
        \     /
          F1
           |
          D2
      
      star-join: {F1, D1, D2}
      non-star: {T1, T2, T3}
      ```
      The following join combinations will be generated:
      ```
      level 0: (F1), (D1), (D2), (T1), (T2), (T3)
      level 1: {F1, D1}, {F1, D2}, {T2, T3}
      level 2: {F1, D1, D2}
      level 3: {F1, D1, D2, T1}, {F1, D1, D2, T2}
      level 4: {F1, D1, D2, T1, T2}, {F1, D1, D2, T2, T3 }
      level 5: {F1, D1, D2, T1, T2, T3}
      ```
      
      ## How was this patch tested?
      
      New test suite `StarJoinCostBasedReorderSuite.scala`.
      
      Author: Ioana Delaney <ioanamdelaney@gmail.com>
      
      Closes #17546 from ioana-delaney/starSchemaCBOv3.
      fbe4216e
    • Sergei Lebedev's avatar
      [SPARK-20284][CORE] Make {Des,S}erializationStream extend Closeable · a4293c28
      Sergei Lebedev authored
      ## What changes were proposed in this pull request?
      
      This PR allows `SerializationStream` and `DeserializationStream` to be used in try-with-resources.
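      A hedged sketch of the shape of the change (simplified; only the relevant members are shown):
      
      ```scala
      import java.io.Closeable
      import scala.reflect.ClassTag
      
      // Making the stream type Closeable lets Java callers use try-with-resources and
      // lets generic resource-management utilities accept it.
      abstract class SerializationStream extends Closeable {
        def writeObject[T: ClassTag](t: T): SerializationStream
        def flush(): Unit
        def close(): Unit
      }
      ```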
      
      ## How was this patch tested?
      
      `core` unit tests.
      
      Author: Sergei Lebedev <s.lebedev@criteo.com>
      
      Closes #17598 from superbobry/compression-stream-closeable.
      a4293c28
    • Syrux's avatar
      [SPARK-20265][MLLIB] Improve PrefixSpan pre-processing efficiency · 095d1cb3
      Syrux authored
      ## What changes were proposed in this pull request?
      
      Improve PrefixSpan pre-processing efficiency by preventing sequences of zeros in the cleaned database.
      The efficiency gain is reflected in the following graph : https://postimg.org/image/9x6ireuvn/
      
      ## How was this patch tested?
      
      Using MLlib's existing PrefixSpan tests and tests of my own on the 8 datasets shown in the graph. All
      results obtained were strictly the same as with the original implementation (without this change).
      dev/run-tests was also run; no errors were found.
      
      Author : Cyril de Vogelaere <cyril.devogelaeregmail.com>
      
      Author: Syrux <pokcyril@hotmail.com>
      
      Closes #17575 from Syrux/SPARK-20265.
      095d1cb3
    • Yash Sharma's avatar
      [SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated... · ec68d8f8
      Yash Sharma authored
      [SPARK-20189][DSTREAM] Fix spark kinesis testcases to remove deprecated createStream and use Builders
      
      ## What changes were proposed in this pull request?
      
      The spark-kinesis testcases use KinesisUtils.createStream, which is deprecated now. Modify the testcases to use the recommended KinesisInputDStream.builder instead.
      This change will also enable the testcases to use session tokens automatically.
      
      ## How was this patch tested?
      
      All the existing testcases work fine as expected with the changes.
      
      https://issues.apache.org/jira/browse/SPARK-20189
      
      Author: Yash Sharma <ysharma@atlassian.com>
      
      Closes #17506 from yssharma/ysharma/cleanup_kinesis_testcases.
      ec68d8f8
  8. Apr 12, 2017
    • Shixiong Zhu's avatar
      [SPARK-20131][CORE] Don't use `this` lock in StandaloneSchedulerBackend.stop · c5f1cc37
      Shixiong Zhu authored
      ## What changes were proposed in this pull request?
      
      `o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky because there is a potential deadlock in StandaloneSchedulerBackend which causes the `await` to time out. Here is the related stack trace:
      ```
      "Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at sun.misc.Unsafe.park(Native Method)
      	- parking to wait for  <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
      	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
      	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
      	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
      	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
      	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
      	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
      	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
      	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
      	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
      	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
      	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
      	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
      	at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
      	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
      	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
      	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
      	at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
      	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
      	at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)
      
      "dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
      	- waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
      	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
      	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
      	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
      	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
      	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      ```
      
      This PR removes `synchronized` and changes `stopping` to an `AtomicBoolean` to make `stop` idempotent, fixing the deadlock.
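      A hedged sketch of the idempotent-stop pattern described (simplified; not the full backend code):
      
      ```scala
      import java.util.concurrent.atomic.AtomicBoolean
      
      object BackendStopSketch {
        private val stopping = new AtomicBoolean(false)
      
        def stop(): Unit = {
          // First caller wins; re-entrant or concurrent calls become no-ops, and no
          // `this` lock is held while waiting on the RPC shutdown, so the dispatcher
          // thread handling makeOffers cannot deadlock against stop().
          if (stopping.compareAndSet(false, true)) {
            // ... ask the driver endpoint to stop and release resources ...
          }
        }
      }
      ```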
      
      ## How was this patch tested?
      
      Jenkins
      
      Author: Shixiong Zhu <shixiong@databricks.com>
      
      Closes #17610 from zsxwing/SPARK-20131.
      c5f1cc37
    • Wenchen Fan's avatar
      [SPARK-15354][FLAKY-TEST] TopologyAwareBlockReplicationPolicyBehavior.Peers in 2 racks · a7b430b5
      Wenchen Fan authored
      ## What changes were proposed in this pull request?
      
      `TopologyAwareBlockReplicationPolicyBehavior.Peers in 2 racks` is failing occasionally: https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.storage.TopologyAwareBlockReplicationPolicyBehavior&test_name=Peers+in+2+racks.
      
      This is because, when we generate 10 block manager ids for the test, they may all belong to the same rack, as the rack is randomly picked. This PR fixes the problem by forcing each rack to be picked at least once.
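      A hedged sketch of the idea only (the suite itself builds BlockManagerIds; here we just show the rack assignment): hand out the first slots one per rack, then pick racks randomly for the rest.
      
      ```scala
      import scala.util.Random
      
      val racks = IndexedSeq("/rack-1", "/rack-2")
      val assignedRacks = (0 until 10).map { i =>
        // Guarantee every rack appears at least once before falling back to random picks.
        if (i < racks.length) racks(i) else racks(Random.nextInt(racks.length))
      }
      assert(racks.forall(assignedRacks.contains))
      ```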
      
      ## How was this patch tested?
      
      N/A
      
      Author: Wenchen Fan <wenchen@databricks.com>
      
      Closes #17624 from cloud-fan/test.
      a7b430b5
    • Burak Yavuz's avatar
      [SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests · 924c4247
      Burak Yavuz authored
      ## What changes were proposed in this pull request?
      
      Some Structured Streaming tests show flakiness such as:
      ```
      [info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
      [info]   Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
      ```
      
      This happens when we wait for the stream to stop, but it doesn't. The reason it doesn't stop is that we interrupt the microBatchThread, but Hadoop's `Shell.runCommand` swallows the interrupt exception, and the exception is not propagated upstream to the microBatchThread. Then this thread continues to run, only to start blocking on the `streamManualClock`.
      
      ## How was this patch tested?
      
      Thousand retries locally and [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport) of the flaky tests
      
      Author: Burak Yavuz <brkyvz@gmail.com>
      
      Closes #17613 from brkyvz/flaky-stream-agg.
      924c4247
    • Jeff Zhang's avatar
      [SPARK-19570][PYSPARK] Allow to disable hive in pyspark shell · 99a94731
      Jeff Zhang authored
      ## What changes were proposed in this pull request?
      
      SPARK-15236 did this for the Scala shell; this ticket is for the PySpark shell. This is not only for PySpark itself, but can also benefit downstream projects like Livy, which uses shell.py for its interactive sessions. For now, Livy has no control over whether Hive is enabled or not.
      
      ## How was this patch tested?
      
      I didn't find a way to add a test for it, so I just tested it manually.
      Run `bin/pyspark --master local --conf spark.sql.catalogImplementation=in-memory` and verify hive is not enabled.
      
      Author: Jeff Zhang <zjffdu@apache.org>
      
      Closes #16906 from zjffdu/SPARK-19570.
      99a94731
    • Reynold Xin's avatar
      [SPARK-20304][SQL] AssertNotNull should not include path in string representation · 54085538
      Reynold Xin authored
      ## What changes were proposed in this pull request?
      AssertNotNull's toString/simpleString dumps the entire walkedTypePath. walkedTypePath is used for error message reporting and shouldn't be part of the output.
      
      ## How was this patch tested?
      Manually tested.
      
      Author: Reynold Xin <rxin@databricks.com>
      
      Closes #17616 from rxin/SPARK-20304.
      54085538
    • Xiao Li's avatar
      [SPARK-20303][SQL] Rename createTempFunction to registerFunction · 504e62e2
      Xiao Li authored
      ### What changes were proposed in this pull request?
      The session catalog API `createTempFunction` is used by Hive built-in functions, persistent functions, and temporary functions. Thus, the name is confusing. This PR renames it to `registerFunction`. Also we can move construction of `FunctionBuilder` and `ExpressionInfo` into the new `registerFunction`, instead of duplicating the logic everywhere.
      
      In the next PRs, the remaining Function-related APIs also need cleanups.
      
      ### How was this patch tested?
      Existing test cases.
      
      Author: Xiao Li <gatorsmile@gmail.com>
      
      Closes #17615 from gatorsmile/cleanupCreateTempFunction.
      504e62e2