Skip to content
Snippets Groups Projects
Commit 0386f42e authored by Henry Saputra's avatar Henry Saputra Committed by Reynold Xin
Browse files

Merge pull request #529 from hsaputra/cleanup_right_arrowop_scala

Change the ⇒ character (maybe from scalariform) to => in Scala code for style consistency

Looks like there are some ⇒ Unicode character (maybe from scalariform) in Scala code.
This PR is to change it to => to get some consistency on the Scala code.

If we want to use ⇒ as default we could use sbt plugin scalariform to make sure all Scala code has ⇒ instead of =>

And remove unused imports found in TwitterInputDStream.scala while I was there =)

Author: Henry Saputra <hsaputra@apache.org>

== Merge branch commits ==

commit 29c1771d346dff901b0b778f764e6b4409900234
Author: Henry Saputra <hsaputra@apache.org>
Date:   Sat Feb 1 22:05:16 2014 -0800

    Change the ⇒ character (maybe from scalariform) to => in Scala code for style consistency.
parent a8cf3ec1
No related branches found
No related tags found
No related merge requests found
...@@ -88,7 +88,7 @@ extends Actor with Receiver { ...@@ -88,7 +88,7 @@ extends Actor with Receiver {
override def preStart = remotePublisher ! SubscribeReceiver(context.self) override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = { def receive = {
case msg pushBlock(msg.asInstanceOf[T]) case msg => pushBlock(msg.asInstanceOf[T])
} }
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self) override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
......
...@@ -17,14 +17,11 @@ ...@@ -17,14 +17,11 @@
package org.apache.spark.streaming.twitter package org.apache.spark.streaming.twitter
import java.util.prefs.Preferences
import twitter4j._ import twitter4j._
import twitter4j.auth.Authorization import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder import twitter4j.conf.ConfigurationBuilder
import twitter4j.conf.PropertyConfiguration
import twitter4j.auth.OAuthAuthorization import twitter4j.auth.OAuthAuthorization
import twitter4j.auth.AccessToken
import org.apache.spark._
import org.apache.spark.streaming._ import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel
......
...@@ -31,7 +31,7 @@ import org.apache.spark.streaming.receivers._ ...@@ -31,7 +31,7 @@ import org.apache.spark.streaming.receivers._
*/ */
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe, subscribe: Subscribe,
bytesToObjects: Seq[ByteString] Iterator[T]) bytesToObjects: Seq[ByteString] => Iterator[T])
extends Actor with Receiver with Logging { extends Actor with Receiver with Logging {
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
...@@ -39,16 +39,16 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, ...@@ -39,16 +39,16 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
def receive: Receive = { def receive: Receive = {
case Connecting logInfo("connecting ...") case Connecting => logInfo("connecting ...")
case m: ZMQMessage case m: ZMQMessage =>
logDebug("Received message for:" + m.frame(0)) logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic //We ignore first frame for processing as it is the topic
val bytes = m.frames.tail val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes)) pushBlock(bytesToObjects(bytes))
case Closed logInfo("received closed ") case Closed => logInfo("received closed ")
} }
} }
...@@ -46,7 +46,7 @@ object ZeroMQUtils { ...@@ -46,7 +46,7 @@ object ZeroMQUtils {
ssc: StreamingContext, ssc: StreamingContext,
publisherUrl: String, publisherUrl: String,
subscribe: Subscribe, subscribe: Subscribe,
bytesToObjects: Seq[ByteString] Iterator[T], bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = { ): DStream[T] = {
......
...@@ -37,8 +37,8 @@ object ReceiverSupervisorStrategy { ...@@ -37,8 +37,8 @@ object ReceiverSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) { 15 millis) {
case _: RuntimeException Restart case _: RuntimeException => Restart
case _: Exception Escalate case _: Exception => Escalate
} }
} }
...@@ -66,7 +66,7 @@ object ReceiverSupervisorStrategy { ...@@ -66,7 +66,7 @@ object ReceiverSupervisorStrategy {
*/ */
trait Receiver { trait Receiver {
self: Actor // to ensure that this can be added to Actor classes only self: Actor => // to ensure that this can be added to Actor classes only
/** /**
* Push an iterator received data into Spark Streaming for processing * Push an iterator received data into Spark Streaming for processing
...@@ -139,25 +139,25 @@ private[streaming] class ActorReceiver[T: ClassTag]( ...@@ -139,25 +139,25 @@ private[streaming] class ActorReceiver[T: ClassTag](
def receive = { def receive = {
case Data(iter: Iterator[_]) pushBlock(iter.asInstanceOf[Iterator[T]]) case Data(iter: Iterator[_]) => pushBlock(iter.asInstanceOf[Iterator[T]])
case Data(msg) case Data(msg) =>
blocksGenerator += msg.asInstanceOf[T] blocksGenerator += msg.asInstanceOf[T]
n.incrementAndGet n.incrementAndGet
case props: Props case props: Props =>
val worker = context.actorOf(props) val worker = context.actorOf(props)
logInfo("Started receiver worker at:" + worker.path) logInfo("Started receiver worker at:" + worker.path)
sender ! worker sender ! worker
case (props: Props, name: String) case (props: Props, name: String) =>
val worker = context.actorOf(props, name) val worker = context.actorOf(props, name)
logInfo("Started receiver worker at:" + worker.path) logInfo("Started receiver worker at:" + worker.path)
sender ! worker sender ! worker
case _: PossiblyHarmful => hiccups.incrementAndGet() case _: PossiblyHarmful => hiccups.incrementAndGet()
case _: Statistics case _: Statistics =>
val workers = context.children val workers = context.children
sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment