Skip to content
Snippets Groups Projects
Commit e48ebc4e authored by jerryshao's avatar jerryshao Committed by Shixiong Zhu
Browse files

[SPARK-15698][SQL][STREAMING][FOLLW-UP] Fix FileStream source and sink log get configuration issue

## What changes were proposed in this pull request?

This issue was introduced in the previous commit of SPARK-15698. Mistakenly change the way to get configuration back to original one, so here with the follow up PR to revert them up.

## How was this patch tested?

N/A

Ping zsxwing , please review again, sorry to bring the inconvenience. Thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #15173 from jerryshao/SPARK-15698-follow.
parent 1ea49916
No related branches found
No related tags found
No related merge requests found
......@@ -84,14 +84,11 @@ class FileStreamSinkLog(
private implicit val formats = Serialization.formats(NoTypeHints)
protected override val fileCleanupDelayMs =
sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
protected override val isDeletingExpiredLog =
sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
protected override val compactInterval =
sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
......
......@@ -39,16 +39,15 @@ class FileStreamSourceLog(
// Configurations about metadata compaction
protected override val compactInterval =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
sparkSession.sessionState.conf.fileSourceLogCompactInterval
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
s"positive value.")
protected override val fileCleanupDelayMs =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
sparkSession.sessionState.conf.fileSourceLogCleanupDelay
protected override val isDeletingExpiredLog =
sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion
private implicit val formats = Serialization.formats(NoTypeHints)
......
......@@ -620,10 +620,16 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)
def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment