diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 463defb284..d23e5efb2a 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -196,8 +196,6 @@ class RustMatrixRoom( override fun destroy() { roomCoroutineScope.cancel() liveTimeline.close() - innerRoom.destroy() - roomListItem.destroy() } override val displayName: String @@ -627,12 +625,13 @@ class RustMatrixRoom( isLive: Boolean, onNewSyncedEvent: () -> Unit = {}, ): Timeline { + val timelineCoroutineScope = roomCoroutineScope.childScope(coroutineDispatchers.main, "TimelineScope-$roomId-$timeline") return RustTimeline( isKeyBackupEnabled = isKeyBackupEnabled, isLive = isLive, matrixRoom = this, systemClock = systemClock, - roomCoroutineScope = roomCoroutineScope, + coroutineScope = timelineCoroutineScope, dispatcher = roomDispatcher, lastLoginTimestamp = sessionData.loginTimestamp, onNewSyncedEvent = onNewSyncedEvent, diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt index 8d3d056aab..a69cbc67d3 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustRoomFactory.kt @@ -16,6 +16,7 @@ package io.element.android.libraries.matrix.impl.room +import androidx.collection.lruCache import io.element.android.appconfig.TimelineConfig import io.element.android.libraries.core.coroutine.CoroutineDispatchers import io.element.android.libraries.matrix.api.core.RoomId @@ -41,6 +42,8 @@ import org.matrix.rustcomponents.sdk.TimelineEventTypeFilter import timber.log.Timber import org.matrix.rustcomponents.sdk.RoomListService as InnerRoomListService +private const val CACHE_SIZE = 16 + class RustRoomFactory( private val sessionId: SessionId, private val notificationSettingsService: NotificationSettingsService, @@ -55,8 +58,23 @@ class RustRoomFactory( private val getSessionData: suspend () -> SessionData, ) { @OptIn(ExperimentalCoroutinesApi::class) - private val createRoomDispatcher = dispatchers.io.limitedParallelism(1) + private val dispatcher = dispatchers.io.limitedParallelism(1) private val mutex = Mutex() + private var isDestroyed: Boolean = false + + private data class RustRoomObjects( + val roomListItem: RoomListItem, + val fullRoom: Room, + ) + + private val cache = lruCache( + maxSize = CACHE_SIZE, + onEntryRemoved = { evicted, roomId, oldRoom, _ -> + Timber.d("On room removed from cache: $roomId, evicted: $evicted") + oldRoom.roomListItem.close() + oldRoom.fullRoom.close() + } + ) private val matrixRoomInfoMapper = MatrixRoomInfoMapper() @@ -70,30 +88,41 @@ class RustRoomFactory( ) } - suspend fun create(roomId: RoomId): MatrixRoom? = withContext(createRoomDispatcher) { - var cachedPairOfRoom: Pair? + suspend fun destroy() { + withContext(dispatcher) { + mutex.withLock { + Timber.d("Destroying room factory") + cache.evictAll() + isDestroyed = true + } + } + } + + suspend fun create(roomId: RoomId): MatrixRoom? = withContext(dispatcher) { mutex.withLock { - // Check if already in memory... - cachedPairOfRoom = pairOfRoom(roomId) - if (cachedPairOfRoom == null) { + if (isDestroyed) { + Timber.d("Room factory is destroyed, returning null for $roomId") + return@withContext null + } + var roomObjects: RustRoomObjects? = getRoomObjects(roomId) + if (roomObjects == null) { // ... otherwise, lets wait for the SS to load all rooms and check again. roomListService.allRooms.awaitLoaded() - cachedPairOfRoom = pairOfRoom(roomId) + roomObjects = getRoomObjects(roomId) } - } - if (cachedPairOfRoom == null) { - Timber.d("No room found for $roomId") - return@withContext null - } - cachedPairOfRoom?.let { (roomListItem, fullRoom) -> + if (roomObjects == null) { + Timber.d("No room found for $roomId, returning null") + return@withContext null + } + val liveTimeline = roomObjects.fullRoom.timeline() RustMatrixRoom( sessionId = sessionId, isKeyBackupEnabled = isKeyBackupEnabled(), - roomListItem = roomListItem, - innerRoom = fullRoom, - innerTimeline = fullRoom.timeline(), - notificationSettingsService = notificationSettingsService, + roomListItem = roomObjects.roomListItem, + innerRoom = roomObjects.fullRoom, + innerTimeline = liveTimeline, sessionCoroutineScope = sessionCoroutineScope, + notificationSettingsService = notificationSettingsService, coroutineDispatchers = dispatchers, systemClock = systemClock, roomContentForwarder = roomContentForwarder, @@ -104,20 +133,28 @@ class RustRoomFactory( } } - private suspend fun pairOfRoom(roomId: RoomId): Pair? { - val cachedRoomListItem = innerRoomListService.roomOrNull(roomId.value) + private suspend fun getRoomObjects(roomId: RoomId): RustRoomObjects? { + cache[roomId]?.let { + Timber.d("Room found in cache for $roomId") + return it + } + val roomListItem = innerRoomListService.roomOrNull(roomId.value) + if (roomListItem == null) { + Timber.d("Room not found for $roomId") + return null + } val fullRoom = try { - cachedRoomListItem?.fullRoomWithTimeline(filter = eventFilters) + roomListItem.fullRoomWithTimeline(filter = eventFilters) } catch (e: RoomListException) { Timber.e(e, "Failed to get full room with timeline for $roomId") - null + return null } - return if (cachedRoomListItem == null || fullRoom == null) { - Timber.d("No room cached for $roomId") - null - } else { - Timber.d("Found room cached for $roomId") - Pair(cachedRoomListItem, fullRoom) + Timber.d("Got full room with timeline for $roomId") + return RustRoomObjects( + roomListItem = roomListItem, + fullRoom = fullRoom, + ).also { + cache.put(roomId, it) } } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt index c8cb4eef1c..d38c86f0f9 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/MatrixTimelineItemMapper.kt @@ -26,7 +26,7 @@ import org.matrix.rustcomponents.sdk.TimelineItem class MatrixTimelineItemMapper( private val fetchDetailsForEvent: suspend (EventId) -> Result, - private val roomCoroutineScope: CoroutineScope, + private val coroutineScope: CoroutineScope, private val virtualTimelineItemMapper: VirtualTimelineItemMapper = VirtualTimelineItemMapper(), private val eventTimelineItemMapper: EventTimelineItemMapper = EventTimelineItemMapper(), ) { @@ -49,7 +49,7 @@ class MatrixTimelineItemMapper( return MatrixTimelineItem.Other } - private fun fetchEventDetails(eventId: EventId) = roomCoroutineScope.launch { + private fun fetchEventDetails(eventId: EventId) = coroutineScope.launch { fetchDetailsForEvent(eventId) } } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt index dc012f67b3..2adcab5491 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RoomTimelineExtensions.kt @@ -25,13 +25,13 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.catch import org.matrix.rustcomponents.sdk.PaginationStatusListener -import org.matrix.rustcomponents.sdk.Timeline import org.matrix.rustcomponents.sdk.TimelineDiff +import org.matrix.rustcomponents.sdk.TimelineInterface import org.matrix.rustcomponents.sdk.TimelineListener import timber.log.Timber import uniffi.matrix_sdk_ui.LiveBackPaginationStatus -internal fun Timeline.liveBackPaginationStatus(): Flow = callbackFlow { +internal fun TimelineInterface.liveBackPaginationStatus(): Flow = callbackFlow { val listener = object : PaginationStatusListener { override fun onUpdate(status: LiveBackPaginationStatus) { trySend(status) @@ -45,7 +45,7 @@ internal fun Timeline.liveBackPaginationStatus(): Flow Timber.d(it, "liveBackPaginationStatus() failed") }.buffer(Channel.UNLIMITED) -internal fun Timeline.timelineDiffFlow(): Flow> = +internal fun TimelineInterface.timelineDiffFlow(): Flow> = callbackFlow { val listener = object : TimelineListener { override fun onUpdate(diff: List) { @@ -62,7 +62,7 @@ internal fun Timeline.timelineDiffFlow(): Flow> = Timber.d(it, "timelineDiffFlow() failed") }.buffer(Channel.UNLIMITED) -internal suspend fun Timeline.runWithTimelineListenerRegistered(action: suspend () -> Unit) { +internal suspend fun TimelineInterface.runWithTimelineListenerRegistered(action: suspend () -> Unit) { val result = addListener(NoOpTimelineListener) try { action() diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt index 9f48e59995..fb42f27f62 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/RustTimeline.kt @@ -56,6 +56,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow @@ -64,6 +65,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.getAndUpdate import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch @@ -88,9 +90,9 @@ class RustTimeline( private val inner: InnerTimeline, private val isLive: Boolean, systemClock: SystemClock, - roomCoroutineScope: CoroutineScope, isKeyBackupEnabled: Boolean, private val matrixRoom: MatrixRoom, + private val coroutineScope: CoroutineScope, private val dispatcher: CoroutineDispatcher, lastLoginTimestamp: Date?, private val roomContentForwarder: RoomContentForwarder, @@ -106,7 +108,7 @@ class RustTimeline( private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper) private val timelineItemMapper = MatrixTimelineItemMapper( fetchDetailsForEvent = this::fetchDetailsForEvent, - roomCoroutineScope = roomCoroutineScope, + coroutineScope = coroutineScope, virtualTimelineItemMapper = VirtualTimelineItemMapper(), eventTimelineItemMapper = EventTimelineItemMapper( contentMapper = timelineEventContentMapper @@ -124,7 +126,7 @@ class RustTimeline( ) private val timelineItemsSubscriber = TimelineItemsSubscriber( timeline = inner, - roomCoroutineScope = roomCoroutineScope, + timelineCoroutineScope = coroutineScope, timelineDiffProcessor = timelineDiffProcessor, initLatch = initLatch, isInit = isInit, @@ -145,13 +147,11 @@ class RustTimeline( ) init { - roomCoroutineScope.launch(dispatcher) { - fetchMembers() - if (isLive) { - // When timeline is live, we need to listen to the back pagination status as - // sdk can automatically paginate backwards. - registerBackPaginationStatusListener() - } + coroutineScope.fetchMembers() + if (isLive) { + // When timeline is live, we need to listen to the back pagination status as + // sdk can automatically paginate backwards. + coroutineScope.registerBackPaginationStatusListener() } } @@ -243,9 +243,12 @@ class RustTimeline( } }.onStart { timelineItemsSubscriber.subscribeIfNeeded() + }.onCompletion { + timelineItemsSubscriber.unsubscribeIfNeeded() } override fun close() { + coroutineScope.cancel() inner.close() } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt index 0205ac20e5..a4086c2810 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/timeline/TimelineItemsSubscriber.kt @@ -40,10 +40,9 @@ private const val INITIAL_MAX_SIZE = 50 * This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor. * It will also trigger a callback when a new synced event is received. * It will also handle the initial items and make sure they are posted before any diff. - * When closing the room subscription, it will also unsubscribe automatically. */ internal class TimelineItemsSubscriber( - roomCoroutineScope: CoroutineScope, + timelineCoroutineScope: CoroutineScope, dispatcher: CoroutineDispatcher, private val timeline: Timeline, private val timelineDiffProcessor: MatrixTimelineDiffProcessor, @@ -54,8 +53,12 @@ internal class TimelineItemsSubscriber( private var subscriptionCount = 0 private val mutex = Mutex() - private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + private val coroutineScope = timelineCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber") + /** + * Add a subscription to the timeline and start posting items/diffs to the timelineDiffProcessor. + * It will also trigger a callback when a new synced event is received. + */ suspend fun subscribeIfNeeded() = mutex.withLock { if (subscriptionCount == 0) { timeline.timelineDiffFlow() @@ -70,6 +73,11 @@ internal class TimelineItemsSubscriber( subscriptionCount++ } + /** + * Remove a subscription to the timeline and unsubscribe if needed. + * The timeline will be unsubscribed when the last subscription is removed. + * If the timelineCoroutineScope is cancelled, the timeline will be unsubscribed automatically. + */ suspend fun unsubscribeIfNeeded() = mutex.withLock { when (subscriptionCount) { 0 -> return@withLock