Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
547dcbe4
Commit
547dcbe4
authored
11 years ago
by
Reynold Xin
Browse files
Options
Downloads
Patches
Plain Diff
Cleaned up Scala files in network/netty from Shane's PR.
parent
9e64396c
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
core/src/main/scala/spark/network/netty/ShuffleCopier.scala
+23
-27
23 additions, 27 deletions
core/src/main/scala/spark/network/netty/ShuffleCopier.scala
core/src/main/scala/spark/network/netty/ShuffleSender.scala
+12
-6
12 additions, 6 deletions
core/src/main/scala/spark/network/netty/ShuffleSender.scala
with
35 additions
and
33 deletions
core/src/main/scala/spark/network/netty/ShuffleCopier.scala
+
23
−
27
View file @
547dcbe4
package
spark.network.netty
import
java.util.concurrent.Executors
import
io.netty.buffer.ByteBuf
import
io.netty.channel.ChannelHandlerContext
import
io.netty.channel.ChannelInboundByteHandlerAdapter
import
io.netty.util.CharsetUtil
import
java.util.concurrent.atomic.AtomicInteger
import
java.util.logging.Logger
import
spark.Logging
import
spark.network.ConnectionManagerId
import
java.util.concurrent.Executors
private
[
spark
]
class
ShuffleCopier
extends
Logging
{
def
getBlock
(
cmId
:
ConnectionManagerId
,
blockId
:
String
,
resultCollectCallback
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
=
{
def
getBlock
(
cmId
:
ConnectionManagerId
,
blockId
:
String
,
resultCollectCallback
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
{
val
handler
=
new
ShuffleClientHandler
(
resultCollectCallback
)
val
handler
=
new
ShuffleCopier
.
ShuffleClientHandler
(
resultCollectCallback
)
val
fc
=
new
FileClient
(
handler
)
fc
.
init
()
fc
.
connect
(
cmId
.
host
,
cmId
.
port
)
...
...
@@ -28,29 +26,28 @@ private[spark] class ShuffleCopier extends Logging {
def
getBlocks
(
cmId
:
ConnectionManagerId
,
blocks
:
Seq
[(
String
,
Long
)],
resultCollectCallback
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
=
{
resultCollectCallback
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
{
blocks
.
map
{
case
(
blockId
,
size
)
=>
{
getBlock
(
cmId
,
blockId
,
resultCollectCallback
)
}
for
((
blockId
,
size
)
<-
blocks
)
{
getBlock
(
cmId
,
blockId
,
resultCollectCallback
)
}
}
}
private
[
spark
]
class
ShuffleClientHandler
(
val
resultCollectCallBack
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
extends
FileClientHandler
with
Logging
{
def
handle
(
ctx
:
ChannelHandlerContext
,
in
:
ByteBuf
,
header
:
FileHeader
)
{
logDebug
(
"Received Block: "
+
header
.
blockId
+
" ("
+
header
.
fileLen
+
"B)"
);
resultCollectCallBack
(
header
.
blockId
,
header
.
fileLen
.
toLong
,
in
.
readBytes
(
header
.
fileLen
))
}
}
private
[
spark
]
object
ShuffleCopier
extends
Logging
{
def
echoResultCollectCallBack
(
blockId
:
String
,
size
:
Long
,
content
:
ByteBuf
)
=
{
logInfo
(
"File: "
+
blockId
+
" content is : \" "
+
content
.
toString
(
CharsetUtil
.
UTF_8
)
+
"\""
)
private
class
ShuffleClientHandler
(
resultCollectCallBack
:
(
String
,
Long
,
ByteBuf
)
=>
Unit
)
extends
FileClientHandler
with
Logging
{
override
def
handle
(
ctx
:
ChannelHandlerContext
,
in
:
ByteBuf
,
header
:
FileHeader
)
{
logDebug
(
"Received Block: "
+
header
.
blockId
+
" ("
+
header
.
fileLen
+
"B)"
);
resultCollectCallBack
(
header
.
blockId
,
header
.
fileLen
.
toLong
,
in
.
readBytes
(
header
.
fileLen
))
}
}
def
echoResultCollectCallBack
(
blockId
:
String
,
size
:
Long
,
content
:
ByteBuf
)
{
logInfo
(
"File: "
+
blockId
+
" content is : \" "
+
content
.
toString
(
CharsetUtil
.
UTF_8
)
+
"\""
)
}
def
runGetBlock
(
host
:
String
,
port
:
Int
,
file
:
String
){
...
...
@@ -71,18 +68,17 @@ private[spark] object ShuffleCopier extends Logging {
val
host
=
args
(
0
)
val
port
=
args
(
1
).
toInt
val
file
=
args
(
2
)
val
threads
=
if
(
args
.
length
>
3
)
args
(
3
).
toInt
else
10
val
threads
=
if
(
args
.
length
>
3
)
args
(
3
).
toInt
else
10
val
copiers
=
Executors
.
newFixedThreadPool
(
80
)
for
(
i
<-
Range
(
0
,
threads
)){
for
(
i
<-
Range
(
0
,
threads
))
{
val
runnable
=
new
Runnable
()
{
def
run
()
{
runGetBlock
(
host
,
port
,
file
)
runGetBlock
(
host
,
port
,
file
)
}
}
copiers
.
execute
(
runnable
)
}
copiers
.
shutdown
}
}
This diff is collapsed.
Click to expand it.
core/src/main/scala/spark/network/netty/ShuffleSender.scala
+
12
−
6
View file @
547dcbe4
package
spark.network.netty
import
spark.Logging
import
java.io.File
import
spark.Logging
private
[
spark
]
class
ShuffleSender
(
val
port
:
Int
,
val
pResolver
:
PathResolver
)
extends
Logging
{
private
[
spark
]
class
ShuffleSender
(
val
port
:
Int
,
val
pResolver
:
PathResolver
)
extends
Logging
{
val
server
=
new
FileServer
(
pResolver
)
Runtime
.
getRuntime
().
addShutdownHook
(
new
Thread
()
{
override
def
run
()
{
...
...
@@ -20,17 +21,22 @@ private[spark] class ShuffleSender(val port: Int, val pResolver:PathResolver) ex
}
}
private
[
spark
]
object
ShuffleSender
{
def
main
(
args
:
Array
[
String
])
{
if
(
args
.
length
<
3
)
{
System
.
err
.
println
(
"Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of shuffle_block_directories>"
)
System
.
err
.
println
(
"Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of shuffle_block_directories>"
)
System
.
exit
(
1
)
}
val
port
=
args
(
0
).
toInt
val
subDirsPerLocalDir
=
args
(
1
).
toInt
val
localDirs
=
args
.
drop
(
2
)
map
{
new
File
(
_
)}
val
localDirs
=
args
.
drop
(
2
).
map
(
new
File
(
_
))
val
pResovler
=
new
PathResolver
{
def
getAbsolutePath
(
blockId
:
String
)
:
String
=
{
override
def
getAbsolutePath
(
blockId
:
String
)
:
String
=
{
if
(!
blockId
.
startsWith
(
"shuffle_"
))
{
throw
new
Exception
(
"Block "
+
blockId
+
" is not a shuffle block"
)
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment