Commit 993923c8 authored by hongshen, committed by Sean Owen

[SPARK-16985] Change dataFormat from yyyyMMddHHmm to yyyyMMddHHmmss

## What changes were proposed in this pull request?

In our cluster, SQL output files are sometimes overwritten. I submitted several SQL jobs that all insert into the same table, each taking less than one minute to run. Here is the detail:
1. sql1, 11:03, insert into table
2. sql2, 11:04:11, insert into table
3. sql3, 11:04:48, insert into table
4. sql4, 11:05, insert into table
5. sql5, 11:06, insert into table

sql3's output file overwrote sql2's output file. Here is the log:
```
16/05/04 11:04:11 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_1204544348/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
16/05/04 11:04:48 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_212180468/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
```

The cause is that the output path is built with `SimpleDateFormat("yyyyMMddHHmm")`: when two SQL jobs insert into the same table within the same minute, they get the same attempt ID (note the identical `attempt_201605041104_0001_m_000000_1` in both log lines above), so the later output overwrites the earlier one. I think we should change the date format to "yyyyMMddHHmmss"; in our cluster no SQL job finishes within a single second, so second granularity is enough to keep the IDs distinct.
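
As a minimal, self-contained sketch (not part of the patch), the two job start times from the log above collide under minute granularity but not under second granularity:

```scala
import java.text.SimpleDateFormat

object FormatCollision {
  def main(args: Array[String]): Unit = {
    // The two job start times from the log above.
    val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val t1 = parser.parse("2016-05-04 11:04:11")
    val t2 = parser.parse("2016-05-04 11:04:48")

    val byMinute = new SimpleDateFormat("yyyyMMddHHmm")
    val bySecond = new SimpleDateFormat("yyyyMMddHHmmss")

    // Minute granularity: both jobs map to "201605041104", so they share
    // an attempt ID and the later job overwrites the earlier output.
    println(byMinute.format(t1) == byMinute.format(t2)) // true

    // Second granularity keeps the two jobs apart.
    println(bySecond.format(t1) == bySecond.format(t2)) // false
  }
}
```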

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)


Author: hongshen <shenh062326@126.com>

Closes #14574 from shenh062326/SPARK-16985.
parent 00e103a6
```diff
@@ -67,7 +67,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
     setIDs(jobid, splitid, attemptid)
-    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
+    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss").format(now),
       jobid, splitID, attemptID, conf.value)
   }
@@ -162,7 +162,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
 private[spark]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss")
     val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
@@ -241,7 +241,7 @@ class HadoopRDD[K, V](
       var reader: RecordReader[K, V] = null
       val inputFormat = getInputFormat(jobConf)
-      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+      HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss").format(createTime),
         context.stageId, theSplit.index, context.attemptNumber, jobConf)
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
@@ -77,7 +77,7 @@ class NewHadoopRDD[K, V](
   // private val serializableConf = new SerializableWritable(_conf)
   private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss")
     formatter.format(new Date())
   }
@@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
     val job = NewAPIHadoopJob.getInstance(hadoopConf)
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val formatter = new SimpleDateFormat("yyyyMMddHHmmss")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
     val jobConfiguration = job.getConfiguration
```
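
For context, here is a hedged sketch of how the formatted timestamp becomes the jobtracker ID inside the Hadoop task attempt ID that names the temporary output path. `JobID`, `TaskID`, and `TaskAttemptID` are the `org.apache.hadoop.mapred` classes these code paths build on; the concrete values are illustrative:

```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.hadoop.mapred.{JobID, TaskAttemptID, TaskID}

object AttemptIdSketch {
  def main(args: Array[String]): Unit = {
    // With the patch, the jobtracker ID carries second granularity.
    val jobtrackerID = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())

    val jobId = new JobID(jobtrackerID, 1)        // job_<timestamp>_0001
    val taskId = new TaskID(jobId, true, 0)       // map task 000000
    val attemptId = new TaskAttemptID(taskId, 1)  // attempt 1 of that task

    // Prints e.g. attempt_20160504110411_0001_m_000000_1; under the old
    // "yyyyMMddHHmm" format, two jobs started in the same minute produced
    // identical strings here, hence the same _tmp output path.
    println(attemptId)
  }
}
```

This roughly mirrors what `HadoopRDD.addLocalConfiguration` stores in the job configuration (`mapred.job.id`, `mapred.task.id`, and related keys) before the writer opens its output file.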