Browse Source

Merge pull request #998 from vector-im/feature/fga/safer_callback_flows

No crash when room is already destroyed...
pull/1003/head
ganfra 1 year ago committed by GitHub
parent
commit
fb2b548be9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt
  2. 3
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt
  3. 20
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt
  4. 7
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt
  5. 16
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt

34
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt

@ -16,11 +16,13 @@ @@ -16,11 +16,13 @@
package io.element.android.libraries.matrix.impl.room
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import org.matrix.rustcomponents.sdk.RoomList
import org.matrix.rustcomponents.sdk.RoomListEntriesListener
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
@ -41,8 +43,14 @@ fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> = @@ -41,8 +43,14 @@ fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> =
}
}
val result = loadingState(listener)
try {
send(result.state)
} catch (exception: Exception) {
Timber.d("loadingStateFlow() initialState failed.")
}
result.stateStream
}.catch {
Timber.d(it, "loadingStateFlow() failed")
}.buffer(Channel.UNLIMITED)
fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<List<RoomListEntriesUpdate>> =
@ -53,18 +61,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): @@ -53,18 +61,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit):
}
}
val result = entries(listener)
try {
onInitialList(result.entries)
result.entriesStream
}.buffer(Channel.UNLIMITED)
fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
return try {
room(roomId)
} catch (exception: Exception) {
Timber.d(exception, "Failed finding room with id=$roomId.")
return null
Timber.d("entriesFlow() onInitialList failed.")
}
}
result.entriesStream
}.catch {
Timber.d(it, "entriesFlow() failed")
}.buffer(Channel.UNLIMITED)
fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
mxCallbackFlow {
@ -73,5 +78,16 @@ fun RoomListService.stateFlow(): Flow<RoomListServiceState> = @@ -73,5 +78,16 @@ fun RoomListService.stateFlow(): Flow<RoomListServiceState> =
trySendBlocking(state)
}
}
tryOrNull {
state(listener)
}
}.buffer(Channel.UNLIMITED)
fun RoomListService.roomOrNull(roomId: String): RoomListItem? {
return try {
room(roomId)
} catch (exception: Exception) {
Timber.d(exception, "Failed finding room with id=$roomId.")
return null
}
}

3
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package io.element.android.libraries.matrix.impl.sync
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
@ -32,5 +33,7 @@ fun SyncService.stateFlow(): Flow<SyncServiceState> = @@ -32,5 +33,7 @@ fun SyncService.stateFlow(): Flow<SyncServiceState> =
trySendBlocking(state)
}
}
tryOrNull {
state(listener)
}
}.buffer(Channel.UNLIMITED)

20
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package io.element.android.libraries.matrix.impl.timeline
import io.element.android.libraries.core.data.tryOrNull
import io.element.android.libraries.matrix.impl.util.cancelAndDestroy
import io.element.android.libraries.matrix.impl.util.destroyAll
import io.element.android.libraries.matrix.impl.util.mxCallbackFlow
import kotlinx.coroutines.channels.Channel
@ -24,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking @@ -24,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import org.matrix.rustcomponents.sdk.BackPaginationStatus
import org.matrix.rustcomponents.sdk.BackPaginationStatusListener
import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import org.matrix.rustcomponents.sdk.TimelineListener
@ -35,26 +37,26 @@ import timber.log.Timber @@ -35,26 +37,26 @@ import timber.log.Timber
internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -> Unit): Flow<TimelineDiff> =
callbackFlow {
val roomId = id()
Timber.d("Open timelineDiffFlow for room $roomId")
val listener = object : TimelineListener {
override fun onUpdate(diff: TimelineDiff) {
trySendBlocking(diff)
}
}
var result: RoomTimelineListenerResult? = null
val roomId = id()
Timber.d("Open timelineDiffFlow for room $roomId")
val result = addTimelineListener(listener)
try {
result = addTimelineListener(listener)
onInitialList(result.items)
} catch (exception: Exception) {
Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId")
}
awaitClose {
Timber.d("Close timelineDiffFlow for room $roomId")
result?.itemsStream?.cancel()
result?.itemsStream?.destroy()
result?.items?.destroyAll()
result.itemsStream.cancelAndDestroy()
result.items.destroyAll()
}
}.catch {
Timber.d(it, "timelineDiffFlow() failed")
}.buffer(Channel.UNLIMITED)
internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> =
@ -64,5 +66,7 @@ internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> = @@ -64,5 +66,7 @@ internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> =
trySendBlocking(status)
}
}
tryOrNull {
subscribeToBackPaginationStatus(listener)
}
}.buffer(Channel.UNLIMITED)

7
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt

@ -21,11 +21,10 @@ import kotlinx.coroutines.channels.awaitClose @@ -21,11 +21,10 @@ import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import org.matrix.rustcomponents.sdk.TaskHandle
internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle) =
internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle?) =
callbackFlow {
val token: TaskHandle = block(this)
val taskHandle: TaskHandle? = block(this)
awaitClose {
token.cancel()
token.destroy()
taskHandle?.cancelAndDestroy()
}
}

16
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt → libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt

@ -19,18 +19,22 @@ package io.element.android.libraries.matrix.impl.util @@ -19,18 +19,22 @@ package io.element.android.libraries.matrix.impl.util
import org.matrix.rustcomponents.sdk.TaskHandle
import java.util.concurrent.CopyOnWriteArraySet
class TaskHandleBag(private val tokens: MutableSet<TaskHandle> = CopyOnWriteArraySet()) : Set<TaskHandle> by tokens {
fun TaskHandle.cancelAndDestroy() {
cancel()
destroy()
}
class TaskHandleBag(private val taskHandles: MutableSet<TaskHandle> = CopyOnWriteArraySet()) : Set<TaskHandle> by taskHandles {
operator fun plusAssign(taskHandle: TaskHandle?) {
if (taskHandle == null) return
tokens += taskHandle
taskHandles += taskHandle
}
fun dispose() {
tokens.forEach {
it.cancel()
it.destroy()
taskHandles.forEach {
it.cancelAndDestroy()
}
tokens.clear()
taskHandles.clear()
}
}
Loading…
Cancel
Save