From 968c0187a12f5ae4a696c02c1ff088e998ed7edd Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan <mridulm80@apache.org> Date: Thu, 24 Apr 2014 20:48:33 -0700 Subject: [PATCH] SPARK-1586 Windows build fixes Unfortunately, this is not exhaustive - particularly hive tests still fail due to path issues. Author: Mridul Muralidharan <mridulm80@apache.org> This patch had conflicts when merged, resolved by Committer: Matei Zaharia <matei@databricks.com> Closes #505 from mridulm/windows_fixes and squashes the following commits: ef12283 [Mridul Muralidharan] Move to org.apache.commons.lang3 for StringEscapeUtils. Earlier version was buggy appparently cdae406 [Mridul Muralidharan] Remove leaked changes from > 2G fix branch 3267f4b [Mridul Muralidharan] Fix build failures 35b277a [Mridul Muralidharan] Fix Scalastyle failures bc69d14 [Mridul Muralidharan] Change from hardcoded path separator 10c4d78 [Mridul Muralidharan] Use explicit encoding while using getBytes 1337abd [Mridul Muralidharan] fix classpath while running in windows --- bin/compute-classpath.cmd | 157 ++++++++++-------- .../org/apache/spark/SparkSaslClient.scala | 6 +- .../org/apache/spark/SparkSaslServer.scala | 8 +- .../apache/spark/api/python/PythonRDD.scala | 2 +- .../apache/spark/network/ReceiverTest.scala | 2 +- .../org/apache/spark/network/SenderTest.scala | 2 +- .../scala/org/apache/spark/rdd/PipedRDD.scala | 8 +- .../scala/org/apache/spark/util/Utils.scala | 24 ++- .../java/org/apache/spark/JavaAPISuite.java | 4 +- .../streaming/examples/MQTTWordCount.scala | 2 +- .../streaming/flume/FlumeStreamSuite.scala | 2 +- .../streaming/mqtt/MQTTInputDStream.scala | 2 +- .../org/apache/spark/repl/ReplSuite.scala | 6 +- .../spark/sql/columnar/ColumnType.scala | 6 +- .../spark/sql/columnar/ColumnTypeSuite.scala | 44 ++++- .../spark/sql/hive/ScriptTransformation.scala | 2 +- .../org/apache/spark/sql/hive/TestHive.scala | 9 +- .../execution/BigDataBenchmarkSuite.scala | 2 +- .../hive/execution/HiveComparisonTest.scala | 5 +- .../execution/HiveCompatibilitySuite.scala | 6 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- 21 files changed, 185 insertions(+), 116 deletions(-) diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd index 4f60bff19c..065553eb31 100644 --- a/bin/compute-classpath.cmd +++ b/bin/compute-classpath.cmd @@ -1,69 +1,88 @@ -@echo off - -rem -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. -rem - -rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run" -rem script and the ExecutorRunner in standalone cluster mode. 
- -set SCALA_VERSION=2.10 - -rem Figure out where the Spark framework is installed -set FWDIR=%~dp0..\ - -rem Load environment variables from conf\spark-env.cmd, if it exists -if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" - -rem Build up classpath -set CLASSPATH=%FWDIR%conf -if exist "%FWDIR%RELEASE" ( - for %%d in ("%FWDIR%jars\spark-assembly*.jar") do ( - set ASSEMBLY_JAR=%%d - ) -) else ( - for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do ( - set ASSEMBLY_JAR=%%d - ) -) -set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR% - -if "x%SPARK_TESTING%"=="x1" ( - rem Add test clases to path - set CLASSPATH=%CLASSPATH%;%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes - set CLASSPATH=%CLASSPATH%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes - set CLASSPATH=%CLASSPATH%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes - set CLASSPATH=%CLASSPATH%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes - set CLASSPATH=%CLASSPATH%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes -) - -rem Add hadoop conf dir - else FileSystem.*, etc fail -rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts -rem the configurtion files. -if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir - set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR% -:no_hadoop_conf_dir - -if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir - set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR% -:no_yarn_conf_dir - -rem A bit of a hack to allow calling this script within run2.cmd without seeing output -if "%DONT_PRINT_CLASSPATH%"=="1" goto exit - -echo %CLASSPATH% - -:exit +@echo off + +rem +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. +rem + +rem This script computes Spark's classpath and prints it to stdout; it's used by both the "run" +rem script and the ExecutorRunner in standalone cluster mode. 
+ +set SCALA_VERSION=2.10 + +rem Figure out where the Spark framework is installed +set FWDIR=%~dp0..\ + +rem Load environment variables from conf\spark-env.cmd, if it exists +if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" + +rem Build up classpath +set CLASSPATH=%FWDIR%conf +if exist "%FWDIR%RELEASE" ( + for %%d in ("%FWDIR%jars\spark-assembly*.jar") do ( + set ASSEMBLY_JAR=%%d + ) +) else ( + for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do ( + set ASSEMBLY_JAR=%%d + ) +) + +set CLASSPATH=%CLASSPATH%;%ASSEMBLY_JAR% + +set SPARK_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%tools\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\classes +set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\classes + +set SPARK_TEST_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%mllib\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%bagel\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%graphx\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%streaming\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\test-classes +set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\test-classes + +if "x%SPARK_TESTING%"=="x1" ( + rem Add test clases to path - note, add SPARK_CLASSES and SPARK_TEST_CLASSES before CLASSPATH + rem so that local compilation takes precedence over assembled jar + set CLASSPATH=%SPARK_CLASSES%;%SPARK_TEST_CLASSES%;%CLASSPATH% +) + +rem Add hadoop conf dir - else FileSystem.*, etc fail +rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts +rem the configurtion files. 
+if "x%HADOOP_CONF_DIR%"=="x" goto no_hadoop_conf_dir + set CLASSPATH=%CLASSPATH%;%HADOOP_CONF_DIR% +:no_hadoop_conf_dir + +if "x%YARN_CONF_DIR%"=="x" goto no_yarn_conf_dir + set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR% +:no_yarn_conf_dir + +rem A bit of a hack to allow calling this script within run2.cmd without seeing output +if "%DONT_PRINT_CLASSPATH%"=="1" goto exit + +echo %CLASSPATH% + +:exit diff --git a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala index 5b14c4291d..65003b6ac6 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslClient.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslClient.scala @@ -111,10 +111,10 @@ private[spark] class SparkSaslClient(securityMgr: SecurityManager) extends Logg CallbackHandler { private val userName: String = - SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes()) + SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8")) private val secretKey = securityMgr.getSecretKey() - private val userPassword: Array[Char] = - SparkSaslServer.encodePassword(if (secretKey != null) secretKey.getBytes() else "".getBytes()) + private val userPassword: Array[Char] = SparkSaslServer.encodePassword( + if (secretKey != null) secretKey.getBytes("utf-8") else "".getBytes("utf-8")) /** * Implementation used to respond to SASL request from the server. diff --git a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala index 6161a6fb7a..f6b0a9132a 100644 --- a/core/src/main/scala/org/apache/spark/SparkSaslServer.scala +++ b/core/src/main/scala/org/apache/spark/SparkSaslServer.scala @@ -89,7 +89,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi extends CallbackHandler { private val userName: String = - SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes()) + SparkSaslServer.encodeIdentifier(securityMgr.getSaslUser().getBytes("utf-8")) override def handle(callbacks: Array[Callback]) { logDebug("In the sasl server callback handler") @@ -101,7 +101,7 @@ private[spark] class SparkSaslServer(securityMgr: SecurityManager) extends Loggi case pc: PasswordCallback => { logDebug("handle: SASL server callback: setting userPassword") val password: Array[Char] = - SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes()) + SparkSaslServer.encodePassword(securityMgr.getSecretKey().getBytes("utf-8")) pc.setPassword(password) } case rc: RealmCallback => { @@ -159,7 +159,7 @@ private[spark] object SparkSaslServer { * @return Base64-encoded string */ def encodeIdentifier(identifier: Array[Byte]): String = { - new String(Base64.encodeBase64(identifier)) + new String(Base64.encodeBase64(identifier), "utf-8") } /** @@ -168,7 +168,7 @@ private[spark] object SparkSaslServer { * @return password as a char array. 
*/ def encodePassword(password: Array[Byte]): Array[Char] = { - new String(Base64.encodeBase64(password)).toCharArray() + new String(Base64.encodeBase64(password), "utf-8").toCharArray() } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 1498b017a7..672c344a56 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -187,7 +187,7 @@ private[spark] class PythonRDD[T: ClassTag]( val exLength = stream.readInt() val obj = new Array[Byte](exLength) stream.readFully(obj) - throw new PythonException(new String(obj), readerException) + throw new PythonException(new String(obj, "utf-8"), readerException) case SpecialLengths.END_OF_DATA_SECTION => // We've finished the data section of the output, but we can still // read some accumulator updates: diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala index 9dc51e0d40..53a6038a9b 100644 --- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala +++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala @@ -28,7 +28,7 @@ private[spark] object ReceiverTest { manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */ - val buffer = ByteBuffer.wrap("response".getBytes) + val buffer = ByteBuffer.wrap("response".getBytes("utf-8")) Some(Message.createBufferMessage(buffer, msg.id)) }) Thread.currentThread.join() diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala index 14c094c617..b8ea7c2cff 100644 --- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala +++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala @@ -54,7 +54,7 @@ private[spark] object SenderTest { val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) .map { response => val buffer = response.asInstanceOf[BufferMessage].buffers(0) - new String(buffer.array) + new String(buffer.array, "utf-8") }.getOrElse("none") val finishTime = System.currentTimeMillis diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index e441d4a40c..5d77d37378 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -87,10 +87,10 @@ private[spark] class PipedRDD[T: ClassTag]( // When spark.worker.separated.working.directory option is turned on, each // task will be run in separate directory. 
This should be resolve file // access conflict issue - val taskDirectory = "./tasks/" + java.util.UUID.randomUUID.toString + val taskDirectory = "tasks" + File.separator + java.util.UUID.randomUUID.toString var workInTaskDirectory = false logDebug("taskDirectory = " + taskDirectory) - if (separateWorkingDir == true) { + if (separateWorkingDir) { val currentDir = new File(".") logDebug("currentDir = " + currentDir.getAbsolutePath()) val taskDirFile = new File(taskDirectory) @@ -106,13 +106,13 @@ private[spark] class PipedRDD[T: ClassTag]( for (file <- currentDir.list(tasksDirFilter)) { val fileWithDir = new File(currentDir, file) Utils.symlink(new File(fileWithDir.getAbsolutePath()), - new File(taskDirectory + "/" + fileWithDir.getName())) + new File(taskDirectory + File.separator + fileWithDir.getName())) } pb.directory(taskDirFile) workInTaskDirectory = true } catch { case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + - " (" + taskDirectory + ")") + " (" + taskDirectory + ")", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 084a71c4ca..8351f7156a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -30,6 +30,7 @@ import scala.io.Source import scala.reflect.ClassTag import com.google.common.io.Files +import org.apache.commons.lang.SystemUtils import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.json4s._ @@ -45,10 +46,13 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, */ private[spark] object Utils extends Logging { - val osName = System.getProperty("os.name") - val random = new Random() + def sparkBin(sparkHome: String, which: String): File = { + val suffix = if (SystemUtils.IS_OS_WINDOWS) ".cmd" else "" + new File(sparkHome + File.separator + "bin", which + suffix) + } + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -605,7 +609,7 @@ private[spark] object Utils extends Logging { */ def isSymlink(file: File): Boolean = { if (file == null) throw new NullPointerException("File must not be null") - if (osName.startsWith("Windows")) return false + if (SystemUtils.IS_OS_WINDOWS) return false val fileInCanonicalDir = if (file.getParent() == null) { file } else { @@ -1008,10 +1012,18 @@ private[spark] object Utils extends Logging { if (dst.isAbsolute()) { throw new IOException("Destination must be relative") } - val linkCmd = if (osName.startsWith("Windows")) "copy" else "ln -sf" + var cmdSuffix = "" + val linkCmd = if (SystemUtils.IS_OS_WINDOWS) { + // refer to http://technet.microsoft.com/en-us/library/cc771254.aspx + cmdSuffix = " /s /e /k /h /y /i" + "cmd /c xcopy " + } else { + cmdSuffix = "" + "ln -sf " + } import scala.sys.process._ - (linkCmd + " " + src.getAbsolutePath() + " " + dst.getPath()) lines_! ProcessLogger(line => - (logInfo(line))) + (linkCmd + src.getAbsolutePath() + " " + dst.getPath() + cmdSuffix) lines_! 
+ ProcessLogger(line => (logInfo(line))) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 8d2e9f1846..76c6f5af82 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -610,8 +610,8 @@ public class JavaAPISuite implements Serializable { @Test public void wholeTextFiles() throws IOException { - byte[] content1 = "spark is easy to use.\n".getBytes(); - byte[] content2 = "spark is also easy to use.\n".getBytes(); + byte[] content1 = "spark is easy to use.\n".getBytes("utf-8"); + byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8"); File tempDir = Files.createTempDir(); String tempDirName = tempDir.getAbsolutePath(); diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 3d3c27ed78..62aef0fb47 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -56,7 +56,7 @@ object MQTTPublisher { val msg: String = "hello mqtt demo for spark streaming" while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) msgtopic.publish(message) println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 78603200d2..dd287d0ef9 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -57,7 +57,7 @@ class FlumeStreamSuite extends TestSuiteBase { for (i <- 0 until input.size) { val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes("utf-8"))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) client.append(event) Thread.sleep(500) diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 0beee8b415..77661f71ad 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -92,7 +92,7 @@ class MQTTReceiver( // Handles Mqtt message override def messageArrived(arg0: String, arg1: MqttMessage) { - store(new String(arg1.getPayload())) + store(new String(arg1.getPayload(),"utf-8")) } override def deliveryComplete(arg0: IMqttDeliveryToken) { diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 4155007c6d..e33f4f9803 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.SparkContext +import org.apache.commons.lang3.StringEscapeUtils class ReplSuite extends FunSuite { @@ 
-185,11 +186,12 @@ class ReplSuite extends FunSuite { out.close() val output = runInterpreter("local", """ - |var file = sc.textFile("%s/input").cache() + |var file = sc.textFile("%s").cache() |file.count() |file.count() |file.count() - """.stripMargin.format(tempDir.getAbsolutePath)) + """.stripMargin.format(StringEscapeUtils.escapeJava( + tempDir.getAbsolutePath + File.separator + "input"))) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) assertContains("res0: Long = 3", output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala index 5be76890af..4cd52d8288 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -200,10 +200,10 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { } private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { - override def actualSize(v: String): Int = v.getBytes.length + 4 + override def actualSize(v: String): Int = v.getBytes("utf-8").length + 4 override def append(v: String, buffer: ByteBuffer) { - val stringBytes = v.getBytes() + val stringBytes = v.getBytes("utf-8") buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length) } @@ -211,7 +211,7 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { val length = buffer.getInt() val stringBytes = new Array[Byte](length) buffer.get(stringBytes, 0, length) - new String(stringBytes) + new String(stringBytes, "utf-8") } override def setField(row: MutableRow, ordinal: Int, value: String) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala index 1d3608ed2d..325173cf95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -58,7 +58,7 @@ class ColumnTypeSuite extends FunSuite { checkActualSize(DOUBLE, Double.MaxValue, 8) checkActualSize(FLOAT, Float.MaxValue, 4) checkActualSize(BOOLEAN, true, 1) - checkActualSize(STRING, "hello", 4 + 5) + checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length) val binary = Array.fill[Byte](4)(0: Byte) checkActualSize(BINARY, binary, 4 + 4) @@ -91,14 +91,16 @@ class ColumnTypeSuite extends FunSuite { testNativeColumnType[StringType.type]( STRING, (buffer: ByteBuffer, string: String) => { - val bytes = string.getBytes() - buffer.putInt(bytes.length).put(string.getBytes) + + val bytes = string.getBytes("utf-8") + buffer.putInt(bytes.length) + buffer.put(bytes) }, (buffer: ByteBuffer) => { val length = buffer.getInt() val bytes = new Array[Byte](length) - buffer.get(bytes, 0, length) - new String(bytes) + buffer.get(bytes) + new String(bytes, "utf-8") }) testColumnType[BinaryType.type, Array[Byte]]( @@ -161,9 +163,13 @@ class ColumnTypeSuite extends FunSuite { buffer.rewind() seq.foreach { expected => + println("buffer = " + buffer + ", expected = " + expected) + val extracted = columnType.extract(buffer) assert( - expected === columnType.extract(buffer), - "Extracted value didn't equal to the original one") + expected === extracted, + "Extracted value didn't equal to the original one. 
" + + hexDump(expected) + " != " + hexDump(extracted) + + ", buffer = " + dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer])) } } @@ -179,4 +185,28 @@ class ColumnTypeSuite extends FunSuite { } } } + + private def hexDump(value: Any): String = { + if (value.isInstanceOf[String]) { + val sb = new StringBuilder() + for (ch <- value.asInstanceOf[String].toCharArray) { + sb.append(Integer.toHexString(ch & 0xffff)).append(' ') + } + if (! sb.isEmpty) sb.setLength(sb.length - 1) + sb.toString() + } else { + // for now .. + hexDump(value.toString) + } + } + + private def dumpBuffer(buff: ByteBuffer): Any = { + val sb = new StringBuilder() + while (buff.hasRemaining) { + val b = buff.get() + sb.append(Integer.toHexString(b & 0xff)).append(' ') + } + if (! sb.isEmpty) sb.setLength(sb.length - 1) + sb.toString() + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala index 610fa9cb84..8258ee5fef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ScriptTransformation.scala @@ -71,7 +71,7 @@ case class ScriptTransformation( iter .map(outputProjection) // TODO: Use SerDe - .map(_.mkString("", "\t", "\n").getBytes).foreach(outputStream.write) + .map(_.mkString("", "\t", "\n").getBytes("utf-8")).foreach(outputStream.write) outputStream.close() readerThread.join() outputLines.toIterator diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 74110ee27b..3ad66a3d7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -100,14 +100,15 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { hiveFilesTemp.delete() hiveFilesTemp.mkdir() - val inRepoTests = if (System.getProperty("user.dir").endsWith("sql/hive")) { - new File("src/test/resources/") + val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) { + new File("src" + File.separator + "test" + File.separator + "resources" + File.separator) } else { - new File("sql/hive/src/test/resources") + new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" + + File.separator + "resources") } def getHiveFile(path: String): File = { - val stripped = path.replaceAll("""\.\.\/""", "") + val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar) hiveDevHome .map(new File(_, stripped)) .filter(_.exists) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala index 9b9a823b6e..42a82c1fbf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.test.TestHive._ * https://amplab.cs.berkeley.edu/benchmark/ */ class BigDataBenchmarkSuite extends HiveComparisonTest { - val testDataDirectory = new File("target/big-data-benchmark-testdata") + val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata") val testTables = Seq( TestTable( diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index ea17e6e93b..edff38b901 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -78,7 +78,8 @@ abstract class HiveComparisonTest .map(name => new File(targetDir, s"$suiteName.$name")) /** The local directory with cached golden answer will be stored. */ - protected val answerCache = new File("src/test/resources/golden") + protected val answerCache = new File("src" + File.separator + "test" + + File.separator + "resources" + File.separator + "golden") if (!answerCache.exists) { answerCache.mkdir() } @@ -120,7 +121,7 @@ abstract class HiveComparisonTest protected val cacheDigest = java.security.MessageDigest.getInstance("MD5") protected def getMd5(str: String): String = { val digest = java.security.MessageDigest.getInstance("MD5") - digest.update(str.getBytes) + digest.update(str.getBytes("utf-8")) new java.math.BigInteger(1, digest.digest).toString(16) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index dfe88b960b..0bb76f31c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import org.scalatest.BeforeAndAfter import org.apache.spark.sql.hive.test.TestHive @@ -26,7 +28,9 @@ import org.apache.spark.sql.hive.test.TestHive */ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // TODO: bundle in jar files... get from classpath - lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive") + lazy val hiveQueryDir = TestHive.getHiveFile("ql" + File.separator + "src" + + File.separator + "test" + File.separator + "queries" + File.separator + "clientpositive") + def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) override def beforeAll() { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 832d45b3ad..718cb19f57 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -64,7 +64,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { override def addSecretKeyToUserCredentials(key: String, secret: String) { val creds = new Credentials() - creds.addSecretKey(new Text(key), secret.getBytes()) + creds.addSecretKey(new Text(key), secret.getBytes("utf-8")) addCurrentUserCredentials(creds) } -- GitLab
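
Notes on the portability techniques used in the patch above, each illustrated with a small standalone Scala sketch (names in the sketches are illustrative unless they appear in the diff). First, the rewritten compute-classpath.cmd collects per-module classes and test-classes directories and, under SPARK_TESTING=1, prepends them to the assembly jar so that, as its own comment says, local compilation takes precedence over the assembled jar. The sketch below shows the same joining and ordering logic on the JVM side; the module list is abbreviated and the assembly jar name is a placeholder, not the real glob.

    import java.io.File

    object ClasspathSketch {
      def main(args: Array[String]): Unit = {
        val fwdir        = sys.env.getOrElse("SPARK_HOME", ".")   // %FWDIR% in the .cmd
        val scalaVersion = "2.10"
        val modules      = Seq("core", "repl", "mllib", "bagel", "graphx", "streaming")  // abbreviated

        def classesOf(kind: String): Seq[String] =
          modules.map(m => Seq(fwdir, m, "target", s"scala-$scalaVersion", kind).mkString(File.separator))

        // Placeholder name; the script actually globs for spark-assembly*.jar.
        val assemblyJar = Seq(fwdir, "jars", "spark-assembly.jar").mkString(File.separator)

        // Locally compiled classes first so they shadow the assembly jar,
        // which is what the .cmd does when SPARK_TESTING=1.
        val entries = classesOf("classes") ++ classesOf("test-classes") :+ assemblyJar
        println(entries.mkString(File.pathSeparator))   // ';' on Windows, ':' elsewhere
      }
    }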
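
The SparkSaslClient.scala and SparkSaslServer.scala hunks pass "utf-8" explicitly to getBytes and new String because the no-argument overloads use the platform default charset, which is typically a Windows code page (e.g. Cp1252) on Windows and UTF-8 on most Linux installs, so identifiers and secrets could round-trip differently per OS. A minimal sketch of the same encode helpers; it uses the JDK's java.util.Base64 rather than the commons-codec Base64 the real code imports, purely to stay dependency-free, and the object name is illustrative.

    import java.nio.charset.StandardCharsets.UTF_8
    import java.util.Base64

    object SaslEncodingSketch {
      // Encode an identifier to a Base64 string, always treating bytes as UTF-8.
      def encodeIdentifier(identifier: Array[Byte]): String =
        new String(Base64.getEncoder.encode(identifier), UTF_8)

      // SASL wants the password as a char array; keep the charset explicit on both conversions.
      def encodePassword(password: Array[Byte]): Array[Char] =
        new String(Base64.getEncoder.encode(password), UTF_8).toCharArray

      def main(args: Array[String]): Unit = {
        val user = "sparkSaslUser".getBytes(UTF_8)   // explicit charset: same bytes on every OS
        println(encodeIdentifier(user))              // c3BhcmtTYXNsVXNlcg==
      }
    }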
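
PipedRDD.scala, TestHive.scala, BigDataBenchmarkSuite.scala, HiveComparisonTest.scala and HiveCompatibilitySuite.scala all replace hardcoded "/" with File.separator. Concatenating with File.separator works, as the patch shows; java.nio.file.Paths expresses the same path with less string plumbing, so both forms are shown below for comparison (the concrete paths are examples taken from the diff).

    import java.io.File
    import java.nio.file.Paths

    object PortablePathsSketch {
      def main(args: Array[String]): Unit = {
        // Same shape as "src" + File.separator + "test" + File.separator + "resources" in the patch.
        val answerCache = new File("src" + File.separator + "test" + File.separator + "resources")

        // The same path built without manual separator handling.
        val viaNio = Paths.get("src", "test", "resources").toFile

        // Normalizing a Hive-style resource path: strip "../" and use the local separator,
        // mirroring the getHiveFile change in TestHive.scala.
        val stripped = "../data/files/kv1.txt".replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)

        println(answerCache)
        println(viaNio)
        println(stripped)
      }
    }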
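
Utils.scala drops the string match on os.name in favour of commons-lang's SystemUtils.IS_OS_WINDOWS and adds a sparkBin helper that appends ".cmd" on Windows. The sketch below keeps the same shape but substitutes a plain os.name check for SystemUtils so it compiles without the extra dependency; sparkBin here illustrates the pattern and is not the exact Spark method.

    import java.io.File

    object PlatformSketch {
      // Stand-in for org.apache.commons.lang.SystemUtils.IS_OS_WINDOWS used by the patch.
      val isWindows: Boolean =
        System.getProperty("os.name", "").toLowerCase.startsWith("windows")

      // Resolve a launcher script: bin/spark-class on Unix, bin\spark-class.cmd on Windows.
      def sparkBin(sparkHome: String, which: String): File = {
        val suffix = if (isWindows) ".cmd" else ""
        new File(new File(sparkHome, "bin"), which + suffix)
      }

      def main(args: Array[String]): Unit =
        println(sparkBin("/opt/spark", "spark-class"))
    }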
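
Utils.symlink now branches per OS: "ln -sf" on Unix, and on Windows "cmd /c xcopy" with the /s /e /k /h /y /i flags appended after the two paths, i.e. the data is copied rather than truly linked (creating real symlinks on Windows usually requires elevated privileges). A hedged sketch of that dispatch using scala.sys.process; the function and object names are mine, and java.nio.file.Files.createSymbolicLink would be a modern alternative where privileges allow.

    import java.io.File
    import scala.sys.process._

    object LinkOrCopySketch {
      private val isWindows =
        System.getProperty("os.name", "").toLowerCase.startsWith("windows")

      /** Command in the spirit of the patch: a real symlink on Unix, a recursive copy on Windows. */
      def linkCommand(src: File, dst: File): Seq[String] =
        if (isWindows)
          Seq("cmd", "/c", "xcopy", src.getAbsolutePath, dst.getPath,
              "/s", "/e", "/k", "/h", "/y", "/i")
        else
          Seq("ln", "-sf", src.getAbsolutePath, dst.getPath)

      /** Run it, logging each line of output and returning the exit code. */
      def linkOrCopy(src: File, dst: File): Int =
        linkCommand(src, dst).!(ProcessLogger(out => println(out)))

      def main(args: Array[String]): Unit =
        println(linkCommand(new File("data"), new File("tasks/data")).mkString(" "))
    }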
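
The ReplSuite.scala change matters because a temp-directory path is spliced into source text that the interpreter then compiles: a Windows path full of backslashes turns into bogus escape sequences inside the generated string literal, so the patch runs it through commons-lang3's StringEscapeUtils.escapeJava first. A small sketch of the failure mode and the fix, assuming commons-lang3 is on the classpath (the sample path is made up).

    import org.apache.commons.lang3.StringEscapeUtils

    object EscapePathSketch {
      def main(args: Array[String]): Unit = {
        val tempDir = """C:\Users\test\tmp1234"""   // what File.getAbsolutePath looks like on Windows

        // Naive interpolation: \U, \t, ... become (possibly invalid) escapes when the REPL compiles it.
        val broken = """sc.textFile("%s").count()""".format(tempDir)

        // Escaped interpolation: backslashes are doubled, so the generated literal parses correctly.
        val fixed = """sc.textFile("%s").count()""".format(StringEscapeUtils.escapeJava(tempDir))

        println(broken)   // sc.textFile("C:\Users\test\tmp1234").count()
        println(fixed)    // sc.textFile("C:\\Users\\test\\tmp1234").count()
      }
    }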
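
Finally, the STRING column type in ColumnType.scala now uses UTF-8 consistently for actualSize, append and extract, and ColumnTypeSuite stops assuming that byte length equals character count ("hello" happens to be 5 either way, but a non-ASCII string is not). A self-contained sketch of the length-prefixed layout with an explicit charset; the buffer size and sample string are arbitrary.

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets.UTF_8

    object StringColumnSketch {
      def append(v: String, buffer: ByteBuffer): Unit = {
        val bytes = v.getBytes(UTF_8)            // explicit charset: same length on every JVM
        buffer.putInt(bytes.length).put(bytes)   // 4-byte length prefix, then the payload
      }

      def extract(buffer: ByteBuffer): String = {
        val bytes = new Array[Byte](buffer.getInt())
        buffer.get(bytes)
        new String(bytes, UTF_8)
      }

      def main(args: Array[String]): Unit = {
        val v = "héllo"                          // 5 chars, but 6 bytes in UTF-8
        println(s"chars=${v.length} utf8Bytes=${v.getBytes(UTF_8).length}")
        val buf = ByteBuffer.allocate(64)
        append(v, buf)
        buf.flip()
        println(extract(buf))                    // héllo
      }
    }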