Browse Source

Small changes after reviews

pull/998/head
ganfra 1 year ago
parent
commit
e9802ffea6
  1. 33
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt
  2. 13
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt
  3. 4
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt
  4. 18
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt

33
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RoomListExtensions.kt

@ -22,6 +22,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import org.matrix.rustcomponents.sdk.RoomList import org.matrix.rustcomponents.sdk.RoomList
import org.matrix.rustcomponents.sdk.RoomListEntriesListener import org.matrix.rustcomponents.sdk.RoomListEntriesListener
import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate import org.matrix.rustcomponents.sdk.RoomListEntriesUpdate
@ -41,15 +42,15 @@ fun RoomList.loadingStateFlow(): Flow<RoomListLoadingState> =
trySendBlocking(state) trySendBlocking(state)
} }
} }
tryOrNull { val result = loadingState(listener)
val result = loadingState(listener) try {
try { send(result.state)
send(result.state) } catch (exception: Exception) {
} catch (exception: Exception) { Timber.d("loadingStateFlow() initialState failed.")
Timber.d("loadingStateFlow() initialState failed.")
}
result.stateStream
} }
result.stateStream
}.catch {
Timber.d(it, "loadingStateFlow() failed")
}.buffer(Channel.UNLIMITED) }.buffer(Channel.UNLIMITED)
fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<List<RoomListEntriesUpdate>> = fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit): Flow<List<RoomListEntriesUpdate>> =
@ -59,15 +60,15 @@ fun RoomList.entriesFlow(onInitialList: suspend (List<RoomListEntry>) -> Unit):
trySendBlocking(roomEntriesUpdate) trySendBlocking(roomEntriesUpdate)
} }
} }
tryOrNull { val result = entries(listener)
val result = entries(listener) try {
try { onInitialList(result.entries)
onInitialList(result.entries) } catch (exception: Exception) {
} catch (exception: Exception) { Timber.d("entriesFlow() onInitialList failed.")
Timber.d(exception, "entriesFlow() onInitialList failed.")
}
result.entriesStream
} }
result.entriesStream
}.catch {
Timber.d(it, "entriesFlow() failed")
}.buffer(Channel.UNLIMITED) }.buffer(Channel.UNLIMITED)
fun RoomListService.stateFlow(): Flow<RoomListServiceState> = fun RoomListService.stateFlow(): Flow<RoomListServiceState> =

13
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt

@ -26,10 +26,10 @@ import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import org.matrix.rustcomponents.sdk.BackPaginationStatus import org.matrix.rustcomponents.sdk.BackPaginationStatus
import org.matrix.rustcomponents.sdk.BackPaginationStatusListener import org.matrix.rustcomponents.sdk.BackPaginationStatusListener
import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.Room
import org.matrix.rustcomponents.sdk.RoomTimelineListenerResult
import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem import org.matrix.rustcomponents.sdk.TimelineItem
import org.matrix.rustcomponents.sdk.TimelineListener import org.matrix.rustcomponents.sdk.TimelineListener
@ -42,20 +42,21 @@ internal fun Room.timelineDiffFlow(onInitialList: suspend (List<TimelineItem>) -
trySendBlocking(diff) trySendBlocking(diff)
} }
} }
var result: RoomTimelineListenerResult? = null val roomId = id()
val roomId = tryOrNull { id() }
Timber.d("Open timelineDiffFlow for room $roomId") Timber.d("Open timelineDiffFlow for room $roomId")
val result = addTimelineListener(listener)
try { try {
result = addTimelineListener(listener)
onInitialList(result.items) onInitialList(result.items)
} catch (exception: Exception) { } catch (exception: Exception) {
Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId") Timber.d(exception, "Catch failure in timelineDiffFlow of room $roomId")
} }
awaitClose { awaitClose {
Timber.d("Close timelineDiffFlow for room $roomId") Timber.d("Close timelineDiffFlow for room $roomId")
result?.itemsStream?.cancelAndDestroy() result.itemsStream.cancelAndDestroy()
result?.items?.destroyAll() result.items.destroyAll()
} }
}.catch {
Timber.d(it, "timelineDiffFlow() failed")
}.buffer(Channel.UNLIMITED) }.buffer(Channel.UNLIMITED)
internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> = internal fun Room.backPaginationStatusFlow(): Flow<BackPaginationStatus> =

4
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/CallbackFlow.kt

@ -23,8 +23,8 @@ import org.matrix.rustcomponents.sdk.TaskHandle
internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle?) = internal fun <T> mxCallbackFlow(block: suspend ProducerScope<T>.() -> TaskHandle?) =
callbackFlow { callbackFlow {
val token: TaskHandle? = block(this) val taskHandle: TaskHandle? = block(this)
awaitClose { awaitClose {
token?.cancelAndDestroy() taskHandle?.cancelAndDestroy()
} }
} }

18
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/util/TaskHandle.kt

@ -19,22 +19,22 @@ package io.element.android.libraries.matrix.impl.util
import org.matrix.rustcomponents.sdk.TaskHandle import org.matrix.rustcomponents.sdk.TaskHandle
import java.util.concurrent.CopyOnWriteArraySet import java.util.concurrent.CopyOnWriteArraySet
class TaskHandleBag(private val tokens: MutableSet<TaskHandle> = CopyOnWriteArraySet()) : Set<TaskHandle> by tokens { fun TaskHandle.cancelAndDestroy() {
cancel()
destroy()
}
class TaskHandleBag(private val taskHandles: MutableSet<TaskHandle> = CopyOnWriteArraySet()) : Set<TaskHandle> by taskHandles {
operator fun plusAssign(taskHandle: TaskHandle?) { operator fun plusAssign(taskHandle: TaskHandle?) {
if (taskHandle == null) return if (taskHandle == null) return
tokens += taskHandle taskHandles += taskHandle
} }
fun dispose() { fun dispose() {
tokens.forEach { taskHandles.forEach {
it.cancelAndDestroy() it.cancelAndDestroy()
} }
tokens.clear() taskHandles.clear()
} }
} }
fun TaskHandle.cancelAndDestroy() {
cancel()
destroy()
}

Loading…
Cancel
Save