Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
8d775448
Commit
8d775448
authored
14 years ago
by
Mosharaf Chowdhury
Browse files
Options
Downloads
Patches
Plain Diff
- Resolved some of the simpler TODOs related to different timeout and wait periods.
- Removed unused code.
parent
830496b9
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/scala/spark/Broadcast.scala
+14
-66
14 additions, 66 deletions
src/scala/spark/Broadcast.scala
with
14 additions
and
66 deletions
src/scala/spark/Broadcast.scala
+
14
−
66
View file @
8d775448
...
@@ -435,7 +435,7 @@ extends BroadcastRecipe with Logging {
...
@@ -435,7 +435,7 @@ extends BroadcastRecipe with Logging {
// TODO: Must fix this. This might never break if broadcast fails.
// TODO: Must fix this. This might never break if broadcast fails.
// We should be able to break and send false. Also need to kill threads
// We should be able to break and send false. Also need to kill threads
while
(
hasBlocks
<
totalBlocks
)
{
while
(
hasBlocks
<
totalBlocks
)
{
Thread
.
sleep
(
1000
)
Thread
.
sleep
(
BroadcastBT
.
MaxKnockInterval
)
}
}
return
true
return
true
...
@@ -473,14 +473,13 @@ extends BroadcastRecipe with Logging {
...
@@ -473,14 +473,13 @@ extends BroadcastRecipe with Logging {
}
}
// Sleep for a while before starting some more threads
// Sleep for a while before starting some more threads
// TODO: Whats up with this?
Thread
.
sleep
(
BroadcastBT
.
MinKnockInterval
)
Thread
.
sleep
(
500
)
}
}
// Shutdown the thread pool
// Shutdown the thread pool
threadPool
.
shutdown
threadPool
.
shutdown
}
}
//
TODO:
Right now picking the one that has the most blocks this peer wants
// Right now picking the one that has the most blocks this peer wants
// Also picking peer randomly if no one has anything interesting
// Also picking peer randomly if no one has anything interesting
private
def
pickPeerToTalkTo
:
SourceInfo
=
{
private
def
pickPeerToTalkTo
:
SourceInfo
=
{
var
curPeer
:
SourceInfo
=
null
var
curPeer
:
SourceInfo
=
null
...
@@ -509,7 +508,7 @@ extends BroadcastRecipe with Logging {
...
@@ -509,7 +508,7 @@ extends BroadcastRecipe with Logging {
}
}
// Always pick randomly or randomly pick randomly?
// Always pick randomly or randomly pick randomly?
//
TODO: Now its always
//
Now always picking randomly
if
(
curPeer
==
null
&&
peersNotInUse
.
size
>
0
)
{
if
(
curPeer
==
null
&&
peersNotInUse
.
size
>
0
)
{
// Pick uniformly the i'th required peer
// Pick uniformly the i'th required peer
var
i
=
BroadcastBT
.
ranGen
.
nextInt
(
peersNotInUse
.
size
)
var
i
=
BroadcastBT
.
ranGen
.
nextInt
(
peersNotInUse
.
size
)
...
@@ -548,9 +547,8 @@ extends BroadcastRecipe with Logging {
...
@@ -548,9 +547,8 @@ extends BroadcastRecipe with Logging {
}
}
}
}
// TODO: Fix a value for timeout timer
var
timeOutTimer
=
new
Timer
var
timeOutTimer
=
new
Timer
timeOutTimer
.
schedule
(
timeOutTask
,
1
*
1000
)
timeOutTimer
.
schedule
(
timeOutTask
,
BroadcastBT
.
MaxKnockInterval
)
logInfo
(
"TalkToPeer started... => "
+
peerToTalkTo
)
logInfo
(
"TalkToPeer started... => "
+
peerToTalkTo
)
...
@@ -638,13 +636,13 @@ extends BroadcastRecipe with Logging {
...
@@ -638,13 +636,13 @@ extends BroadcastRecipe with Logging {
case
eofe
:
java.io.EOFException
=>
{
}
case
eofe
:
java.io.EOFException
=>
{
}
case
e
:
Exception
=>
{
case
e
:
Exception
=>
{
logInfo
(
"TalktoPeer had a "
+
e
)
logInfo
(
"TalktoPeer had a "
+
e
)
// Remove
this pInfo
from listOfSources
//
TODO:
Remove
'newPeerToTalkTo'
from listOfSources
//
TODO:
We probably should have the following in some form, but not
// We probably should have the following in some form, but not
// really here. This exception can happen if the sender just breaks connection
// really here. This exception can happen if the sender just breaks connection
//
listOfSources.synchronized {
//
listOfSources.synchronized {
//
logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
//
logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
//
listOfSources = listOfSources - peerToTalkTo
//
listOfSources = listOfSources - peerToTalkTo
//
}
//
}
}
}
}
finally
{
}
finally
{
// blockToAskFor != -1 => there was an exception
// blockToAskFor != -1 => there was an exception
...
@@ -918,8 +916,7 @@ extends BroadcastRecipe with Logging {
...
@@ -918,8 +916,7 @@ extends BroadcastRecipe with Logging {
class
ServeMultipleRequests
class
ServeMultipleRequests
extends
Thread
with
Logging
{
extends
Thread
with
Logging
{
override
def
run
:
Unit
=
{
override
def
run
:
Unit
=
{
// TODO: Not sure if this will be able to fix the number of outgoing links
// Server at most BroadcastBT.MaxRxPeers peers
// We should have a timeout mechanism on the receiver side
var
threadPool
=
var
threadPool
=
BroadcastBT
.
newDaemonFixedThreadPool
(
BroadcastBT
.
MaxRxPeers
)
BroadcastBT
.
newDaemonFixedThreadPool
(
BroadcastBT
.
MaxRxPeers
)
...
@@ -1006,13 +1003,6 @@ extends BroadcastRecipe with Logging {
...
@@ -1006,13 +1003,6 @@ extends BroadcastRecipe with Logging {
sendBlock
(
blockToSend
)
sendBlock
(
blockToSend
)
rxSourceInfo
.
hasBlocksBitVector
.
set
(
blockToSend
)
rxSourceInfo
.
hasBlocksBitVector
.
set
(
blockToSend
)
// val sentBlock = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
// if (sentBlock < 0) {
// keepSending = false
// } else {
// rxSourceInfo.hasBlocksBitVector.set (sentBlock)
// }
numBlocksToSend
=
numBlocksToSend
-
1
numBlocksToSend
=
numBlocksToSend
-
1
// Receive latest SourceInfo from the receiver
// Receive latest SourceInfo from the receiver
...
@@ -1023,7 +1013,6 @@ extends BroadcastRecipe with Logging {
...
@@ -1023,7 +1013,6 @@ extends BroadcastRecipe with Logging {
curTime
=
System
.
currentTimeMillis
curTime
=
System
.
currentTimeMillis
}
}
}
catch
{
}
catch
{
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
// then close everything up
// Exception can happen if the receiver stops receiving
// Exception can happen if the receiver stops receiving
...
@@ -1045,46 +1034,11 @@ extends BroadcastRecipe with Logging {
...
@@ -1045,46 +1034,11 @@ extends BroadcastRecipe with Logging {
oos
.
flush
oos
.
flush
}
catch
{
}
catch
{
case
e
:
Exception
=>
{
case
e
:
Exception
=>
{
logInfo
(
"
pickAndS
endBlock had a "
+
e
)
logInfo
(
"
s
endBlock had a "
+
e
)
}
}
}
}
logInfo
(
"Sent block: "
+
blockToSend
+
" to "
+
clientSocket
)
logInfo
(
"Sent block: "
+
blockToSend
+
" to "
+
clientSocket
)
}
}
// Right now picking the rarest first block
private
def
pickAndSendBlock
(
rxHasBlocksBitVector
:
BitSet
)
:
Int
=
{
var
blockIndex
=
-
1
var
minCopies
=
Int
.
MaxValue
var
nextIndex
=
-
1
// Figure out which blocks the receiver doesn't have
var
tempHasBlocksBitVector
=
rxHasBlocksBitVector
.
clone
.
asInstanceOf
[
BitSet
]
tempHasBlocksBitVector
.
flip
(
0
,
tempHasBlocksBitVector
.
size
)
hasBlocksBitVector
.
synchronized
{
tempHasBlocksBitVector
.
and
(
hasBlocksBitVector
)
}
// Traverse over all the blocks
numCopiesSent
.
synchronized
{
do
{
nextIndex
=
tempHasBlocksBitVector
.
nextSetBit
(
nextIndex
+
1
)
if
(
nextIndex
!=
-
1
&&
numCopiesSent
(
nextIndex
)
<
minCopies
)
{
minCopies
=
numCopiesSent
(
nextIndex
)
numCopiesSent
(
nextIndex
)
=
numCopiesSent
(
nextIndex
)
+
1
blockIndex
=
nextIndex
}
}
while
(
nextIndex
!=
-
1
)
}
if
(
blockIndex
<
0
)
{
logInfo
(
"No block to send..."
)
}
else
{
sendBlock
(
blockIndex
)
}
return
blockIndex
}
}
}
}
}
}
}
...
@@ -1231,12 +1185,8 @@ extends Logging {
...
@@ -1231,12 +1185,8 @@ extends Logging {
private
var
trackMV
:
TrackMultipleValues
=
null
private
var
trackMV
:
TrackMultipleValues
=
null
// newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
private
val
ALPHA
=
0.7
// 125.0 MBps = 1 Gbps link
private
val
MaxMBps_
=
125.0
// A peer syncs back to Guide after waiting randomly within following limits
// A peer syncs back to Guide after waiting randomly within following limits
// Also used thoughout the code for small and large waits/timeouts
private
var
MinKnockInterval_
=
500
private
var
MinKnockInterval_
=
500
private
var
MaxKnockInterval_
=
999
private
var
MaxKnockInterval_
=
999
...
@@ -1316,8 +1266,6 @@ extends Logging {
...
@@ -1316,8 +1266,6 @@ extends Logging {
def
isMaster
=
isMaster_
def
isMaster
=
isMaster_
def
MaxMBps
=
MaxMBps_
def
MinKnockInterval
=
MinKnockInterval_
def
MinKnockInterval
=
MinKnockInterval_
def
MaxKnockInterval
=
MaxKnockInterval_
def
MaxKnockInterval
=
MaxKnockInterval_
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment