Commit b9c5e5d4 authored by navis.ryu's avatar navis.ryu Committed by Reynold Xin

[SPARK-11124] JsonParser/Generator should be closed for resource recycle

Some JSON parsers and generators are never closed; the parser created in JacksonParser#parseJson, for example. This patch routes them through a shared helper that closes the resource in a finally block.

Author: navis.ryu <navis@apache.org>

Closes #9130 from navis/SPARK-11124.
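For context, the fix threads every Jackson parser/generator through a small loan-pattern helper added to org.apache.spark.util.Utils (first hunk below): create the resource, hand it to a function, and close it in a finally block no matter how the function exits. A minimal, self-contained sketch of the same pattern follows; the demo object, JSON literal, and field access are illustrative only, not part of the commit:

    import java.io.Closeable
    import com.fasterxml.jackson.core.JsonFactory

    object LoanPatternDemo {
      // same shape as the helper this commit adds to Utils
      def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
        val resource = createResource
        try f.apply(resource) finally resource.close()
      }

      def main(args: Array[String]): Unit = {
        val factory = new JsonFactory()
        // the parser is closed even if the body throws, which a bare
        // createParser/nextToken sequence does not guarantee
        val firstField = tryWithResource(factory.createParser("""{"a": 1}""")) { parser =>
          parser.nextToken() // START_OBJECT
          parser.nextToken() // FIELD_NAME "a"
          parser.getCurrentName
        }
        println(firstField) // prints: a
      }
    }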
parent 4ee2cea2
Utils.scala
@@ -2153,6 +2153,10 @@ private[spark] object Utils extends Logging {
     conf.getInt("spark.executor.instances", 0) == 0
   }
 
+  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
+    val resource = createResource
+    try f.apply(resource) finally resource.close()
+  }
 }
 
 /**
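A quick property check of the helper's contract: close() runs exactly once, on both the normal path and the throwing path, because it sits in a finally block. A hedged sketch with a throwaway Closeable (Probe and the thrown exception are made up for illustration; Utils.tryWithResource is assumed to be in scope):

    import java.io.Closeable

    // minimal Closeable that records whether close() was called
    class Probe extends Closeable {
      var closed = false
      override def close(): Unit = closed = true
    }

    val probe = new Probe
    try {
      Utils.tryWithResource(probe) { _ => throw new RuntimeException("boom") }
    } catch {
      case _: RuntimeException => // expected: the body failed
    }
    assert(probe.closed) // the resource was still closed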
jsonExpressions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.types.{StructField, StructType, StringType, DataType}
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 import scala.util.parsing.combinator.RegexParsers
@@ -134,16 +135,18 @@ case class GetJsonObject(json: Expression, path: Expression)
     if (parsed.isDefined) {
       try {
-        val parser = jsonFactory.createParser(jsonStr.getBytes)
-        val output = new ByteArrayOutputStream()
-        val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
-        parser.nextToken()
-        val matched = evaluatePath(parser, generator, RawStyle, parsed.get)
-        generator.close()
-        if (matched) {
-          UTF8String.fromBytes(output.toByteArray)
-        } else {
-          null
+        Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser =>
+          val output = new ByteArrayOutputStream()
+          val matched = Utils.tryWithResource(
+            jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
+            parser.nextToken()
+            evaluatePath(parser, generator, RawStyle, parsed.get)
+          }
+          if (matched) {
+            UTF8String.fromBytes(output.toByteArray)
+          } else {
+            null
+          }
         }
       } catch {
         case _: JsonProcessingException => null
@@ -250,17 +253,18 @@ case class GetJsonObject(json: Expression, path: Expression)
           // temporarily buffer child matches, the emitted json will need to be
           // modified slightly if there is only a single element written
           val buffer = new StringWriter()
-          val flattenGenerator = jsonFactory.createGenerator(buffer)
-          flattenGenerator.writeStartArray()
 
           var dirty = 0
-          while (p.nextToken() != END_ARRAY) {
-            // track the number of array elements and only emit an outer array if
-            // we've written more than one element, this matches Hive's behavior
-            dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+          Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
+            flattenGenerator.writeStartArray()
+
+            while (p.nextToken() != END_ARRAY) {
+              // track the number of array elements and only emit an outer array if
+              // we've written more than one element, this matches Hive's behavior
+              dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+            }
+            flattenGenerator.writeEndArray()
           }
-          flattenGenerator.writeEndArray()
-          flattenGenerator.close()
 
           val buf = buffer.getBuffer
           if (dirty > 1) {
@@ -370,12 +374,8 @@ case class JsonTuple(children: Seq[Expression])
     }
 
     try {
-      val parser = jsonFactory.createParser(json.getBytes)
-
-      try {
-        parseRow(parser, input)
-      } finally {
-        parser.close()
+      Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) {
+        parser => parseRow(parser, input)
       }
     } catch {
       case _: JsonProcessingException =>
@@ -420,12 +420,8 @@ case class JsonTuple(children: Seq[Expression])
         // write the output directly to UTF8 encoded byte array
         if (parser.nextToken() != JsonToken.VALUE_NULL) {
-          val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8)
-
-          try {
-            copyCurrentStructure(generator, parser)
-          } finally {
-            generator.close()
+          Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) {
+            generator => copyCurrentStructure(generator, parser)
           }
 
           row(idx) = UTF8String.fromBytes(output.toByteArray)
InferSchema.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 private[sql] object InferSchema {
 
   /**
@@ -47,9 +48,10 @@ private[sql] object InferSchema {
         val factory = new JsonFactory()
         iter.map { row =>
           try {
-            val parser = factory.createParser(row)
-            parser.nextToken()
-            inferField(parser)
+            Utils.tryWithResource(factory.createParser(row)) { parser =>
+              parser.nextToken()
+              inferField(parser)
+            }
           } catch {
             case _: JsonParseException =>
               StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
JacksonParser.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 private[sql] object JacksonParser {
 
   def apply(
@@ -86,9 +87,9 @@ private[sql] object JacksonParser {
       case (_, StringType) =>
         val writer = new ByteArrayOutputStream()
-        val generator = factory.createGenerator(writer, JsonEncoding.UTF8)
-        generator.copyCurrentStructure(parser)
-        generator.close()
+        Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+          generator => generator.copyCurrentStructure(parser)
+        }
         UTF8String.fromBytes(writer.toByteArray)
 
       case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
@@ -245,22 +246,24 @@ private[sql] object JacksonParser {
     iter.flatMap { record =>
       try {
-        val parser = factory.createParser(record)
-        parser.nextToken()
-
-        convertField(factory, parser, schema) match {
-          case null => failedRecord(record)
-          case row: InternalRow => row :: Nil
-          case array: ArrayData =>
-            if (array.numElements() == 0) {
-              Nil
-            } else {
-              array.toArray[InternalRow](schema)
-            }
-          case _ =>
-            sys.error(
-              s"Failed to parse record $record. Please make sure that each line of the file " +
-                "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
+        Utils.tryWithResource(factory.createParser(record)) { parser =>
+          parser.nextToken()
+
+          convertField(factory, parser, schema) match {
+            case null => failedRecord(record)
+            case row: InternalRow => row :: Nil
+            case array: ArrayData =>
+              if (array.numElements() == 0) {
+                Nil
+              } else {
+                array.toArray[InternalRow](schema)
+              }
+            case _ =>
+              sys.error(
+                s"Failed to parse record $record. Please make sure that each line of the file " +
+                  "(or each string in the RDD) is a valid JSON object or " +
+                  "an array of JSON objects.")
+          }
         }
       } catch {
        case _: JsonProcessingException =>