Commit dd7c5d8e, authored 14 years ago by Matei Zaharia

Added initial attempt at a BoundedMemoryCache

Parent: edf86fdb
Changes: 1 changed file

src/scala/spark/BoundedMemoryCache.scala (new file, mode 100644): 69 additions, 0 deletions
package spark

import java.util.LinkedHashMap

/**
 * An implementation of Cache that estimates the sizes of its entries and
 * attempts to limit its total memory usage to a fraction of the JVM heap.
 * Objects' sizes are estimated using SizeEstimator, which has limitations;
 * most notably, we will overestimate total memory used if some cache
 * entries have pointers to a shared object. Nonetheless, this Cache should
 * work well when most of the space is used by arrays of primitives or of
 * simple classes.
 */
class BoundedMemoryCache extends Cache with Logging {
  private val maxBytes: Long = getMaxBytes()
  logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)

  private var currentBytes = 0L
  private val map = new LinkedHashMap[Any, Entry](32, 0.75f, true)
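  // Note: the third LinkedHashMap argument (accessOrder = true) makes the
  // iteration order run from least to most recently used, which is what lets
  // ensureFreeSpace below evict entries in LRU order.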
  // An entry in our map; stores a cached object and its size in bytes
  class Entry(val value: Any, val size: Long) {}

  override def get(key: Any): Any = {
    synchronized {
      val entry = map.get(key)
      if (entry != null) entry.value else null
    }
  }

  override def put(key: Any, value: Any) {
    logInfo("Asked to add key " + key)
    val startTime = System.currentTimeMillis
    val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
    val timeTaken = System.currentTimeMillis - startTime
    logInfo("Estimated size for key %s is %d".format(key, size))
    logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
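    // Note: the (possibly slow) size estimation above runs outside the lock,
    // so other threads can keep using the cache while it happens; only the
    // map update below is synchronized.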
    synchronized {
      ensureFreeSpace(size)
      logInfo("Adding key " + key)
      map.put(key, new Entry(value, size))
      currentBytes += size
      logInfo("Number of entries is now " + map.size)
    }
  }

  private def getMaxBytes(): Long = {
    val memoryFractionToUse = System.getProperty(
      "spark.boundedMemoryCache.memoryFraction", "0.75").toDouble
    (Runtime.getRuntime.totalMemory * memoryFractionToUse).toLong
  }

  /**
   * Remove least recently used entries from the map until at least space
   * bytes are free. Assumes that a lock is held on the BoundedMemoryCache.
   */
  private def ensureFreeSpace(space: Long) {
    logInfo("ensureFreeSpace(%d) called with curBytes=%d, maxBytes=%d".format(
      space, currentBytes, maxBytes))
    val iter = map.entrySet.iterator
    while (maxBytes - currentBytes < space && iter.hasNext) {
      val mapEntry = iter.next()
      logInfo("Dropping key %s of size %d to make space".format(
        mapEntry.getKey, mapEntry.getValue.size))
      currentBytes -= mapEntry.getValue.size
      iter.remove()
    }
  }
}
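The eviction policy falls out of the access-ordered LinkedHashMap: every get and put moves the touched entry to the most-recently-used end, so the iterator in ensureFreeSpace always reaches the least recently used entries first. A minimal, self-contained sketch of that JDK behavior (names here are illustrative only):

import java.util.LinkedHashMap

object AccessOrderDemo {
  def main(args: Array[String]) {
    // Same constructor arguments as BoundedMemoryCache: accessOrder = true
    val m = new LinkedHashMap[String, Int](32, 0.75f, true)
    m.put("a", 1)
    m.put("b", 2)
    m.put("c", 3)
    m.get("a") // touching "a" moves it to the most-recently-used end
    val it = m.keySet.iterator
    while (it.hasNext) {
      print(it.next() + " ") // prints: b c a
    }
    println()
  }
}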
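Cache, Logging, and SizeEstimator are defined elsewhere in the repository and are not part of this diff. As a rough sketch of how the class is exercised, here are hypothetical minimal stand-ins for those three plus a small driver (the real definitions differ):

// Hypothetical stand-ins for types that live elsewhere in the repo (not in this diff)
trait Cache {
  def get(key: Any): Any
  def put(key: Any, value: Any)
}
trait Logging {
  def logInfo(msg: String) { println(msg) }
}
object SizeEstimator {
  // Placeholder: the real estimator walks the object graph to estimate bytes
  def estimate(obj: AnyRef): Long = 64L
}

object BoundedMemoryCacheDemo {
  def main(args: Array[String]) {
    val cache = new BoundedMemoryCache
    cache.put("rdd_0_0", Array.fill(1000)(0.0))
    println(cache.get("rdd_0_0") != null) // true: entry is cached (unless evicted)
    println(cache.get("missing"))         // null: get never populates the cache
  }
}

Since getMaxBytes reads a JVM system property, the cap is tunable at launch: for example, passing -Dspark.boundedMemoryCache.memoryFraction=0.5 would bound the cache at half of Runtime.getRuntime.totalMemory instead of the default 0.75.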