From e9802ffea6f198ef44d4e7c2620bc4c4e3173cac Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 31 Jul 2023 11:36:28 +0200 Subject: [PATCH] Small changes after reviews --- .../matrix/impl/room/RoomListExtensions.kt | 33 ++++++++++--------- .../impl/timeline/RoomTimelineExtensions.kt | 13 ++++---- .../matrix/impl/util/CallbackFlow.kt | 4 +-- .../libraries/matrix/impl/util/TaskHandle.kt | 18 +++++----- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt index 5d92bfde0b..8e7047aaa4 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.RoomList import org.matrix.rustcomponents.sdk.RoomListEntriesListener import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate @@ -41,15 +42,15 @@ fun RoomList.loadingStateFlow(): Flow = trySendBlocking(state) } } - tryOrNull { - val result = loadingState(listener) - try { - send(result.state) - } catch (exception: Exception) { - Timber.d("loadingStateFlow() initialState failed.") - } - result.stateStream + val result = loadingState(listener) + try { + send(result.state) + } catch (exception: Exception) { + Timber.d("loadingStateFlow() initialState failed.") } + result.stateStream + }.catch { + Timber.d(it, "loadingStateFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): Flow> = @@ -59,15 +60,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List) -> Unit): trySendBlocking(roomEntriesUpdate) } } - tryOrNull { - val result = entries(listener) - try { - onInitialList(result.entries) - } catch (exception: Exception) { - Timber.d(exception, "entriesFlow() onInitialList failed.") - } - result.entriesStream + val result = entries(listener) + try { + onInitialList(result.entries) + } catch (exception: Exception) { + Timber.d("entriesFlow() onInitialList failed.") } + result.entriesStream + }.catch { + Timber.d(it, "entriesFlow() failed") }.buffer(Channel.UNLIMITED) fun RoomListService.stateFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index 3c5bb26e30..9f8bccb1d0 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -26,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.Room -import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineListener @@ -42,20 +42,21 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List) - trySendBlocking(diff) } } - var result: RoomTimelineListenerResult? = null - val roomId = tryOrNull { id() } + val roomId = id() Timber.d("Open timelineDiffFlow for room $roomId") + val result = addTimelineListener(listener) try { - result = addTimelineListener(listener) onInitialList(result.items) } catch (exception: Exception) { Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") } awaitClose { Timber.d("Close timelineDiffFlow for room $roomId") - result?.itemsStream?.cancelAndDestroy() - result?.items?.destroyAll() + result.itemsStream.cancelAndDestroy() + result.items.destroyAll() } + }.catch { + Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) internal fun Room.backPaginationStatusFlow(): Flow = diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt index 283b5f7076..fbf393e587 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt @@ -23,8 +23,8 @@ import org.matrix.rustcomponents.sdk.TaskHandle internal fun mxCallbackFlow(block: suspend ProducerScope.() -> TaskHandle?) = callbackFlow { - val token: TaskHandle? = block(this) + val taskHandle: TaskHandle? = block(this) awaitClose { - token?.cancelAndDestroy() + taskHandle?.cancelAndDestroy() } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt index 937e73e72e..5842ba1546 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt @@ -19,22 +19,22 @@ package io.element.android.libraries.matrix.impl.util import org.matrix.rustcomponents.sdk.TaskHandle import java.util.concurrent.CopyOnWriteArraySet -class TaskHandleBag(private val tokens: MutableSet = CopyOnWriteArraySet()) : Set by tokens { +fun TaskHandle.cancelAndDestroy() { + cancel() + destroy() +} + +class TaskHandleBag(private val taskHandles: MutableSet = CopyOnWriteArraySet()) : Set by taskHandles { operator fun plusAssign(taskHandle: TaskHandle?) { if (taskHandle == null) return - tokens += taskHandle + taskHandles += taskHandle } fun dispose() { - tokens.forEach { + taskHandles.forEach { it.cancelAndDestroy() } - tokens.clear() + taskHandles.clear() } } - -fun TaskHandle.cancelAndDestroy() { - cancel() - destroy() -}