Skip to content

Commit

Permalink
Fix #667
Browse files Browse the repository at this point in the history
Signed-off-by: matt-ramotar <[email protected]>
  • Loading branch information
matt-ramotar committed Nov 18, 2024
1 parent 1ea394e commit 8a7cefa
Showing 1 changed file with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,15 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
}

is EagerConflictResolutionResult.Success.ConflictsResolved -> {
logger.debug(eagerConflictResolutionResult.value.toString())
val message = when (val result = eagerConflictResolutionResult.value) {
is UpdaterResult.Success.Typed<*> -> result.value.toString()
is UpdaterResult.Success.Untyped -> result.value.toString()
}
logger.debug(message)
}

EagerConflictResolutionResult.Success.NoConflicts -> {
logger.debug(eagerConflictResolutionResult.toString())
logger.debug("No conflicts.")
}
}

Expand Down Expand Up @@ -225,48 +229,46 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
}

@AnyThread
private suspend fun <Response : Any> tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult<Response> =
withThreadSafety(key) {
private suspend fun <Response : Any> tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult<Response> {

val (latest, conflictsExist) = withThreadSafety(key) {
val latest = delegate.latestOrNull(key)
when {
latest == null || bookkeeper == null || conflictsMightExist(key).not() -> EagerConflictResolutionResult.Success.NoConflicts
else -> {
try {
val updaterResult =
updater.post(key, latest).also { updaterResult ->
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<Response>(key = key, created = now(), updaterResult = updaterResult)
}
}
val conflictsExist = latest != null && bookkeeper != null && conflictsMightExist(key)
latest to conflictsExist
}

when (updaterResult) {
is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message)
is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult)
}
} catch (throwable: Throwable) {
EagerConflictResolutionResult.Error.Exception(throwable)
if (!conflictsExist || latest == null) {
return EagerConflictResolutionResult.Success.NoConflicts
}


return try {
val updaterResult =
updater.post(key, latest).also { updaterResult ->
if (updaterResult is UpdaterResult.Success) {
updateWriteRequestQueue<Response>(key = key, created = now(), updaterResult = updaterResult)
bookkeeper?.clear(key)
}
}
}
}

private suspend fun safeInitWriteRequestQueue(key: Key) =
withThreadSafety(key) {
if (keyToWriteRequestQueue[key] == null) {
keyToWriteRequestQueue[key] = ArrayDeque()
when (updaterResult) {
is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message)
is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult)
}
} catch (throwable: Throwable) {
EagerConflictResolutionResult.Error.Exception(throwable)
}
}

private suspend fun safeInitThreadSafety(key: Key) =
private suspend fun safeInitStore(key: Key) {
storeLock.withLock {
if (keyToThreadSafety[key] == null) {
keyToThreadSafety[key] = ThreadSafety()
}
if(keyToWriteRequestQueue[key] == null) {
keyToWriteRequestQueue[key] = ArrayDeque()
}
}

private suspend fun safeInitStore(key: Key) {
safeInitThreadSafety(key)
safeInitWriteRequestQueue(key)
}
}

0 comments on commit 8a7cefa

Please sign in to comment.