Commit 28538596 authored by Tathagata Das, committed by Shixiong Zhu

[SPARK-14833][SQL][STREAMING][TEST] Refactor StreamTests to test for source fault-tolerance correctly.

## What changes were proposed in this pull request?

The current StreamTest allows testing of a streaming Dataset that explicitly wraps a source. This differs from the actual production code path, where the source object is dynamically created through a DataSource object every time a query is started. So the fault-tolerance testing in FileStreamSourceSuite and FileStreamSourceStressTestSuite was not really exercising the actual code path, as those tests just reuse the same FileStreamSource object.
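
A minimal sketch of the two paths, using only identifiers that appear in this PR and assuming the surrounding test fixtures (`src`, `sqlContext`); illustrative, not the exact production wiring:

```scala
// Old test-only path: the test holds a single Source instance and wraps it
// directly, so stopping and restarting the query reuses the same object.
val source: FileStreamSource = createFileStreamSource("text", src.getCanonicalPath)
val df: DataFrame = source.toDF() // StreamingExecutionRelation wraps `source` itself

// Production path: the DataFrame carries only a StreamingRelation; a fresh
// Source is created from the DataSource each time the query is started.
val df2: DataFrame = sqlContext.read.format("text").stream(src.getCanonicalPath)
```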

This PR fixes StreamTest and the FileSource***Suite tests to test this correctly. Instead of maintaining a mapping of source --> expected offset in StreamTest (which requires reusing the source object), it now maintains a mapping of source index --> offset, so that the bookkeeping is independent of the source object.
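
In sketch form, the bookkeeping changes like this (simplified from the StreamTest diff below; `currentStream` is the running StreamExecution):

```scala
import scala.collection.mutable

// Before: keyed on the Source instance, which breaks once a restart creates
// a new instance of the same logical source.
val awaitingOld = new mutable.HashMap[Source, Offset]()

// After: keyed on the source's position in the logical plan, which is stable
// across restarts; the index is resolved against whichever query is running.
val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset
val indexToSource: Map[Int, Source] = currentStream.logicalPlan
  .collect { case StreamingExecutionRelation(s, _) => s }
  .zipWithIndex
  .map(_.swap)
  .toMap
```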

Summary of changes:
- StreamTest refactored to keep track of offsets by source index instead of by source object
- AddData, AddTextFileData and AddParquetFileData updated to find the FileStreamSource object from the active query, so that they work with sources created when the query is started
- Unit tests in FileSource***Suite refactored to test using DataFrames/Datasets created through the public API, rather than reusing the same FileStreamSource object; this correctly tests fault tolerance (see the example after this list)
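
For example, a refactored test now builds the stream through the public API and never references a Source object (excerpted from the FileStreamSourceSuite diff below):

```scala
withTempDirs { case (src, tmp) =>
  val fileStream = createFileStream("text", src.getCanonicalPath)
  val filtered = fileStream.filter($"value" contains "keep")
  testStream(filtered)(
    AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
    CheckAnswer("keep2", "keep3")
  )
}
```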

The refactoring changed a lot of indentation in FileStreamSourceSuite, so it's recommended to review with whitespace changes hidden: https://github.com/apache/spark/pull/12592/files?w=1

## How was this patch tested?

Refactored unit tests.
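
Concretely, the restart path is now exercised end to end: data added while the query is stopped must be discovered by the freshly created source after restart (excerpted from the "fault tolerance" test below):

```scala
testStream(filtered)(
  AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
  CheckAnswer("keep2", "keep3"),
  StopStream,                                        // stop the query
  AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),  // add data while stopped
  StartStream,                                       // restart creates a new FileStreamSource
  CheckAnswer("keep2", "keep3", "keep5", "keep6")    // the new source picks up the backlog
)
```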

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #12592 from tdas/SPARK-14833.
parent ba5e0b87
@@ -67,12 +67,6 @@ import org.apache.spark.util.Utils
*/
trait StreamTest extends QueryTest with Timeouts {
implicit class RichSource(s: Source) {
def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingExecutionRelation(s))
def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingExecutionRelation(s))
}
/** How long to wait for an active stream to catch up when checking a result. */
val streamingTimeout = 10.seconds
@@ -93,22 +87,21 @@ trait StreamTest extends QueryTest with Timeouts {
AddDataMemory(source, data)
}
/** A trait that can be extended when testing other sources. */
/** A trait that can be extended when testing a source. */
trait AddData extends StreamAction {
def source: Source
/**
* Called to trigger adding the data. Should return the offset that will denote when this
* new data has been processed.
* Called to add data to a source. It should find the source to add data to from
* the active query, and then return the source object to which the data was added, as
* well as the offset of the added data.
*/
def addData(): Offset
def addData(query: Option[StreamExecution]): (Source, Offset)
}
case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
override def toString: String = s"AddData to $source: ${data.mkString(",")}"
override def addData(): Offset = {
source.addData(data)
override def addData(query: Option[StreamExecution]): (Source, Offset) = {
(source, source.addData(data))
}
}
@@ -199,7 +192,7 @@ trait StreamTest extends QueryTest with Timeouts {
var currentPlan: LogicalPlan = stream.logicalPlan
var currentStream: StreamExecution = null
var lastStream: StreamExecution = null
val awaiting = new mutable.HashMap[Source, Offset]()
val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
val sink = new MemorySink(stream.schema)
@volatile
@@ -372,15 +365,53 @@ trait StreamTest extends QueryTest with Timeouts {
verify({ a.run(); true }, s"Assert failed: ${a.message}")
case a: AddData =>
awaiting.put(a.source, a.addData())
try {
// Add data and get the source to which it was added, and the expected offset of the
// added data.
val queryToUse = Option(currentStream).orElse(Option(lastStream))
val (source, offset) = a.addData(queryToUse)
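// Returns the position of `source` among the StreamingExecutionRelations in `plan`.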
def findSourceIndex(plan: LogicalPlan): Option[Int] = {
plan
.collect { case StreamingExecutionRelation(s, _) => s }
.zipWithIndex
.find(_._1 == source)
.map(_._2)
}
// Try to find the index of the source to which data was added. Either get the index
// from the current active query or the original input logical plan.
val sourceIndex =
queryToUse.flatMap { query =>
findSourceIndex(query.logicalPlan)
}.orElse {
findSourceIndex(stream.logicalPlan)
}.getOrElse {
throw new IllegalArgumentException(
"Could find index of the source to which data was added")
}
// Store the expected offset of added data to wait for it later
awaiting.put(sourceIndex, offset)
} catch {
case NonFatal(e) =>
failTest("Error adding data", e)
}
case CheckAnswerRows(expectedAnswer, lastOnly) =>
verify(currentStream != null, "stream not running")
// Block until all data added has been processed
awaiting.foreach { case (source, offset) =>
// Get the map of source index to the current source objects
val indexToSource = currentStream
.logicalPlan
.collect { case StreamingExecutionRelation(s, _) => s }
.zipWithIndex
.map(_.swap)
.toMap
// Block until all data added has been processed for all the sources
awaiting.foreach { case (sourceIndex, offset) =>
failAfter(streamingTimeout) {
currentStream.awaitOffset(source, offset)
currentStream.awaitOffset(indexToSource(sourceIndex), offset)
}
}
......
@@ -30,38 +30,59 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
import testImplicits._
case class AddTextFileData(source: FileStreamSource, content: String, src: File, tmp: File)
extends AddData {
override def addData(): Offset = {
source.withBatchingLocked {
val file = Utils.tempFileWith(new File(tmp, "text"))
stringToFile(file, content).renameTo(new File(src, file.getName))
source.currentOffset
} + 1
/**
* A subclass of [[AddData]] for adding data to files. This is meant to use the
* [[FileStreamSource]] actually being used in the execution.
*/
abstract class AddFileData extends AddData {
override def addData(query: Option[StreamExecution]): (Source, Offset) = {
require(
query.nonEmpty,
"Cannot add data when there is no query for finding the active file stream source")
val sources = query.get.logicalPlan.collect {
case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
source.asInstanceOf[FileStreamSource]
}
if (sources.isEmpty) {
throw new Exception(
"Could not find file source in the StreamExecution logical plan to add data to")
} else if (sources.size > 1) {
throw new Exception(
"Could not select the file source in the StreamExecution logical plan as there" +
"are multiple file sources:\n\t" + sources.mkString("\n\t"))
}
val source = sources.head
val newOffset = source.withBatchingLocked {
addData(source)
source.currentOffset + 1
}
logInfo(s"Added data to $source at offset $newOffset")
(source, newOffset)
}
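// Implemented by concrete AddFileData actions to write their data into the source directory.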
protected def addData(source: FileStreamSource): Unit
}
case class AddParquetFileData(
source: FileStreamSource,
df: DataFrame,
src: File,
tmp: File) extends AddData {
override def addData(): Offset = {
source.withBatchingLocked {
AddParquetFileData.writeToFile(df, src, tmp)
source.currentOffset
} + 1
case class AddTextFileData(content: String, src: File, tmp: File)
extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
val file = Utils.tempFileWith(new File(tmp, "text"))
stringToFile(file, content).renameTo(new File(src, file.getName))
}
}
case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddParquetFileData.writeToFile(data, src, tmp)
}
}
object AddParquetFileData {
def apply(
source: FileStreamSource,
seq: Seq[String],
src: File,
tmp: File): AddParquetFileData = new AddParquetFileData(source, seq.toDS().toDF(), src, tmp)
def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData = {
AddParquetFileData(seq.toDS().toDF(), src, tmp)
}
def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
val file = Utils.tempFileWith(new File(tmp, "parquet"))
@@ -71,11 +92,11 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
}
/** Use `format` and `path` to create a streaming DataFrame via DataFrameReader */
def createFileStreamSource(
def createFileStream(
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
schema: Option[StructType] = None): DataFrame = {
val reader =
if (schema.isDefined) {
sqlContext.read.format(format).schema(schema.get)
@@ -83,14 +104,18 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
sqlContext.read.format(format)
}
reader.stream(path)
.queryExecution.analyzed
}
protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
df.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
// There is only one source in our tests so just set sourceId to 0
dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
}.head
}
def withTempDirs(body: (File, File) => Unit) {
protected def withTempDirs(body: (File, File) => Unit) {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
try {
@@ -108,6 +133,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
import testImplicits._
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
getSourceFromFileStream(createFileStream(format, path, schema))
}
private def createFileStreamSourceAndGetSchema(
format: Option[String],
path: Option[String],
@@ -226,104 +259,88 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from text files") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
withTempDirs { case (src, tmp) =>
val textStream = createFileStream("text", src.getCanonicalPath)
val filtered = textStream.filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
}
}
test("read from json files") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
val filtered = textSource.toDF().filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData(
textSource,
"{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
src,
tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData(
textSource,
"{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
src,
tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(
textSource,
"{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
src,
tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData(
"{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
src,
tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData(
"{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
src,
tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(
"{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
src,
tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
}
}
test("read from json files with inferring schema") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
withTempDirs { case (src, tmp) =>
val textSource = createFileStreamSource("json", src.getCanonicalPath)
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
// FileStreamSource should infer the column "c"
val filtered = textSource.toDF().filter($"c" contains "keep")
val fileStream = createFileStream("json", src.getCanonicalPath)
assert(fileStream.schema === StructType(Seq(StructField("c", StringType))))
testStream(filtered)(
AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6")
)
// FileStreamSource should infer the column "c"
val filtered = fileStream.filter($"c" contains "keep")
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
testStream(filtered)(
AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6")
)
}
}
test("reading from json files inside partitioned directory") {
val src = {
val base = Utils.createTempDir(namePrefix = "streaming.src")
new File(base, "type=X")
}
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
src.mkdirs()
withTempDirs { case (baseSrc, tmp) =>
val src = new File(baseSrc, "type=X")
src.mkdirs()
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
val textSource = createFileStreamSource("json", src.getCanonicalPath)
val fileStream = createFileStream("json", src.getCanonicalPath)
// FileStreamSource should infer the column "c"
val filtered = textSource.toDF().filter($"c" contains "keep")
// FileStreamSource should infer the column "c"
val filtered = fileStream.filter($"c" contains "keep")
testStream(filtered)(
AddTextFileData(textSource, "{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6")
)
testStream(filtered)(
AddTextFileData("{'c': 'drop4'}\n{'c': 'keep5'}\n{'c': 'keep6'}", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6")
)
}
}
@@ -333,52 +350,47 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'k': 'value0'}")
val textSource = createFileStreamSource("json", src.getCanonicalPath)
val fileStream = createFileStream("json", src.getCanonicalPath)
// FileStreamSource should infer the column "k"
val text = textSource.toDF()
assert(text.schema === StructType(Seq(StructField("k", StringType))))
assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
// After creating DF and before starting stream, add data with different schema
// Should not affect the inferred schema any more
stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
testStream(text)(
testStream(fileStream)(
// Should not pick up column v in the file added before start
AddTextFileData(textSource, "{'k': 'value2'}", src, tmp),
AddTextFileData("{'k': 'value2'}", src, tmp),
CheckAnswer("value0", "value1", "value2"),
// Should read data in column k, and ignore v
AddTextFileData(textSource, "{'k': 'value3', 'v': 'new'}", src, tmp),
AddTextFileData("{'k': 'value3', 'v': 'new'}", src, tmp),
CheckAnswer("value0", "value1", "value2", "value3"),
// Should ignore rows that do not have the necessary k column
AddTextFileData(textSource, "{'v': 'value4'}", src, tmp),
AddTextFileData("{'v': 'value4'}", src, tmp),
CheckAnswer("value0", "value1", "value2", "value3", null))
}
}
test("read from parquet files") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileSource.toDF().filter($"value" contains "keep")
testStream(filtered)(
AddParquetFileData(fileSource, Seq("drop1", "keep2", "keep3"), src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddParquetFileData(fileSource, Seq("drop4", "keep5", "keep6"), src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddParquetFileData(fileSource, Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
AddParquetFileData(Seq("drop1", "keep2", "keep3"), src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
}
}
test("read from parquet files with changing schema") {
@@ -387,69 +399,62 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
// Add a file so that we can infer its schema
AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
val fileSource = createFileStreamSource("parquet", src.getCanonicalPath)
val parquetData = fileSource.toDF()
val fileStream = createFileStream("parquet", src.getCanonicalPath)
// FileStreamSource should infer the column "k"
assert(parquetData.schema === StructType(Seq(StructField("k", StringType))))
assert(fileStream.schema === StructType(Seq(StructField("k", StringType))))
// After creating DF and before starting stream, add data with different schema
// Should not affect the inferred schema any more
AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
testStream(parquetData)(
testStream(fileStream)(
// Should not pick up column v in the file added before start
AddParquetFileData(fileSource, Seq("value2").toDF("k"), src, tmp),
AddParquetFileData(Seq("value2").toDF("k"), src, tmp),
CheckAnswer("value0", "value1", "value2"),
// Should read data in column k, and ignore v
AddParquetFileData(fileSource, Seq(("value3", 1)).toDF("k", "v"), src, tmp),
AddParquetFileData(Seq(("value3", 1)).toDF("k", "v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3"),
// Should ignore rows that do not have the necessary k column
AddParquetFileData(fileSource, Seq("value5").toDF("v"), src, tmp),
AddParquetFileData(Seq("value5").toDF("v"), src, tmp),
CheckAnswer("value0", "value1", "value2", "value3", null)
)
}
}
test("file stream source without schema") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
withTempDir { src =>
// Only "text" doesn't need a schema
createFileStream("text", src.getCanonicalPath)
// Both "json" and "parquet" require a schema if no existing file to infer
intercept[AnalysisException] {
createFileStreamSource("json", src.getCanonicalPath)
}
intercept[AnalysisException] {
createFileStreamSource("parquet", src.getCanonicalPath)
// Both "json" and "parquet" require a schema if no existing file to infer
intercept[AnalysisException] {
createFileStream("json", src.getCanonicalPath)
}
intercept[AnalysisException] {
createFileStream("parquet", src.getCanonicalPath)
}
}
Utils.deleteRecursively(src)
}
test("fault tolerance") {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
withTempDirs { case (src, tmp) =>
val fileStream = createFileStream("text", src.getCanonicalPath)
val filtered = fileStream.filter($"value" contains "keep")
testStream(filtered)(
AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
}
}
}
@@ -461,10 +466,10 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val ds = textSource.toDS[String]().map(_.toInt + 1)
val fileStream = createFileStream("text", src.getCanonicalPath)
val ds = fileStream.as[String].map(_.toInt + 1)
runStressTest(ds, data => {
AddTextFileData(textSource, data.mkString("\n"), src, tmp)
AddTextFileData(data.mkString("\n"), src, tmp)
})
Utils.deleteRecursively(src)
......