From d0212eb0f22473ee5482fe98dafc24e16ffcfc63 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Tue, 22 Nov 2016 16:49:15 -0800
Subject: [PATCH] [SPARK-18530][SS][KAFKA] Change Kafka timestamp column type
 to TimestampType

## What changes were proposed in this pull request?

Changed the Kafka source's `timestamp` column from `LongType` (epoch milliseconds) to `TimestampType`, so it can be used directly in event-time operations such as watermarks and windows without a manual cast.
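
With `timestamp` exposed as `TimestampType`, it feeds event-time operations directly. A minimal sketch of the user-facing effect, mirroring the new watermark test below (the broker address, topic name, and app name are placeholders, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}

val spark = SparkSession.builder().appName("kafka-timestamp-sketch").getOrCreate()
import spark.implicits._

val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "events")                       // placeholder topic
  .option("startingOffsets", "earliest")
  .load()

// No cast from Long is needed anymore: `timestamp` is already a TimestampType column.
val windowedCounts = kafka
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .agg(count("*") as "count")

val query = windowedCounts
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
```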

## How was this patch tested?

Added `test("Kafka column types")` and `test("KafkaSource with watermark")` to `KafkaSourceSuite`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15969 from zsxwing/SPARK-18530.
---
 .../spark/sql/kafka010/KafkaSource.scala      | 16 +++-
 .../spark/sql/kafka010/KafkaSourceSuite.scala | 81 ++++++++++++++++++-
 2 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1d0d402b82..d9ab4bb4f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -32,9 +32,12 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSource._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.UninterruptibleThread
 
 /**
@@ -282,7 +285,14 @@ private[kafka010] case class KafkaSource(
     // Create an RDD that reads from Kafka and get the (key, value) pair as byte arrays.
     val rdd = new KafkaSourceRDD(
       sc, executorKafkaParams, offsetRanges, pollTimeoutMs, failOnDataLoss).map { cr =>
-      Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp, cr.timestampType.id)
+      InternalRow(
+        cr.key,
+        cr.value,
+        UTF8String.fromString(cr.topic),
+        cr.partition,
+        cr.offset,
+        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
+        cr.timestampType.id)
     }
 
     logInfo("GetBatch generating RDD of offset range: " +
@@ -293,7 +303,7 @@ private[kafka010] case class KafkaSource(
       currentPartitionOffsets = Some(untilPartitionOffsets)
     }
 
-    sqlContext.createDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd, schema)
   }
 
   /** Stop this source and free any resources it has allocated. */
@@ -496,7 +506,7 @@ private[kafka010] object KafkaSource {
     StructField("topic", StringType),
     StructField("partition", IntegerType),
     StructField("offset", LongType),
-    StructField("timestamp", LongType),
+    StructField("timestamp", TimestampType),
     StructField("timestampType", IntegerType)
   ))
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index cd52fd93d1..f9f62581a3 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.kafka010
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -551,6 +552,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("Kafka column types") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val query = kafka
+      .writeStream
+      .format("memory")
+      .outputMode("append")
+      .queryName("kafkaColumnTypes")
+      .start()
+    query.processAllAvailable()
+    val rows = spark.table("kafkaColumnTypes").collect()
+    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    val row = rows(0)
+    assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
+    assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
+    assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
+    assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
+    assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
+    // We cannot check the exact timestamp, as it is set to the time when the producer inserted
+    // the message. So here we just use a lower bound to make sure the internal conversion works.
+    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
+    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
+    query.stop()
+  }
+
+  test("KafkaSource with watermark") {
+    val now = System.currentTimeMillis()
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, Array(1).map(_.toString))
+
+    val kafka = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("startingOffsets", s"earliest")
+      .option("subscribe", topic)
+      .load()
+
+    val windowedAggregation = kafka
+      .withWatermark("timestamp", "10 seconds")
+      .groupBy(window($"timestamp", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start") as 'window, $"count")
+
+    val query = windowedAggregation
+      .writeStream
+      .format("memory")
+      .outputMode("complete")
+      .queryName("kafkaWatermark")
+      .start()
+    query.processAllAvailable()
+    val rows = spark.table("kafkaWatermark").collect()
+    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+    val row = rows(0)
+    // We cannot check the exact window start time, as it depends on the time when the producer
+    // inserted the messages. So here we just use a lower bound to make sure the internal
+    // conversion works.
+    assert(
+      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+      s"Unexpected results: $row")
+    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+    query.stop()
+  }
+
   private def testFromLatestOffsets(
       topic: String,
       addPartitions: Boolean,
-- 
GitLab