[SPARK-50481][CORE] Improve `SortShuffleManager.unregisterShuffle` to skip checksum file logic if checksum is disabled

### What changes were proposed in this pull request?

This PR aims to improve `SortShuffleManager.unregisterShuffle` to skip checksum file logic if checksum is disabled.

### Why are the changes needed?

`SortShuffleManager.unregisterShuffle` depends on `IndexShuffleBlockResolver.removeDataByMap`.

https://github.com/apache/spark/blob/7b974ca758961668a26a1d0c60c91614dac38742/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala#L185-L192

It always checks for and tries to delete the checksum files, even when they cannot exist because checksums are disabled.

https://github.com/apache/spark/blob/7b974ca758961668a26a1d0c60c91614dac38742/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala#L198-L201

This PR aims to improve Spark by removing these operations when `checksum` is disabled.
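The effect of the guard can be illustrated with a minimal, self-contained sketch. It is not Spark's code: `ShuffleCleanupSketch`, its file-naming scheme, and the `"ADLER32"` value are hypothetical stand-ins chosen for illustration. The sketch records which paths are probed on disk, so the saved `exists()`/`delete()` calls become visible when the flag is off.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for the resolver; names and file layout are
// illustrative assumptions, not Spark's actual API.
final class ShuffleCleanupSketch(dir: File, checksumEnabled: Boolean, algorithm: String) {
  private def dataFile(mapId: Long): File =
    new File(dir, s"shuffle_$mapId.data")
  private def checksumFile(mapId: Long): File =
    new File(dir, s"shuffle_$mapId.checksum.$algorithm")

  /** Deletes the files of one map output and returns the names that were probed. */
  def removeDataByMap(mapId: Long): Seq[String] = {
    val probed = Seq.newBuilder[String]
    def deleteIfExists(f: File): Unit = {
      probed += f.getName
      if (f.exists() && !f.delete()) {
        System.err.println(s"Error deleting ${f.getPath}")
      }
    }
    deleteIfExists(dataFile(mapId))
    // The checksum file is only touched when checksums are enabled.
    if (checksumEnabled) deleteIfExists(checksumFile(mapId))
    probed.result()
  }
}

object ShuffleCleanupDemo {
  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("shuffle-sketch").toFile
    val disabled = new ShuffleCleanupSketch(dir, checksumEnabled = false, algorithm = "ADLER32")
    val enabled = new ShuffleCleanupSketch(dir, checksumEnabled = true, algorithm = "ADLER32")
    println(disabled.removeDataByMap(0L)) // only the data file is probed
    println(enabled.removeDataByMap(0L))  // data file and checksum file are probed
    dir.delete()
  }
}
```

With the flag off, each call saves one `exists()` check per map output, which adds up when a shuffle with many map outputs is unregistered.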

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49046 from dongjoon-hyun/SPARK-50481.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
dongjoon-hyun authored and HyukjinKwon committed Dec 4, 2024
1 parent ecc33d2 commit 13315ee
Showing 1 changed file with 9 additions and 5 deletions.
@@ -85,6 +85,9 @@ private[spark] class IndexShuffleBlockResolver(
   private val remoteShuffleMaxDisk: Option[Long] =
     conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)

+  private val checksumEnabled = conf.get(config.SHUFFLE_CHECKSUM_ENABLED)
+  private lazy val algorithm = conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
+
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)

   /**
@@ -195,9 +198,11 @@ private[spark] class IndexShuffleBlockResolver(
       logWarning(log"Error deleting index ${MDC(PATH, file.getPath())}")
     }

-    file = getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-    if (file.exists() && !file.delete()) {
-      logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}")
+    if (checksumEnabled) {
+      file = getChecksumFile(shuffleId, mapId, algorithm)
+      if (file.exists() && !file.delete()) {
+        logWarning(log"Error deleting checksum ${MDC(PATH, file.getPath())}")
+      }
     }
   }

@@ -396,8 +401,7 @@ private[spark] class IndexShuffleBlockResolver(
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
       assert(lengths.length == checksums.length,
         "The size of partition lengths and checksums should be equal")
-      val checksumFile =
-        getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
+      val checksumFile = getChecksumFile(shuffleId, mapId, algorithm)
       (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
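
The diff above adds two fields: `checksumEnabled` is a plain `val`, while `algorithm` is a `lazy val`. A rough sketch of what that distinction means, assuming a hypothetical `CountingConf` that counts lookups (it is not `SparkConf`, and the keys `checksum.enabled` and `checksum.algorithm` are made up for illustration): the flag is read once at construction, and the algorithm is read at most once, only if a checksum path is ever built.

```scala
// Hypothetical config holder that counts lookups; not Spark's SparkConf.
final class CountingConf(values: Map[String, String]) {
  var reads: Int = 0
  def get(key: String): String = { reads += 1; values(key) }
}

// Hypothetical resolver mirroring the eager/lazy split of the two fields.
final class ResolverSketch(conf: CountingConf) {
  // Eager: evaluated once when the resolver is constructed.
  private val checksumEnabled = conf.get("checksum.enabled").toBoolean
  // Lazy: evaluated on first access and cached afterwards.
  private lazy val algorithm = conf.get("checksum.algorithm")

  def checksumFileName(mapId: Long): Option[String] =
    if (checksumEnabled) Some(s"shuffle_$mapId.checksum.$algorithm") else None
}

object LazyAlgorithmDemo {
  def main(args: Array[String]): Unit = {
    val off = new CountingConf(
      Map("checksum.enabled" -> "false", "checksum.algorithm" -> "ADLER32"))
    val r1 = new ResolverSketch(off)
    r1.checksumFileName(0L)
    r1.checksumFileName(1L)
    println(off.reads) // 1: only the flag was read

    val on = new CountingConf(
      Map("checksum.enabled" -> "true", "checksum.algorithm" -> "ADLER32"))
    val r2 = new ResolverSketch(on)
    r2.checksumFileName(0L)
    r2.checksumFileName(1L)
    println(on.reads) // 2: the flag, plus the algorithm exactly once
  }
}
```

Compared with calling `conf.get(...)` inline at each use site, this avoids repeated lookups on the enabled path and skips the algorithm lookup entirely on the disabled path.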