diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt index b74ab8968e..8e7047aaa4 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt @@ -16,11 +16,13 @@ package io.element.android.libraries.matrix.impl.room +import io.element.android.libraries.core.data.tryOrNull import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.RoomList import org.matrix.rustcomponents.sdk.RoomListEntriesListener import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate @@ -41,8 +43,14 @@ fun RoomList.loadingStateFlow(): Flow = } } val result = loadingState(listener) - send(result.state) + try { + send(result.state) + } catch (exception: Exception) { + Timber.d("loadingStateFlow() initialState failed.") + } result.stateStream + }.catch { + Timber.d(it, "loadingStateFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): Flow> = @@ -53,19 +61,16 @@ fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): } } val result = entries(listener) - onInitialList(result.entries) + try { + onInitialList(result.entries) + } catch (exception: Exception) { + Timber.d("entriesFlow() onInitialList failed.") + } result.entriesStream + }.catch { + Timber.d(it, "entriesFlow() failed") }.buffer(Channel.UNLIMITED) -fun RoomListService.roomOrNull(roomId: String): RoomListItem? { - return try { - room(roomId) - } catch (exception: Exception) { - Timber.d(exception, "Failed finding room with id=$roomId.") - return null - } -} - fun RoomListService.stateFlow(): Flow = mxCallbackFlow { val listener = object : RoomListServiceStateListener { @@ -73,5 +78,16 @@ fun RoomListService.stateFlow(): Flow = trySendBlocking(state) } } - state(listener) + tryOrNull { + state(listener) + } }.buffer(Channel.UNLIMITED) + +fun RoomListService.roomOrNull(roomId: String): RoomListItem? { + return try { + room(roomId) + } catch (exception: Exception) { + Timber.d(exception, "Failed finding room with id=$roomId.") + return null + } +} diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt index 36dabb71f3..a8e366ff7b 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/sync/SyncServiceExtension.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.sync +import io.element.android.libraries.core.data.tryOrNull import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking @@ -32,5 +33,7 @@ fun SyncService.stateFlow(): Flow = trySendBlocking(state) } } - state(listener) + tryOrNull { + state(listener) + } }.buffer(Channel.UNLIMITED) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index 29b75a1dca..9f8bccb1d0 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -16,6 +16,8 @@ package io.element.android.libraries.matrix.impl.timeline +import io.element.android.libraries.core.data.tryOrNull +import io.element.android.libraries.matrix.impl.util.cancelAndDestroy import io.element.android.libraries.matrix.impl.util.destroyAll import io.element.android.libraries.matrix.impl.util.mxCallbackFlow import kotlinx.coroutines.channels.Channel @@ -24,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.Room -import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineListener @@ -35,26 +37,26 @@ import timber.log.Timber internal fun Room.timelineDiffFlow(onInitialList: suspend (List) -> Unit): Flow = callbackFlow { - val roomId = id() - Timber.d("Open timelineDiffFlow for room $roomId") val listener = object : TimelineListener { override fun onUpdate(diff: TimelineDiff) { trySendBlocking(diff) } } - var result: RoomTimelineListenerResult? = null + val roomId = id() + Timber.d("Open timelineDiffFlow for room $roomId") + val result = addTimelineListener(listener) try { - result = addTimelineListener(listener) onInitialList(result.items) } catch (exception: Exception) { Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") } awaitClose { Timber.d("Close timelineDiffFlow for room $roomId") - result?.itemsStream?.cancel() - result?.itemsStream?.destroy() - result?.items?.destroyAll() + result.itemsStream.cancelAndDestroy() + result.items.destroyAll() } + }.catch { + Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) internal fun Room.backPaginationStatusFlow(): Flow = @@ -64,5 +66,7 @@ internal fun Room.backPaginationStatusFlow(): Flow = trySendBlocking(status) } } - subscribeToBackPaginationStatus(listener) + tryOrNull { + subscribeToBackPaginationStatus(listener) + } }.buffer(Channel.UNLIMITED) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt index a347973e89..fbf393e587 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt @@ -21,11 +21,10 @@ import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.callbackFlow import org.matrix.rustcomponents.sdk.TaskHandle -internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle) = +internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle?) = callbackFlow { - val token: TaskHandle = block(this) + val taskHandle: TaskHandle? = block(this) awaitClose { - token.cancel() - token.destroy() + taskHandle?.cancelAndDestroy() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt similarity index 73% rename from libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt rename to libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt index 9a21645351..5842ba1546 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandleBag.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt @@ -19,18 +19,22 @@ package io.element.android.libraries.matrix.impl.util import org.matrix.rustcomponents.sdk.TaskHandle import java.util.concurrent.CopyOnWriteArraySet -class TaskHandleBag(private val tokens: MutableSet = CopyOnWriteArraySet()) : Set by tokens { +fun TaskHandle.cancelAndDestroy() { + cancel() + destroy() +} + +class TaskHandleBag(private val taskHandles: MutableSet = CopyOnWriteArraySet()) : Set by taskHandles { operator fun plusAssign(taskHandle: TaskHandle?) { if (taskHandle == null) return - tokens += taskHandle + taskHandles += taskHandle } fun dispose() { - tokens.forEach { - it.cancel() - it.destroy() + taskHandles.forEach { + it.cancelAndDestroy() } - tokens.clear() + taskHandles.clear() } }