[SPARK-20386][Spark Core]modify the log info if the block exists on the slave already #17683

Closed
wants to merge 10 commits into from
@@ -497,11 +497,18 @@ private[spark] class BlockManagerInfo(

updateLastSeenMs()

+ var blockExists = false
Member

Although you could just assign val blockExists = _blocks.containsKey(blockId) once and reuse it instead of a var, it's minor. This looks good.

Contributor Author

That's a good idea. I have modified it, thank you.
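
The suggestion in this thread can be sketched as follows. This is a minimal, self-contained illustration and not the Spark code: `BlockRegistry`, the `String` block IDs, and the plain `Long` sizes are hypothetical stand-ins for `BlockManagerInfo`, `BlockId`, and `BlockStatus`. The membership test is evaluated once into a `val` before the map is mutated, and that value is reused to pick the log wording.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for BlockManagerInfo; not the Spark class.
class BlockRegistry {
  private val blocks = new JHashMap[String, java.lang.Long]()

  // Returns the message that would be logged for this update.
  def update(blockId: String, size: Long): String = {
    // Evaluate the membership test once, before the map is mutated.
    val blockExists = blocks.containsKey(blockId)
    blocks.put(blockId, Long.box(size))
    if (blockExists) s"Updated $blockId (size: $size)"
    else s"Added $blockId (size: $size)"
  }
}
```

Because `blockExists` is read before `put`, the second update of the same ID takes the "Updated" branch while the first takes "Added".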

+ var originalMemSize: Long = 0
Member

Why pull out a var here?
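
One possible way to avoid the extra `var`s, sketched under assumptions: `Status` and `OriginalStatusSketch` are hypothetical names, and `Status` is a simplified stand-in for Spark's `BlockStatus`. The idea is to capture the previous entry once as an `Option`, so that "does the block exist" and "what were its sizes" are carried by a single immutable value.

```scala
// Hypothetical, simplified stand-in for Spark's BlockStatus.
final case class Status(memSize: Long, diskSize: Long)

object OriginalStatusSketch {
  // Option(...) maps the null that get() returns for a missing key to None.
  def original(
      blocks: java.util.HashMap[String, Status],
      blockId: String): Option[Status] =
    Option(blocks.get(blockId))

  // Falls back to 0 when the block was absent, mirroring the var defaults in the diff.
  def originalMemSize(
      blocks: java.util.HashMap[String, Status],
      blockId: String): Long =
    original(blocks, blockId).map(_.memSize).getOrElse(0L)
}
```

Whether this reads better than the `var`s is a style call; it is shown only to make the reviewer's question concrete.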

+ var originalDiskSize: Long = 0
+ var originalLevel: StorageLevel = StorageLevel.NONE

if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
- val originalLevel: StorageLevel = blockStatus.storageLevel
- val originalMemSize: Long = blockStatus.memSize
+ originalLevel = blockStatus.storageLevel
+ originalMemSize = blockStatus.memSize
Member

Blocks of code even farther down can reuse these new values and need similar changes to the log message.

Contributor Author

OK, thanks for the suggestion. I have modified it.

+ originalDiskSize = blockStatus.diskSize
+ blockExists = true

if (originalLevel.useMemory) {
_remainingMem += originalMemSize
@@ -520,32 +527,42 @@ private[spark] class BlockManagerInfo(
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(memSize)}," +
+ s" original size: ${Utils.bytesToString(originalMemSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
+ } else {
+ logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(memSize)}, free: ${Utils.bytesToString(_remainingMem)})")
+ }
}
if (storageLevel.useDisk) {
Member

Doesn't the disk case need a similar treatment?

Contributor Author

OK, I have modified the disk case and added an "update"-style log message. Any other suggestions? Thank you.

blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(diskSize)}," +
+ s" original size: ${Utils.bytesToString(originalDiskSize)})")
+ } else {
+ logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(diskSize)})")
+ }
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
- } else if (_blocks.containsKey(blockId)) {
+ } else if (blockExists) {
// If isValid is not true, drop the block.
val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
_cachedBlocks -= blockId
- if (blockStatus.storageLevel.useMemory) {
- logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
- Utils.bytesToString(_remainingMem)))
+ if (originalLevel.useMemory) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
+ s" (size: ${Utils.bytesToString(originalMemSize)}, free: ${Utils.bytesToString(_remainingMem)})")
}
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
+ if (originalLevel.useDisk) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
+ s" (size: ${Utils.bytesToString(originalDiskSize)})")
}
}
}
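
The "Added" versus "Updated" wording introduced in this diff can be tried in isolation with a small sketch. `BlockLogMessages` and its `bytes` helper are hypothetical: `bytes` is a crude stand-in for `Utils.bytesToString` that prints raw byte counts, and the parameters are plain values rather than fields of `BlockManagerInfo`. The message text follows the strings in the diff.

```scala
object BlockLogMessages {
  // Hypothetical stand-in for Utils.bytesToString; prints the raw byte count only.
  private def bytes(n: Long): String = s"$n B"

  // Memory case: "Updated" also reports the original size when the block already existed.
  def memoryMessage(blockId: String, hostPort: String, memSize: Long,
      blockExists: Boolean, originalMemSize: Long, remainingMem: Long): String =
    if (blockExists) {
      s"Updated $blockId in memory on $hostPort" +
        s" (current size: ${bytes(memSize)}," +
        s" original size: ${bytes(originalMemSize)}," +
        s" free: ${bytes(remainingMem)})"
    } else {
      s"Added $blockId in memory on $hostPort" +
        s" (size: ${bytes(memSize)}, free: ${bytes(remainingMem)})"
    }

  // Disk case: same split, without the free-memory field.
  def diskMessage(blockId: String, hostPort: String, diskSize: Long,
      blockExists: Boolean, originalDiskSize: Long): String =
    if (blockExists) {
      s"Updated $blockId on disk on $hostPort" +
        s" (current size: ${bytes(diskSize)}," +
        s" original size: ${bytes(originalDiskSize)})"
    } else {
      s"Added $blockId on disk on $hostPort (size: ${bytes(diskSize)})"
    }
}
```

Keeping the message construction in pure functions like these is only one option; it makes the two branches easy to check without a running block manager.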