Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
c9cad03c
Commit
c9cad03c
authored
14 years ago
by
Mosharaf Chowdhury
Browse files
Options
Downloads
Patches
Plain Diff
- Using the new Cache implementation.
- Removed unused code related to dualMode (deprecated).
parent
73714da5
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
src/scala/spark/Broadcast.scala
+1
-1
1 addition, 1 deletion
src/scala/spark/Broadcast.scala
src/scala/spark/ChainedBroadcast.scala
+4
-6
4 additions, 6 deletions
src/scala/spark/ChainedBroadcast.scala
src/scala/spark/DfsBroadcast.scala
+1
-3
1 addition, 3 deletions
src/scala/spark/DfsBroadcast.scala
with
6 additions
and
10 deletions
src/scala/spark/Broadcast.scala
+
1
−
1
View file @
c9cad03c
...
@@ -67,7 +67,7 @@ extends Logging {
...
@@ -67,7 +67,7 @@ extends Logging {
@serializable
@serializable
case
class
SourceInfo
(
val
hostAddress
:
String
,
val
listenPort
:
Int
,
case
class
SourceInfo
(
val
hostAddress
:
String
,
val
listenPort
:
Int
,
val
totalBlocks
:
Int
,
val
totalBytes
:
Int
,
val
replicaID
:
Int
)
val
totalBlocks
:
Int
,
val
totalBytes
:
Int
)
extends
Comparable
[
SourceInfo
]
with
Logging
{
extends
Comparable
[
SourceInfo
]
with
Logging
{
var
currentLeechers
=
0
var
currentLeechers
=
0
...
...
This diff is collapsed.
Click to expand it.
src/scala/spark/ChainedBroadcast.scala
+
4
−
6
View file @
c9cad03c
package
spark
package
spark
import
com.google.common.collect.MapMaker
import
java.io._
import
java.io._
import
java.net._
import
java.net._
import
java.util.
{
Comparator
,
PriorityQueue
,
Random
,
UUID
}
import
java.util.
{
Comparator
,
PriorityQueue
,
Random
,
UUID
}
...
@@ -84,7 +82,7 @@ extends Broadcast with Logging {
...
@@ -84,7 +82,7 @@ extends Broadcast with Logging {
pqOfSources
=
new
PriorityQueue
[
SourceInfo
]
pqOfSources
=
new
PriorityQueue
[
SourceInfo
]
val
masterSource_0
=
val
masterSource_0
=
SourceInfo
(
hostAddress
,
listenPort
,
totalBlocks
,
totalBytes
,
0
)
SourceInfo
(
hostAddress
,
listenPort
,
totalBlocks
,
totalBytes
)
pqOfSources
.
add
(
masterSource_0
)
pqOfSources
.
add
(
masterSource_0
)
// Register with the Tracker
// Register with the Tracker
...
@@ -288,7 +286,7 @@ extends Broadcast with Logging {
...
@@ -288,7 +286,7 @@ extends Broadcast with Logging {
logInfo
(
"Connected to Master's guiding object"
)
logInfo
(
"Connected to Master's guiding object"
)
// Send local source information
// Send local source information
oosMaster
.
writeObject
(
SourceInfo
(
hostAddress
,
listenPort
,
-
1
,
-
1
,
0
))
oosMaster
.
writeObject
(
SourceInfo
(
hostAddress
,
listenPort
,
-
1
,
-
1
))
oosMaster
.
flush
oosMaster
.
flush
// Receive source information from Master
// Receive source information from Master
...
@@ -520,7 +518,7 @@ extends Broadcast with Logging {
...
@@ -520,7 +518,7 @@ extends Broadcast with Logging {
// Add this new (if it can finish) source to the PQ of sources
// Add this new (if it can finish) source to the PQ of sources
thisWorkerInfo
=
SourceInfo
(
sourceInfo
.
hostAddress
,
thisWorkerInfo
=
SourceInfo
(
sourceInfo
.
hostAddress
,
sourceInfo
.
listenPort
,
totalBlocks
,
totalBytes
,
0
)
sourceInfo
.
listenPort
,
totalBlocks
,
totalBytes
)
logInfo
(
"Adding possible new source to pqOfSources: "
+
thisWorkerInfo
)
logInfo
(
"Adding possible new source to pqOfSources: "
+
thisWorkerInfo
)
pqOfSources
.
add
(
thisWorkerInfo
)
pqOfSources
.
add
(
thisWorkerInfo
)
}
}
...
@@ -713,7 +711,7 @@ extends Broadcast with Logging {
...
@@ -713,7 +711,7 @@ extends Broadcast with Logging {
private
object
ChainedBroadcast
private
object
ChainedBroadcast
extends
Logging
{
extends
Logging
{
val
values
=
new
MapMaker
().
softValues
().
makeMap
[
UUID
,
Any
]
val
values
=
Cache
.
newKeySpace
()
var
valueToGuidePortMap
=
Map
[
UUID
,
Int
]
()
var
valueToGuidePortMap
=
Map
[
UUID
,
Int
]
()
...
...
This diff is collapsed.
Click to expand it.
src/scala/spark/DfsBroadcast.scala
+
1
−
3
View file @
c9cad03c
package
spark
package
spark
import
com.google.common.collect.MapMaker
import
java.io._
import
java.io._
import
java.net._
import
java.net._
import
java.util.UUID
import
java.util.UUID
...
@@ -56,7 +54,7 @@ extends Broadcast with Logging {
...
@@ -56,7 +54,7 @@ extends Broadcast with Logging {
private
object
DfsBroadcast
private
object
DfsBroadcast
extends
Logging
{
extends
Logging
{
val
values
=
new
MapMaker
().
softValues
().
makeMap
[
UUID
,
Any
]
val
values
=
Cache
.
newKeySpace
()
private
var
initialized
=
false
private
var
initialized
=
false
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment