diff --git a/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt b/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt index 73a8579b07..24ec9795f7 100644 --- a/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt +++ b/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt @@ -20,6 +20,7 @@ import android.os.Parcelable import androidx.compose.runtime.Composable import androidx.compose.runtime.DisposableEffect import androidx.compose.ui.Modifier +import androidx.lifecycle.Lifecycle import androidx.lifecycle.lifecycleScope import com.bumble.appyx.core.composable.Children import com.bumble.appyx.core.lifecycle.subscribe @@ -161,13 +162,16 @@ class RoomLoadedFlowNode @AssistedInject constructor( @Composable override fun View(modifier: Modifier) { - // Rely on the View Lifecycle instead of the Node Lifecycle, + // Rely on the View Lifecycle in addition to the Node Lifecycle, // because this node enters 'onDestroy' before his children, so it can leads to // using the room in a child node where it's already closed. DisposableEffect(Unit) { - inputs.room.open() + inputs.room.subscribeToSync() onDispose { - inputs.room.close() + inputs.room.unsubscribeFromSync() + if (lifecycle.currentState == Lifecycle.State.DESTROYED) { + inputs.room.destroy() + } } } Children( diff --git a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt index be0ff447b3..1445f85228 100644 --- a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt +++ b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt @@ -63,7 +63,11 @@ interface MatrixRoom : Closeable { val timeline: MatrixTimeline - fun open(): Result + fun destroy() + + fun subscribeToSync() + + fun unsubscribeFromSync() suspend fun userDisplayName(userId: UserId): Result @@ -133,6 +137,8 @@ interface MatrixRoom : Closeable { zoomLevel: Int? = null, assetType: AssetType? = null, ): Result + + override fun close() = destroy() } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 16434aa3c8..89e83b58c6 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -40,9 +40,6 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher import io.element.android.libraries.matrix.impl.media.map import io.element.android.libraries.matrix.impl.room.location.toInner import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline -import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow -import io.element.android.libraries.matrix.impl.timeline.eventOrigin -import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow import io.element.android.libraries.sessionstorage.api.SessionData import io.element.android.services.toolbox.api.systemclock.SystemClock import kotlinx.coroutines.CoroutineScope @@ -51,11 +48,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext -import org.matrix.rustcomponents.sdk.EventItemOrigin import org.matrix.rustcomponents.sdk.RequiredState import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.RoomListItem @@ -88,7 +81,6 @@ class RustMatrixRoom( private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId") private val _membersStateFlow = MutableStateFlow(MatrixRoomMembersState.Unknown) - private val isInit = MutableStateFlow(false) private val _syncUpdateFlow = MutableStateFlow(0L) private val _timeline by lazy { RustMatrixTimeline( @@ -97,6 +89,7 @@ class RustMatrixRoom( roomCoroutineScope = roomCoroutineScope, dispatcher = roomDispatcher, lastLoginTimestamp = sessionData.loginTimestamp, + onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() } ) } @@ -106,8 +99,7 @@ class RustMatrixRoom( override val timeline: MatrixTimeline = _timeline - override fun open(): Result { - if (isInit.value) return Result.failure(IllegalStateException("Listener already registered")) + override fun subscribeToSync() { val settings = RoomSubscription( requiredState = listOf( RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), @@ -118,35 +110,16 @@ class RustMatrixRoom( timelineLimit = null ) roomListItem.subscribe(settings) - roomCoroutineScope.launch(roomDispatcher) { - innerRoom.timelineDiffFlow { initialList -> - _timeline.postItems(initialList) - }.onEach { diff -> - if (diff.eventOrigin() == EventItemOrigin.SYNC) { - _syncUpdateFlow.value = systemClock.epochMillis() - } - _timeline.postDiff(diff) - }.launchIn(this) - - innerRoom.backPaginationStatusFlow() - .onEach { - _timeline.postPaginationStatus(it) - }.launchIn(this) + } - fetchMembers() - } - isInit.value = true - return Result.success(Unit) + override fun unsubscribeFromSync() { + roomListItem.unsubscribe() } - override fun close() { - if (isInit.value) { - isInit.value = false - roomCoroutineScope.cancel() - roomListItem.unsubscribe() - innerRoom.destroy() - roomListItem.destroy() - } + override fun destroy() { + roomCoroutineScope.cancel() + innerRoom.destroy() + roomListItem.destroy() } override val name: String? @@ -363,12 +336,6 @@ class RustMatrixRoom( } } - private suspend fun fetchMembers() = withContext(roomDispatcher) { - runCatching { - innerRoom.fetchMembers() - } - } - override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result = withContext(roomDispatcher) { runCatching { innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt index e213fb623c..d9b6604170 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt @@ -26,8 +26,8 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper -import kotlinx.coroutines.CompletableDeferred import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -37,17 +37,21 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.getAndUpdate +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.BackPaginationStatus +import org.matrix.rustcomponents.sdk.EventItemOrigin import org.matrix.rustcomponents.sdk.PaginationOptions import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import timber.log.Timber -import java.util.concurrent.atomic.AtomicBoolean import java.util.Date +import java.util.concurrent.atomic.AtomicBoolean private const val INITIAL_MAX_SIZE = 50 @@ -57,6 +61,7 @@ class RustMatrixTimeline( private val innerRoom: Room, private val dispatcher: CoroutineDispatcher, private val lastLoginTimestamp: Date?, + private val onNewSyncedEvent: () -> Unit, ) : MatrixTimeline { private val initLatch = CompletableDeferred() @@ -93,13 +98,40 @@ class RustMatrixTimeline( override val paginationState: StateFlow = _paginationState.asStateFlow() + init { + Timber.d("Initialize timeline for room ${matrixRoom.roomId}") + roomCoroutineScope.launch(dispatcher) { + innerRoom.timelineDiffFlow { initialList -> + postItems(initialList) + }.onEach { diff -> + if (diff.eventOrigin() == EventItemOrigin.SYNC) { + onNewSyncedEvent() + } + postDiff(diff) + }.launchIn(this) + + innerRoom.backPaginationStatusFlow() + .onEach { + postPaginationStatus(it) + }.launchIn(this) + + fetchMembers() + } + } + + private suspend fun fetchMembers() = withContext(dispatcher) { + runCatching { + innerRoom.fetchMembers() + } + } + @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) override val timelineItems: Flow> = _timelineItems.sample(50) .mapLatest { items -> encryptedHistoryPostProcessor.process(items) } - internal suspend fun postItems(items: List) { + private suspend fun postItems(items: List) { // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. items.chunked(INITIAL_MAX_SIZE).reversed().forEach { timelineDiffProcessor.postItems(it) @@ -108,12 +140,12 @@ class RustMatrixTimeline( initLatch.complete(Unit) } - internal suspend fun postDiff(timelineDiff: TimelineDiff) { + private suspend fun postDiff(timelineDiff: TimelineDiff) { initLatch.await() timelineDiffProcessor.postDiff(timelineDiff) } - internal fun postPaginationStatus(status: BackPaginationStatus) { + private fun postPaginationStatus(status: BackPaginationStatus) { _paginationState.getAndUpdate { currentPaginationState -> if (hasEncryptionHistoryBanner()) { return@getAndUpdate currentPaginationState.copy( diff --git a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt index 7fe7de5b9f..59f6ed57bd 100644 --- a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt +++ b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt @@ -100,7 +100,6 @@ class FakeMatrixRoom( private val _sentLocations = mutableListOf() val sentLocations: List = _sentLocations - var invitedUserId: UserId? = null private set @@ -128,9 +127,11 @@ class FakeMatrixRoom( override val timeline: MatrixTimeline = matrixTimeline - override fun open(): Result { - return Result.success(Unit) - } + override fun subscribeToSync() = Unit + + override fun unsubscribeFromSync() = Unit + + override fun destroy() = Unit override suspend fun userDisplayName(userId: UserId): Result = simulateLongTask { userDisplayNameResult @@ -283,8 +284,6 @@ class FakeMatrixRoom( return sendLocationResult } - override fun close() = Unit - fun givenLeaveRoomError(throwable: Throwable?) { this.leaveRoomError = throwable } diff --git a/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt b/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt index f66de878fb..41dbc8bfe2 100644 --- a/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt +++ b/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt @@ -87,7 +87,6 @@ class RoomListScreen( Singleton.appScope.launch { withContext(coroutineDispatchers.io) { matrixClient.getRoom(roomId)!!.use { room -> - room.open() room.timeline.paginateBackwards(20, 50) } }