diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5b5fa8c93ec5269b9635a220526cd8700e61484f..78a4acdf4b1bfc2cbcf2089f09a5e8b51d0d74c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Scanning partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => - t.refresh() val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray logInfo { @@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) => - t.refresh() // See buildPartitionedTableScan for the reason that we need to create a shard // broadcast HadoopConf. val sharedHadoopConf = SparkHadoopUtil.get.conf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index b34a272ec547fe0b7e4aa184c4e0f0fe340f26a9..5bb9e62310a50680f1b8574cbf7a7c0c16b6d441 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -22,20 +22,22 @@ import java.io.CharArrayWriter import com.fasterxml.jackson.core.JsonFactory import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{Text, LongWritable, NullWritable} +import org.apache.hadoop.io.{LongWritable, NullWritable, Text} import org.apache.hadoop.mapred.{JobConf, TextInputFormat} -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat -import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} + import org.apache.spark.Logging +import org.apache.spark.broadcast.Broadcast import org.apache.spark.mapred.SparkHadoopMapRedUtil - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.util.SerializableConfiguration private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { @@ -108,6 +110,15 @@ private[sql] class JSONRelation( jsonSchema } + override private[sql] def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String], + broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = { + refresh() + super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf) + } + override def buildScan( requiredColumns: Array[String], filters: Array[Filter], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 4aafec0e2df271328ae59f932912562dbe4a1749..6bcabbab4f77b9c72286473f4ac4ef5c8f36825d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -555,7 +555,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio }) } - private[sql] final def buildScan( + private[sql] def buildScan( requiredColumns: Array[String], filters: Array[Filter], inputPaths: Array[String], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 39d18d712ef8cca09e217447ac0ea3034e12a616..cdbfaf6455fe4bc4fc1285219983219f003a1c32 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -32,9 +32,9 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { var path: File = null - override def beforeAll: Unit = { + override def beforeAll(): Unit = { path = Utils.createTempDir() - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) caseInsensitiveContext.read.json(rdd).registerTempTable("jt") sql( s""" @@ -46,7 +46,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { """.stripMargin) } - override def afterAll: Unit = { + override def afterAll(): Unit = { caseInsensitiveContext.dropTempTable("jsonTable") caseInsensitiveContext.dropTempTable("jt") Utils.deleteRecursively(path) @@ -110,7 +110,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { ) // Writing the table to less part files. - val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5) + val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5) caseInsensitiveContext.read.json(rdd1).registerTempTable("jt1") sql( s""" @@ -122,7 +122,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll { ) // Writing the table to more part files. - val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10) + val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10) caseInsensitiveContext.read.json(rdd2).registerTempTable("jt2") sql( s"""