Commit c35c60fa authored by proflin, committed by Sean Owen

[SPARK-14028][STREAMING][KINESIS][TESTS] Remove deprecated methods; fix two other warnings

## What changes were proposed in this pull request?

- Removed two methods that have been deprecated since 1.4
- Fixed two other compilation warnings

## How was this patch tested?

Existing test suites.

Author: proflin <proflin.me@gmail.com>

Closes #11850 from lw-lin/streaming-kinesis-deprecates-warnings.
parent 761c2d1b
@@ -221,51 +221,6 @@ object KinesisUtils {
     }
   }
 
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *
-   *  - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *    on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *    gets AWS credentials.
-   *  - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   *  - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
-   *    in [[org.apache.spark.SparkConf]].
-   *
-   * @param ssc StreamingContext object
-   * @param streamName Kinesis stream name
-   * @param endpointUrl Endpoint url of Kinesis service
-   *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
-   *                           See the Kinesis Spark Streaming documentation for more
-   *                           details on the different types of checkpoints.
-   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
-   *                                worker's initial starting position in the stream.
-   *                                The values are either the beginning of the stream
-   *                                per Kinesis' limit of 24 hours
-   *                                (InitialPositionInStream.TRIM_HORIZON) or
-   *                                the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      ssc: StreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[Array[Byte]] = {
-    ssc.withNamedScope("kinesis stream") {
-      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
-        getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
-        checkpointInterval, storageLevel, defaultMessageHandler, None)
-    }
-  }
-
   /**
    * Create an input stream that pulls messages from a Kinesis stream.
    * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
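The removed overload above is superseded by the variants that take an explicit Kinesis application name and region name instead of inferring them from the `SparkConf` app name and the endpoint URL. A minimal sketch of the replacement call, mirroring the updated tests further down (the app name, stream name, endpoint, and region are placeholder values, and `ssc` is assumed to be an existing `StreamingContext`):

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Replacement overload: the KCL application name and the region name
// are passed explicitly rather than derived implicitly.
val kinesisStream = KinesisUtils.createStream(
  ssc, "myAppName", "mySparkStream",
  "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
  InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
```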
@@ -453,47 +408,6 @@ object KinesisUtils {
       defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
   }
 
-  /**
-   * Create an input stream that pulls messages from a Kinesis stream.
-   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
-   *
-   * Note:
-   *  - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
-   *    on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
-   *    gets AWS credentials.
-   *  - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
-   *  - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
-   *    [[org.apache.spark.SparkConf]].
-   *
-   * @param jssc Java StreamingContext object
-   * @param streamName Kinesis stream name
-   * @param endpointUrl Endpoint url of Kinesis service
-   *                    (e.g., https://kinesis.us-east-1.amazonaws.com)
-   * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
-   *                           See the Kinesis Spark Streaming documentation for more
-   *                           details on the different types of checkpoints.
-   * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
-   *                                worker's initial starting position in the stream.
-   *                                The values are either the beginning of the stream
-   *                                per Kinesis' limit of 24 hours
-   *                                (InitialPositionInStream.TRIM_HORIZON) or
-   *                                the tip of the stream (InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
-   */
-  @deprecated("use other forms of createStream", "1.4.0")
-  def createStream(
-      jssc: JavaStreamingContext,
-      streamName: String,
-      endpointUrl: String,
-      checkpointInterval: Duration,
-      initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[Array[Byte]] = {
-    createStream(
-      jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
-  }
-
   private def getRegionByEndpoint(endpointUrl: String): String = {
     RegionUtils.getRegionByEndpoint(endpointUrl).getName()
   }
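`getRegionByEndpoint` delegates to the AWS SDK, which maps a service endpoint back to its region metadata. A quick sketch of what the helper returns (the endpoint value is a placeholder):

```scala
import com.amazonaws.regions.RegionUtils

// The SDK parses the region out of the endpoint host name.
val region = RegionUtils.getRegionByEndpoint("https://kinesis.us-west-2.amazonaws.com")
println(region.getName())  // "us-west-2"
```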
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.kinesis;
 
+import com.amazonaws.regions.RegionUtils;
 import com.amazonaws.services.kinesis.model.Record;
 import org.junit.Test;
@@ -34,11 +35,13 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKinesisStream() {
-    // Tests the API, does not actually test data receiving
-    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
-        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
-        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+    String dummyEndpointUrl = KinesisTestUtils.defaultEndpointUrl();
+    String dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName();
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+        dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, new Duration(2000),
+        StorageLevel.MEMORY_AND_DISK_2());
 
     ssc.stop();
   }
@@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
     if (shouldRunTests) {
       body
     } else {
-      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
     }
   }
 }
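The extra pair of parentheses silences an "adaptation of an empty argument list" deprecation warning: `ignore` takes a second argument list containing the test body, and calling it with an empty list makes the compiler insert the `Unit` value silently, whereas `(())` supplies it explicitly. A self-contained sketch of the same pattern, using a simplified stand-in rather than ScalaTest's real `ignore` signature:

```scala
object EmptyArgAdaptation {
  // Simplified stand-in: the second argument list is a by-name Unit body.
  def ignore(name: String)(body: => Unit): Unit =
    println(s"ignored: $name")

  def main(args: Array[String]): Unit = {
    ignore("a test")()    // deprecation warning: () inserted to adapt the empty argument list
    ignore("a test")(())  // Unit value passed explicitly; compiles cleanly
  }
}
```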
@@ -99,14 +99,10 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
   }
 
   test("KinesisUtils API") {
     // Tests the API, does not actually test data receiving
-    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
-      dummyEndpointUrl, Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
-    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
       InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
       dummyAWSAccessKey, dummyAWSSecretKey)
@@ -154,7 +150,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
 
     // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
     val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
-    emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+    // Verify it's KinesisBackedBlockRDD[_] rather than KinesisBackedBlockRDD[Array[Byte]], because
+    // the type parameter will be erased at runtime
+    emptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
     emptyRDD.partitions shouldBe empty
 
     // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
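The widened matcher reflects JVM type erasure: only the raw class of the RDD is observable at runtime, so the old assertion never actually checked the `Array[Byte]` element type. A minimal, Kinesis-free illustration:

```scala
object ErasureDemo {
  def main(args: Array[String]): Unit = {
    val xs: Any = List("a", "b")
    // Only the raw class survives erasure, so a wildcard is the strongest honest check.
    println(xs.isInstanceOf[List[_]])  // true
    // xs.isInstanceOf[List[Int]] would also report true (with an "unchecked" warning),
    // because the element type is not present at runtime.
  }
}
```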