Skip to content
Snippets Groups Projects
Commit 93a16434 authored by James Phillpotts's avatar James Phillpotts
Browse files

Allow other twitter authorizations than username/password

parent 1057fccf
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.{Authorization, BasicAuthorization}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
......@@ -372,8 +373,20 @@ class StreamingContext private (
password: String,
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = twitterStream(new BasicAuthorization(username, password), filters, storageLevel)
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J authentication
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(
twitterAuth: Authorization,
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
registerInputStream(inputStream)
inputStream
}
......
......@@ -6,6 +6,7 @@ import storage.StorageLevel
import twitter4j._
import twitter4j.auth.BasicAuthorization
import twitter4j.auth.Authorization
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
......@@ -16,21 +17,19 @@ import twitter4j.auth.BasicAuthorization
private[streaming]
class TwitterInputDStream(
@transient ssc_ : StreamingContext,
username: String,
password: String,
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
new TwitterReceiver(twitterAuth, filters, storageLevel)
}
}
private[streaming]
class TwitterReceiver(
username: String,
password: String,
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends NetworkReceiver[Status] {
......@@ -40,8 +39,7 @@ class TwitterReceiver(
protected override def onStart() {
blockGenerator.start()
twitterStream = new TwitterStreamFactory()
.getInstance(new BasicAuthorization(username, password))
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
blockGenerator += status
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment