cs525-sp18-g07 / spark · Commits

Commit 56524587
Authored 12 years ago by Richard Benkovsky

BoundedMemoryCache.put fails when estimated size of 'value' is larger than cache capacity
Parent: 10716b17
Changes: 2 changed files, with 65 additions and 17 deletions

core/src/main/scala/spark/BoundedMemoryCache.scala   +34 −17
core/src/test/scala/spark/BoundedMemoryCacheTest.scala   +31 −0
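Why the fix takes this shape: SizeEstimator can report a value larger than the whole cache, and in that case no sequence of evictions will ever free enough room, so put should return CachePutFailure() immediately instead of draining unrelated entries first. A minimal sketch of that guard, with hypothetical names (sketchPut, tryEvict) standing in for the real put and ensureFreeSpace:

object CapacityGuardSketch extends App {
  // Sketch only: tryEvict plays the role of ensureFreeSpace in the real class.
  def sketchPut(size: Long, capacity: Long, tryEvict: Long => Boolean): Boolean =
    if (size > capacity) false  // fail fast: the value can never fit, so evicting is pointless
    else tryEvict(size)         // otherwise evict until `size` bytes are free

  val capacity = 40L
  println(sketchPut(30, capacity, _ <= capacity))  // true: a 30-byte value can fit
  println(sketchPut(50, capacity, _ <= capacity))  // false: bigger than the whole cache
}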
core/src/main/scala/spark/BoundedMemoryCache.scala   +34 −17

@@ -9,16 +9,16 @@ import java.util.LinkedHashMap
  * some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
  * when most of the space is used by arrays of primitives or of simple classes.
  */
-class BoundedMemoryCache extends Cache with Logging {
-  private val maxBytes: Long = getMaxBytes()
+class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
   logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)

+  def this() {
+    this(BoundedMemoryCache.getMaxBytes)
+  }
+
   private var currentBytes = 0L
   private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)

-  // An entry in our map; stores a cached object and its size in bytes
-  class Entry(val value: Any, val size: Long) {}
-
   override def get(datasetId: Any, partition: Int): Any = {
     synchronized {
       val entry = map.get((datasetId, partition))

@@ -33,13 +33,11 @@ class BoundedMemoryCache extends Cache with Logging {
   override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
     val key = (datasetId, partition)
     logInfo("Asked to add key " + key)
-    val startTime = System.currentTimeMillis
-    val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
-    val timeTaken = System.currentTimeMillis - startTime
-    logInfo("Estimated size for key %s is %d".format(key, size))
-    logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
+    val size = estimateValueSize(key, value)
     synchronized {
-      if (ensureFreeSpace(datasetId, size)) {
+      if (size > getCapacity) {
+        return CachePutFailure()
+      } else if (ensureFreeSpace(datasetId, size)) {
         logInfo("Adding key " + key)
         map.put(key, new Entry(value, size))
         currentBytes += size

@@ -54,10 +52,16 @@ class BoundedMemoryCache extends Cache with Logging {
   override def getCapacity: Long = maxBytes

-  private def getMaxBytes(): Long = {
-    val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
-    (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
-  }
+  /**
+   * Estimate sizeOf 'value'
+   */
+  private def estimateValueSize(key: (Any, Int), value: Any) = {
+    val startTime = System.currentTimeMillis
+    val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
+    val timeTaken = System.currentTimeMillis - startTime
+    logInfo("Estimated size for key %s is %d".format(key, size))
+    logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
+    size
+  }

   /**

@@ -85,8 +89,21 @@ class BoundedMemoryCache extends Cache with Logging {
   }

   protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
     logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
     SparkEnv.get.cacheTracker.dropEntry(datasetId, partition)
   }
 }
+
+// An entry in our map; stores a cached object and its size in bytes
+case class Entry(value: Any, size: Long)
+
+object BoundedMemoryCache {
+  /**
+   * Get maximum cache capacity from system configuration
+   */
+  def getMaxBytes: Long = {
+    val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
+    (Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
+  }
+}
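One detail in the unchanged context worth calling out: the backing map is a java.util.LinkedHashMap created with accessOrder = true, which makes iteration run from least- to most-recently-used; that ordering is what the eviction logic walks when choosing which entries to drop. A small runnable sketch of the behavior (sketch only, not part of the commit):

import java.util.LinkedHashMap

object LruOrderSketch extends App {
  // Same constructor arguments as the cache: initial capacity 32,
  // load factor 0.75f, accessOrder = true.
  val map = new LinkedHashMap[String, Integer](32, 0.75f, true)
  map.put("a", 1); map.put("b", 2); map.put("c", 3)
  map.get("a")          // accessing "a" moves it to the most-recently-used end
  println(map.keySet()) // prints [b, c, a]; "b" would be evicted first
}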
core/src/test/scala/spark/BoundedMemoryCacheTest.scala   new file (0 → 100644)   +31 −0
package spark

import org.scalatest.FunSuite

class BoundedMemoryCacheTest extends FunSuite {
  test("constructor test") {
    val cache = new BoundedMemoryCache(40)
    expect(40)(cache.getCapacity)
  }

  test("caching") {
    val cache = new BoundedMemoryCache(40) {
      // TODO: sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
      override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
        logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
      }
    }
    // should be OK
    expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
    // we cannot add this to the cache (there is not enough space) and we cannot
    // evict the only value from the cache because it is from the same dataset
    expect(CachePutFailure())(cache.put("1", 1, "Meh"))
    // should be OK, dataset '1' can be evicted from the cache
    expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
    // should fail, the cache should obey its capacity
    expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
  }
}
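A note on the expected values: CachePutSuccess(30) reflects SizeEstimator's estimate for the string "Meh" (30 bytes on the JVM this test was written against), so a second 30-byte entry can never fit in the 40-byte cache; whether put then succeeds hinges on eviction, and ensureFreeSpace refuses to evict entries belonging to the dataset currently being inserted, presumably to avoid dropping partitions of a dataset while it is still being cached. The last assertion exercises the new guard directly: the long string's estimated size exceeds the 40-byte capacity outright, so put fails before any eviction is attempted.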