From 832fd69153beac780acda231e542dd20cd64b7f3 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Mon, 24 Jul 2023 18:04:21 +0200 Subject: [PATCH 1/2] Use an AtomicBoolean instead of a MutableStateFlow to atomically init the RustMatrixRoom. Should improve #951. --- .../matrix/impl/room/RustMatrixRoom.kt | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 16434aa3c8..6635e7ecea 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -65,6 +65,7 @@ import org.matrix.rustcomponents.sdk.genTransactionId import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown import timber.log.Timber import java.io.File +import java.util.concurrent.atomic.AtomicBoolean @OptIn(ExperimentalCoroutinesApi::class) class RustMatrixRoom( @@ -88,7 +89,7 @@ class RustMatrixRoom( private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId") private val _membersStateFlow = MutableStateFlow(MatrixRoomMembersState.Unknown) - private val isInit = MutableStateFlow(false) + private val isInit = AtomicBoolean(false) private val _syncUpdateFlow = MutableStateFlow(0L) private val _timeline by lazy { RustMatrixTimeline( @@ -107,41 +108,42 @@ class RustMatrixRoom( override val timeline: MatrixTimeline = _timeline override fun open(): Result { - if (isInit.value) return Result.failure(IllegalStateException("Listener already registered")) - val settings = RoomSubscription( - requiredState = listOf( - RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), - RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""), - RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""), - RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""), - ), - timelineLimit = null - ) - roomListItem.subscribe(settings) - roomCoroutineScope.launch(roomDispatcher) { - innerRoom.timelineDiffFlow { initialList -> - _timeline.postItems(initialList) - }.onEach { diff -> - if (diff.eventOrigin() == EventItemOrigin.SYNC) { - _syncUpdateFlow.value = systemClock.epochMillis() - } - _timeline.postDiff(diff) - }.launchIn(this) - - innerRoom.backPaginationStatusFlow() - .onEach { - _timeline.postPaginationStatus(it) + return if (isInit.getAndSet(true)) { + Result.failure(IllegalStateException("Listener already registered")) + } else { + val settings = RoomSubscription( + requiredState = listOf( + RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), + RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""), + RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""), + RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""), + ), + timelineLimit = null + ) + roomListItem.subscribe(settings) + roomCoroutineScope.launch(roomDispatcher) { + innerRoom.timelineDiffFlow { initialList -> + _timeline.postItems(initialList) + }.onEach { diff -> + if (diff.eventOrigin() == EventItemOrigin.SYNC) { + _syncUpdateFlow.value = systemClock.epochMillis() + } + _timeline.postDiff(diff) }.launchIn(this) - fetchMembers() + innerRoom.backPaginationStatusFlow() + .onEach { + _timeline.postPaginationStatus(it) + }.launchIn(this) + + fetchMembers() + } + Result.success(Unit) } - isInit.value = true - return Result.success(Unit) } override fun close() { - if (isInit.value) { - isInit.value = false + if (isInit.getAndSet(false)) { roomCoroutineScope.cancel() roomListItem.unsubscribe() innerRoom.destroy() From e35bb73a86f41a1b199078435699979288968819 Mon Sep 17 00:00:00 2001 From: ganfra Date: Tue, 25 Jul 2023 12:06:36 +0200 Subject: [PATCH 2/2] Rework some MatrixRoom api and fix rust 'destroyed' crash --- .../android/appnav/room/RoomLoadedFlowNode.kt | 10 ++- .../libraries/matrix/api/room/MatrixRoom.kt | 8 +- .../matrix/impl/room/RustMatrixRoom.kt | 75 +++++-------------- .../impl/timeline/RustMatrixTimeline.kt | 42 +++++++++-- .../matrix/test/room/FakeMatrixRoom.kt | 11 ++- .../android/samples/minimal/RoomListScreen.kt | 1 - 6 files changed, 76 insertions(+), 71 deletions(-) diff --git a/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt b/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt index 73a8579b07..24ec9795f7 100644 --- a/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt +++ b/appnav/src/main/kotlin/io/element/android/appnav/room/RoomLoadedFlowNode.kt @@ -20,6 +20,7 @@ import android.os.Parcelable import androidx.compose.runtime.Composable import androidx.compose.runtime.DisposableEffect import androidx.compose.ui.Modifier +import androidx.lifecycle.Lifecycle import androidx.lifecycle.lifecycleScope import com.bumble.appyx.core.composable.Children import com.bumble.appyx.core.lifecycle.subscribe @@ -161,13 +162,16 @@ class RoomLoadedFlowNode @AssistedInject constructor( @Composable override fun View(modifier: Modifier) { - // Rely on the View Lifecycle instead of the Node Lifecycle, + // Rely on the View Lifecycle in addition to the Node Lifecycle, // because this node enters 'onDestroy' before his children, so it can leads to // using the room in a child node where it's already closed. DisposableEffect(Unit) { - inputs.room.open() + inputs.room.subscribeToSync() onDispose { - inputs.room.close() + inputs.room.unsubscribeFromSync() + if (lifecycle.currentState == Lifecycle.State.DESTROYED) { + inputs.room.destroy() + } } } Children( diff --git a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt index be0ff447b3..1445f85228 100644 --- a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt +++ b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt @@ -63,7 +63,11 @@ interface MatrixRoom : Closeable { val timeline: MatrixTimeline - fun open(): Result + fun destroy() + + fun subscribeToSync() + + fun unsubscribeFromSync() suspend fun userDisplayName(userId: UserId): Result @@ -133,6 +137,8 @@ interface MatrixRoom : Closeable { zoomLevel: Int? = null, assetType: AssetType? = null, ): Result + + override fun close() = destroy() } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 6635e7ecea..89e83b58c6 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -40,9 +40,6 @@ import io.element.android.libraries.matrix.impl.core.toProgressWatcher import io.element.android.libraries.matrix.impl.media.map import io.element.android.libraries.matrix.impl.room.location.toInner import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline -import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow -import io.element.android.libraries.matrix.impl.timeline.eventOrigin -import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow import io.element.android.libraries.sessionstorage.api.SessionData import io.element.android.services.toolbox.api.systemclock.SystemClock import kotlinx.coroutines.CoroutineScope @@ -51,11 +48,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext -import org.matrix.rustcomponents.sdk.EventItemOrigin import org.matrix.rustcomponents.sdk.RequiredState import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.RoomListItem @@ -65,7 +58,6 @@ import org.matrix.rustcomponents.sdk.genTransactionId import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown import timber.log.Timber import java.io.File -import java.util.concurrent.atomic.AtomicBoolean @OptIn(ExperimentalCoroutinesApi::class) class RustMatrixRoom( @@ -89,7 +81,6 @@ class RustMatrixRoom( private val roomCoroutineScope = sessionCoroutineScope.childScope(coroutineDispatchers.main, "RoomScope-$roomId") private val _membersStateFlow = MutableStateFlow(MatrixRoomMembersState.Unknown) - private val isInit = AtomicBoolean(false) private val _syncUpdateFlow = MutableStateFlow(0L) private val _timeline by lazy { RustMatrixTimeline( @@ -98,6 +89,7 @@ class RustMatrixRoom( roomCoroutineScope = roomCoroutineScope, dispatcher = roomDispatcher, lastLoginTimestamp = sessionData.loginTimestamp, + onNewSyncedEvent = { _syncUpdateFlow.value = systemClock.epochMillis() } ) } @@ -107,48 +99,27 @@ class RustMatrixRoom( override val timeline: MatrixTimeline = _timeline - override fun open(): Result { - return if (isInit.getAndSet(true)) { - Result.failure(IllegalStateException("Listener already registered")) - } else { - val settings = RoomSubscription( - requiredState = listOf( - RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), - RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""), - RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""), - RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""), - ), - timelineLimit = null - ) - roomListItem.subscribe(settings) - roomCoroutineScope.launch(roomDispatcher) { - innerRoom.timelineDiffFlow { initialList -> - _timeline.postItems(initialList) - }.onEach { diff -> - if (diff.eventOrigin() == EventItemOrigin.SYNC) { - _syncUpdateFlow.value = systemClock.epochMillis() - } - _timeline.postDiff(diff) - }.launchIn(this) - - innerRoom.backPaginationStatusFlow() - .onEach { - _timeline.postPaginationStatus(it) - }.launchIn(this) - - fetchMembers() - } - Result.success(Unit) - } + override fun subscribeToSync() { + val settings = RoomSubscription( + requiredState = listOf( + RequiredState(key = EventType.STATE_ROOM_CANONICAL_ALIAS, value = ""), + RequiredState(key = EventType.STATE_ROOM_TOPIC, value = ""), + RequiredState(key = EventType.STATE_ROOM_JOIN_RULES, value = ""), + RequiredState(key = EventType.STATE_ROOM_POWER_LEVELS, value = ""), + ), + timelineLimit = null + ) + roomListItem.subscribe(settings) } - override fun close() { - if (isInit.getAndSet(false)) { - roomCoroutineScope.cancel() - roomListItem.unsubscribe() - innerRoom.destroy() - roomListItem.destroy() - } + override fun unsubscribeFromSync() { + roomListItem.unsubscribe() + } + + override fun destroy() { + roomCoroutineScope.cancel() + innerRoom.destroy() + roomListItem.destroy() } override val name: String? @@ -365,12 +336,6 @@ class RustMatrixRoom( } } - private suspend fun fetchMembers() = withContext(roomDispatcher) { - runCatching { - innerRoom.fetchMembers() - } - } - override suspend fun reportContent(eventId: EventId, reason: String, blockUserId: UserId?): Result = withContext(roomDispatcher) { runCatching { innerRoom.reportContent(eventId = eventId.value, score = null, reason = reason) diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt index e213fb623c..d9b6604170 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustMatrixTimeline.kt @@ -26,8 +26,8 @@ import io.element.android.libraries.matrix.impl.timeline.item.event.EventMessage import io.element.android.libraries.matrix.impl.timeline.item.event.EventTimelineItemMapper import io.element.android.libraries.matrix.impl.timeline.item.event.TimelineEventContentMapper import io.element.android.libraries.matrix.impl.timeline.item.virtual.VirtualTimelineItemMapper -import kotlinx.coroutines.CompletableDeferred import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -37,17 +37,21 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.getAndUpdate +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.mapLatest +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.rustcomponents.sdk.BackPaginationStatus +import org.matrix.rustcomponents.sdk.EventItemOrigin import org.matrix.rustcomponents.sdk.PaginationOptions import org.matrix.rustcomponents.sdk.Room import org.matrix.rustcomponents.sdk.TimelineDiff import org.matrix.rustcomponents.sdk.TimelineItem import timber.log.Timber -import java.util.concurrent.atomic.AtomicBoolean import java.util.Date +import java.util.concurrent.atomic.AtomicBoolean private const val INITIAL_MAX_SIZE = 50 @@ -57,6 +61,7 @@ class RustMatrixTimeline( private val innerRoom: Room, private val dispatcher: CoroutineDispatcher, private val lastLoginTimestamp: Date?, + private val onNewSyncedEvent: () -> Unit, ) : MatrixTimeline { private val initLatch = CompletableDeferred() @@ -93,13 +98,40 @@ class RustMatrixTimeline( override val paginationState: StateFlow = _paginationState.asStateFlow() + init { + Timber.d("Initialize timeline for room ${matrixRoom.roomId}") + roomCoroutineScope.launch(dispatcher) { + innerRoom.timelineDiffFlow { initialList -> + postItems(initialList) + }.onEach { diff -> + if (diff.eventOrigin() == EventItemOrigin.SYNC) { + onNewSyncedEvent() + } + postDiff(diff) + }.launchIn(this) + + innerRoom.backPaginationStatusFlow() + .onEach { + postPaginationStatus(it) + }.launchIn(this) + + fetchMembers() + } + } + + private suspend fun fetchMembers() = withContext(dispatcher) { + runCatching { + innerRoom.fetchMembers() + } + } + @OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) override val timelineItems: Flow> = _timelineItems.sample(50) .mapLatest { items -> encryptedHistoryPostProcessor.process(items) } - internal suspend fun postItems(items: List) { + private suspend fun postItems(items: List) { // Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap. items.chunked(INITIAL_MAX_SIZE).reversed().forEach { timelineDiffProcessor.postItems(it) @@ -108,12 +140,12 @@ class RustMatrixTimeline( initLatch.complete(Unit) } - internal suspend fun postDiff(timelineDiff: TimelineDiff) { + private suspend fun postDiff(timelineDiff: TimelineDiff) { initLatch.await() timelineDiffProcessor.postDiff(timelineDiff) } - internal fun postPaginationStatus(status: BackPaginationStatus) { + private fun postPaginationStatus(status: BackPaginationStatus) { _paginationState.getAndUpdate { currentPaginationState -> if (hasEncryptionHistoryBanner()) { return@getAndUpdate currentPaginationState.copy( diff --git a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt index 7fe7de5b9f..59f6ed57bd 100644 --- a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt +++ b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt @@ -100,7 +100,6 @@ class FakeMatrixRoom( private val _sentLocations = mutableListOf() val sentLocations: List = _sentLocations - var invitedUserId: UserId? = null private set @@ -128,9 +127,11 @@ class FakeMatrixRoom( override val timeline: MatrixTimeline = matrixTimeline - override fun open(): Result { - return Result.success(Unit) - } + override fun subscribeToSync() = Unit + + override fun unsubscribeFromSync() = Unit + + override fun destroy() = Unit override suspend fun userDisplayName(userId: UserId): Result = simulateLongTask { userDisplayNameResult @@ -283,8 +284,6 @@ class FakeMatrixRoom( return sendLocationResult } - override fun close() = Unit - fun givenLeaveRoomError(throwable: Throwable?) { this.leaveRoomError = throwable } diff --git a/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt b/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt index f66de878fb..41dbc8bfe2 100644 --- a/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt +++ b/samples/minimal/src/main/kotlin/io/element/android/samples/minimal/RoomListScreen.kt @@ -87,7 +87,6 @@ class RoomListScreen( Singleton.appScope.launch { withContext(coroutineDispatchers.io) { matrixClient.getRoom(roomId)!!.use { room -> - room.open() room.timeline.paginateBackwards(20, 50) } }