From 92507453339f58150ddd5734e81d2464ded80d69 Mon Sep 17 00:00:00 2001 From: ganfra Date: Wed, 12 Jun 2024 15:15:04 +0200 Subject: [PATCH] Sending queue : adjust to match the latest rust api --- .../android/appnav/LoggedInFlowNode.kt | 4 +- .../{SendingQueue.kt => SendQueues.kt} | 41 +++--- .../android/appnav/loggedin/SendQueuesTest.kt | 118 ++++++++++++++++++ .../appnav/loggedin/SendingQueueTest.kt | 79 ------------ .../libraries/matrix/api/MatrixClient.kt | 8 +- .../libraries/matrix/api/room/MatrixRoom.kt | 2 + .../libraries/matrix/impl/RustMatrixClient.kt | 21 ++-- .../matrix/impl/room/RustMatrixRoom.kt | 6 + .../libraries/matrix/test/FakeMatrixClient.kt | 10 +- .../matrix/test/room/FakeMatrixRoom.kt | 3 + 10 files changed, 169 insertions(+), 123 deletions(-) rename appnav/src/main/kotlin/io/element/android/appnav/loggedin/{SendingQueue.kt => SendQueues.kt} (53%) create mode 100644 appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt delete mode 100644 appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt diff --git a/appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt b/appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt index b91cf18464..219ec2167a 100644 --- a/appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt +++ b/appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt @@ -41,7 +41,7 @@ import dagger.assisted.AssistedInject import im.vector.app.features.analytics.plan.JoinedRoom import io.element.android.anvilannotations.ContributesNode import io.element.android.appnav.loggedin.LoggedInNode -import io.element.android.appnav.loggedin.SendingQueue +import io.element.android.appnav.loggedin.SendQueues import io.element.android.appnav.room.RoomFlowNode import io.element.android.appnav.room.RoomNavigationTarget import io.element.android.appnav.room.joined.JoinedRoomLoadedFlowNode @@ -103,7 +103,7 @@ class LoggedInFlowNode @AssistedInject constructor( private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint, private val shareEntryPoint: ShareEntryPoint, private val matrixClient: MatrixClient, - private val sendingQueue: SendingQueue, + private val sendingQueue: SendQueues, snackbarDispatcher: SnackbarDispatcher, ) : BaseFlowNode( backstack = BackStack( diff --git a/appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendingQueue.kt b/appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendQueues.kt similarity index 53% rename from appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendingQueue.kt rename to appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendQueues.kt index 979683de96..e52e9ac758 100644 --- a/appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendingQueue.kt +++ b/appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendQueues.kt @@ -22,44 +22,39 @@ import io.element.android.features.networkmonitor.api.NetworkStatus import io.element.android.libraries.di.SessionScope import io.element.android.libraries.di.SingleIn import io.element.android.libraries.matrix.api.MatrixClient +import io.element.android.libraries.matrix.api.core.RoomId import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import timber.log.Timber -import java.util.concurrent.atomic.AtomicInteger import javax.inject.Inject -private const val SENDING_QUEUE_MIN_RETRY_DELAY = 250L - @VisibleForTesting -const val SENDING_QUEUE_MAX_RETRY_DELAY = 5000L +const val SENDING_QUEUE_RETRY_DELAY = 1500L @SingleIn(SessionScope::class) -class SendingQueue @Inject constructor( +class SendQueues @Inject constructor( private val matrixClient: MatrixClient, private val networkMonitor: NetworkMonitor, ) { - private val retryCount = AtomicInteger(0) fun launchIn(coroutineScope: CoroutineScope) { - combine( - networkMonitor.connectivity, - matrixClient.sendingQueueStatus(), - ) { networkStatus, isSendingQueueEnabled -> - Pair(networkStatus, isSendingQueueEnabled) - }.onEach { (networkStatus, isSendingQueueEnabled) -> - Timber.d("Network status: $networkStatus, isSendingQueueEnabled: $isSendingQueueEnabled") - if (networkStatus == NetworkStatus.Online && !isSendingQueueEnabled) { - val retryDelay = - (SENDING_QUEUE_MIN_RETRY_DELAY * retryCount.incrementAndGet()).coerceIn(SENDING_QUEUE_MIN_RETRY_DELAY, SENDING_QUEUE_MAX_RETRY_DELAY) - Timber.d("Retry enabling sending queue in $retryDelay ms") - delay(retryDelay) - } else { - retryCount.set(0) + networkMonitor.connectivity + .onEach { networkStatus -> + matrixClient.setAllSendQueuesEnabled(enabled = networkStatus == NetworkStatus.Online) } - matrixClient.setSendingQueueEnabled(enabled = networkStatus == NetworkStatus.Online) - }.launchIn(coroutineScope) + .launchIn(coroutineScope) + + matrixClient.sendQueueDisabledFlow() + .onEach { roomId: RoomId -> + Timber.d("Send queue disabled for room $roomId") + if (networkMonitor.connectivity.value == NetworkStatus.Online) { + delay(SENDING_QUEUE_RETRY_DELAY) + matrixClient.getRoom(roomId)?.use { room -> + room.setSendQueueEnabled(enabled = true) + } + } + }.launchIn(coroutineScope) } } diff --git a/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt b/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt new file mode 100644 index 0000000000..682fb08d31 --- /dev/null +++ b/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2024 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.element.android.appnav.loggedin + +import io.element.android.features.networkmonitor.api.NetworkStatus +import io.element.android.features.networkmonitor.test.FakeNetworkMonitor +import io.element.android.libraries.matrix.api.core.RoomId +import io.element.android.libraries.matrix.test.FakeMatrixClient +import io.element.android.libraries.matrix.test.room.FakeMatrixRoom +import io.element.android.tests.testutils.lambda.assert +import io.element.android.tests.testutils.lambda.lambdaRecorder +import io.element.android.tests.testutils.lambda.value +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import org.junit.Test + +@OptIn(ExperimentalCoroutinesApi::class) class SendQueuesTest { + private val matrixClient = FakeMatrixClient() + private val room = FakeMatrixRoom() + private val networkMonitor = FakeNetworkMonitor() + private val sut = SendQueues(matrixClient, networkMonitor) + + @Test + fun `test network status online and sending queue failed`() = runTest { + + val sendQueueDisabledFlow = MutableSharedFlow(replay = 1) + val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> } + matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow + matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda + matrixClient.givenGetRoomResult(room.roomId, room) + + val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> } + room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda + + sut.launchIn(backgroundScope) + + sendQueueDisabledFlow.emit(room.roomId) + advanceTimeBy(SENDING_QUEUE_RETRY_DELAY) + runCurrent() + + assert(setAllSendQueuesEnabledLambda) + .isCalledOnce() + .with(value(true)) + + assert(setRoomSendQueueEnabledLambda) + .isCalledOnce() + .with(value(true)) + + } + + @Test + fun `test network status offline and sending queue failed`() = runTest { + + val sendQueueDisabledFlow = MutableSharedFlow(replay = 1) + + val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> } + matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow + matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda + networkMonitor.connectivity.value = NetworkStatus.Offline + matrixClient.givenGetRoomResult(room.roomId, room) + + val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> } + room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda + + sut.launchIn(backgroundScope) + + sendQueueDisabledFlow.emit(room.roomId) + advanceTimeBy(SENDING_QUEUE_RETRY_DELAY) + runCurrent() + + assert(setAllSendQueuesEnabledLambda) + .isCalledOnce() + .with(value(false)) + + assert(setRoomSendQueueEnabledLambda) + .isNeverCalled() + + } + + @Test + fun `test network status getting offline and online`() = runTest { + + val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> } + matrixClient.setAllSendQueuesEnabledLambda = setEnableSendingQueueLambda + + sut.launchIn(backgroundScope) + advanceTimeBy(SENDING_QUEUE_RETRY_DELAY) + networkMonitor.connectivity.value = NetworkStatus.Offline + advanceTimeBy(SENDING_QUEUE_RETRY_DELAY) + networkMonitor.connectivity.value = NetworkStatus.Online + advanceTimeBy(SENDING_QUEUE_RETRY_DELAY) + + assert(setEnableSendingQueueLambda) + .isCalledExactly(3) + .withSequence( + listOf(value(true)), + listOf(value(false)), + listOf(value(true)), + ) + } +} diff --git a/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt b/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt deleted file mode 100644 index a31ef0efc3..0000000000 --- a/appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2024 New Vector Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.element.android.appnav.loggedin - -import io.element.android.features.networkmonitor.api.NetworkStatus -import io.element.android.features.networkmonitor.test.FakeNetworkMonitor -import io.element.android.libraries.matrix.test.FakeMatrixClient -import io.element.android.tests.testutils.lambda.assert -import io.element.android.tests.testutils.lambda.lambdaRecorder -import io.element.android.tests.testutils.lambda.value -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.test.advanceTimeBy -import kotlinx.coroutines.test.runTest -import org.junit.Test - -@OptIn(ExperimentalCoroutinesApi::class) class SendingQueueTest { - private val matrixClient = FakeMatrixClient() - private val networkMonitor = FakeNetworkMonitor() - private val sut = SendingQueue(matrixClient, networkMonitor) - - @Test - fun `test network status online and sending queue is disabled`() = runTest { - val sendingQueueStatusFlow = MutableStateFlow(false) - val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> } - matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow - matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda - - sut.launchIn(backgroundScope) - - advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY) - sendingQueueStatusFlow.value = true - advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY) - - assert(setEnableSendingQueueLambda) - .isCalledExactly(2) - .withSequence( - listOf(value(true)), - listOf(value(true)) - ) - } - - @Test - fun `test network status getting offline and online`() = runTest { - val sendingQueueStatusFlow = MutableStateFlow(true) - val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> } - matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow - matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda - - sut.launchIn(backgroundScope) - advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY) - networkMonitor.connectivity.value = NetworkStatus.Offline - advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY) - networkMonitor.connectivity.value = NetworkStatus.Online - advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY) - - assert(setEnableSendingQueueLambda) - .isCalledExactly(3) - .withSequence( - listOf(value(true)), - listOf(value(false)), - listOf(value(true)), - ) - } -} diff --git a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt index 6680726672..4dd00984f1 100644 --- a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt +++ b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt @@ -114,11 +114,11 @@ interface MatrixClient : Closeable { * so it's required to manually re-enable it as soon as * connectivity is back on the device. */ - suspend fun setSendingQueueEnabled(enabled: Boolean) + suspend fun setAllSendQueuesEnabled(enabled: Boolean) /** - * Returns the current status of the sending queue as a [StateFlow]. - * If true, the sending queue is enabled. + * Returns a flow of room IDs that have send queue being disabled. + * This flow will emit a new value whenever the send queue is disabled for a room. */ - fun sendingQueueStatus(): StateFlow + fun sendQueueDisabledFlow(): Flow } diff --git a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt index 458b17209c..4cfc303974 100644 --- a/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt +++ b/libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt @@ -335,5 +335,7 @@ interface MatrixRoom : Closeable { */ suspend fun sendCallNotificationIfNeeded(): Result + suspend fun setSendQueueEnabled(enabled: Boolean) + override fun close() = destroy() } diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt index a8186de574..ebb871541d 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt @@ -95,10 +95,11 @@ import kotlinx.coroutines.withTimeout import org.matrix.rustcomponents.sdk.BackupState import org.matrix.rustcomponents.sdk.Client import org.matrix.rustcomponents.sdk.ClientDelegate +import org.matrix.rustcomponents.sdk.ClientException import org.matrix.rustcomponents.sdk.IgnoredUsersListener import org.matrix.rustcomponents.sdk.NotificationProcessSetup import org.matrix.rustcomponents.sdk.PowerLevels -import org.matrix.rustcomponents.sdk.SendingQueueStatusListener +import org.matrix.rustcomponents.sdk.SendQueueRoomErrorListener import org.matrix.rustcomponents.sdk.TaskHandle import org.matrix.rustcomponents.sdk.use import timber.log.Timber @@ -554,20 +555,18 @@ class RustMatrixClient( }.distinctUntilChanged() } - override suspend fun setSendingQueueEnabled(enabled: Boolean) = withContext(sessionDispatcher) { - Timber.i("setSendingQueueEnabled($enabled)") - client.enableSendingQueue(enabled) + override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = withContext(sessionDispatcher) { + Timber.i("setAllSendQueuesEnabled($enabled)") + client.enableAllSendQueues(enabled) } - override fun sendingQueueStatus(): StateFlow = mxCallbackFlow { - client.subscribeToSendingQueueStatus(object : SendingQueueStatusListener { - override fun onValue(newValue: Boolean) { - channel.trySend(newValue) + override fun sendQueueDisabledFlow(): Flow = mxCallbackFlow { + client.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener { + override fun onError(roomId: String, error: ClientException) { + trySend(RoomId(roomId)) } }) - } - .buffer(Channel.UNLIMITED) - .stateIn(sessionCoroutineScope, started = SharingStarted.Eagerly, initialValue = true) + }.buffer(Channel.UNLIMITED) private suspend fun File.getCacheSize( includeCryptoDb: Boolean = false, diff --git a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt index 4ee8c220f1..28318540ce 100644 --- a/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt +++ b/libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt @@ -84,6 +84,7 @@ import org.matrix.rustcomponents.sdk.WidgetCapabilities import org.matrix.rustcomponents.sdk.WidgetCapabilitiesProvider import org.matrix.rustcomponents.sdk.getElementCallRequiredPermissions import org.matrix.rustcomponents.sdk.use +import timber.log.Timber import uniffi.matrix_sdk.RoomPowerLevelChanges import java.io.File import org.matrix.rustcomponents.sdk.Room as InnerRoom @@ -594,6 +595,11 @@ class RustMatrixRoom( innerRoom.sendCallNotificationIfNeeded() } + override suspend fun setSendQueueEnabled(enabled: Boolean) = withContext(roomDispatcher) { + Timber.d("setSendQueuesEnabled: $enabled") + innerRoom.enableSendQueue(enabled) + } + private fun createTimeline( timeline: InnerTimeline, isLive: Boolean, diff --git a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt index 0f7fae3a91..8f17c3547d 100644 --- a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt +++ b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt @@ -54,8 +54,10 @@ import kotlinx.collections.immutable.ImmutableList import kotlinx.collections.immutable.persistentListOf import kotlinx.collections.immutable.toImmutableList import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.test.TestScope import java.util.Optional @@ -300,12 +302,12 @@ class FakeMatrixClient( override fun getRoomInfoFlow(roomId: RoomId) = getRoomInfoFlowLambda(roomId) - var setSendingQueueEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean -> + var setAllSendQueuesEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean -> // no-op } - override suspend fun setSendingQueueEnabled(enabled: Boolean) = setSendingQueueEnabledLambda(enabled) + override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = setAllSendQueuesEnabledLambda(enabled) - var sendingQueueStatusFlow = MutableStateFlow(true) - override fun sendingQueueStatus(): StateFlow = sendingQueueStatusFlow + var sendQueueDisabledFlow = emptyFlow() + override fun sendQueueDisabledFlow(): Flow = sendQueueDisabledFlow } diff --git a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt index 281c763c03..d5dddf3b7e 100644 --- a/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt +++ b/libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt @@ -525,6 +525,9 @@ class FakeMatrixRoom( return sendCallNotificationIfNeededResult() } + var setSendQueueEnabledLambda = { _: Boolean -> } + override suspend fun setSendQueueEnabled(enabled: Boolean) = setSendQueueEnabledLambda(enabled) + override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result = getWidgetDriverResult fun givenRoomMembersState(state: MatrixRoomMembersState) {