Browse Source

Sending queue : adjust to match the latest rust api

pull/3023/head
ganfra 3 months ago
parent
commit
9250745333
  1. 4
      appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt
  2. 41
      appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendQueues.kt
  3. 118
      appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt
  4. 79
      appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt
  5. 8
      libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt
  6. 2
      libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt
  7. 21
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt
  8. 6
      libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt
  9. 10
      libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt
  10. 3
      libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt

4
appnav/src/main/kotlin/io/element/android/appnav/LoggedInFlowNode.kt

@ -41,7 +41,7 @@ import dagger.assisted.AssistedInject
import im.vector.app.features.analytics.plan.JoinedRoom import im.vector.app.features.analytics.plan.JoinedRoom
import io.element.android.anvilannotations.ContributesNode import io.element.android.anvilannotations.ContributesNode
import io.element.android.appnav.loggedin.LoggedInNode import io.element.android.appnav.loggedin.LoggedInNode
import io.element.android.appnav.loggedin.SendingQueue import io.element.android.appnav.loggedin.SendQueues
import io.element.android.appnav.room.RoomFlowNode import io.element.android.appnav.room.RoomFlowNode
import io.element.android.appnav.room.RoomNavigationTarget import io.element.android.appnav.room.RoomNavigationTarget
import io.element.android.appnav.room.joined.JoinedRoomLoadedFlowNode import io.element.android.appnav.room.joined.JoinedRoomLoadedFlowNode
@ -103,7 +103,7 @@ class LoggedInFlowNode @AssistedInject constructor(
private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint, private val roomDirectoryEntryPoint: RoomDirectoryEntryPoint,
private val shareEntryPoint: ShareEntryPoint, private val shareEntryPoint: ShareEntryPoint,
private val matrixClient: MatrixClient, private val matrixClient: MatrixClient,
private val sendingQueue: SendingQueue, private val sendingQueue: SendQueues,
snackbarDispatcher: SnackbarDispatcher, snackbarDispatcher: SnackbarDispatcher,
) : BaseFlowNode<LoggedInFlowNode.NavTarget>( ) : BaseFlowNode<LoggedInFlowNode.NavTarget>(
backstack = BackStack( backstack = BackStack(

41
appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendingQueue.kt → appnav/src/main/kotlin/io/element/android/appnav/loggedin/SendQueues.kt

@ -22,44 +22,39 @@ import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.libraries.di.SessionScope import io.element.android.libraries.di.SessionScope
import io.element.android.libraries.di.SingleIn import io.element.android.libraries.di.SingleIn
import io.element.android.libraries.matrix.api.MatrixClient import io.element.android.libraries.matrix.api.MatrixClient
import io.element.android.libraries.matrix.api.core.RoomId
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import timber.log.Timber import timber.log.Timber
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Inject import javax.inject.Inject
private const val SENDING_QUEUE_MIN_RETRY_DELAY = 250L
@VisibleForTesting @VisibleForTesting
const val SENDING_QUEUE_MAX_RETRY_DELAY = 5000L const val SENDING_QUEUE_RETRY_DELAY = 1500L
@SingleIn(SessionScope::class) @SingleIn(SessionScope::class)
class SendingQueue @Inject constructor( class SendQueues @Inject constructor(
private val matrixClient: MatrixClient, private val matrixClient: MatrixClient,
private val networkMonitor: NetworkMonitor, private val networkMonitor: NetworkMonitor,
) { ) {
private val retryCount = AtomicInteger(0)
fun launchIn(coroutineScope: CoroutineScope) { fun launchIn(coroutineScope: CoroutineScope) {
combine( networkMonitor.connectivity
networkMonitor.connectivity, .onEach { networkStatus ->
matrixClient.sendingQueueStatus(), matrixClient.setAllSendQueuesEnabled(enabled = networkStatus == NetworkStatus.Online)
) { networkStatus, isSendingQueueEnabled ->
Pair(networkStatus, isSendingQueueEnabled)
}.onEach { (networkStatus, isSendingQueueEnabled) ->
Timber.d("Network status: $networkStatus, isSendingQueueEnabled: $isSendingQueueEnabled")
if (networkStatus == NetworkStatus.Online && !isSendingQueueEnabled) {
val retryDelay =
(SENDING_QUEUE_MIN_RETRY_DELAY * retryCount.incrementAndGet()).coerceIn(SENDING_QUEUE_MIN_RETRY_DELAY, SENDING_QUEUE_MAX_RETRY_DELAY)
Timber.d("Retry enabling sending queue in $retryDelay ms")
delay(retryDelay)
} else {
retryCount.set(0)
} }
matrixClient.setSendingQueueEnabled(enabled = networkStatus == NetworkStatus.Online) .launchIn(coroutineScope)
}.launchIn(coroutineScope)
matrixClient.sendQueueDisabledFlow()
.onEach { roomId: RoomId ->
Timber.d("Send queue disabled for room $roomId")
if (networkMonitor.connectivity.value == NetworkStatus.Online) {
delay(SENDING_QUEUE_RETRY_DELAY)
matrixClient.getRoom(roomId)?.use { room ->
room.setSendQueueEnabled(enabled = true)
}
}
}.launchIn(coroutineScope)
} }
} }

118
appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendQueuesTest.kt

@ -0,0 +1,118 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.appnav.loggedin
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.test.FakeMatrixClient
import io.element.android.libraries.matrix.test.room.FakeMatrixRoom
import io.element.android.tests.testutils.lambda.assert
import io.element.android.tests.testutils.lambda.lambdaRecorder
import io.element.android.tests.testutils.lambda.value
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.junit.Test
@OptIn(ExperimentalCoroutinesApi::class) class SendQueuesTest {
private val matrixClient = FakeMatrixClient()
private val room = FakeMatrixRoom()
private val networkMonitor = FakeNetworkMonitor()
private val sut = SendQueues(matrixClient, networkMonitor)
@Test
fun `test network status online and sending queue failed`() = runTest {
val sendQueueDisabledFlow = MutableSharedFlow<RoomId>(replay = 1)
val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> }
matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow
matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda
matrixClient.givenGetRoomResult(room.roomId, room)
val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> }
room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda
sut.launchIn(backgroundScope)
sendQueueDisabledFlow.emit(room.roomId)
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
runCurrent()
assert(setAllSendQueuesEnabledLambda)
.isCalledOnce()
.with(value(true))
assert(setRoomSendQueueEnabledLambda)
.isCalledOnce()
.with(value(true))
}
@Test
fun `test network status offline and sending queue failed`() = runTest {
val sendQueueDisabledFlow = MutableSharedFlow<RoomId>(replay = 1)
val setAllSendQueuesEnabledLambda = lambdaRecorder { _: Boolean -> }
matrixClient.sendQueueDisabledFlow = sendQueueDisabledFlow
matrixClient.setAllSendQueuesEnabledLambda = setAllSendQueuesEnabledLambda
networkMonitor.connectivity.value = NetworkStatus.Offline
matrixClient.givenGetRoomResult(room.roomId, room)
val setRoomSendQueueEnabledLambda = lambdaRecorder { _: Boolean -> }
room.setSendQueueEnabledLambda = setRoomSendQueueEnabledLambda
sut.launchIn(backgroundScope)
sendQueueDisabledFlow.emit(room.roomId)
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
runCurrent()
assert(setAllSendQueuesEnabledLambda)
.isCalledOnce()
.with(value(false))
assert(setRoomSendQueueEnabledLambda)
.isNeverCalled()
}
@Test
fun `test network status getting offline and online`() = runTest {
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
matrixClient.setAllSendQueuesEnabledLambda = setEnableSendingQueueLambda
sut.launchIn(backgroundScope)
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
networkMonitor.connectivity.value = NetworkStatus.Offline
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
networkMonitor.connectivity.value = NetworkStatus.Online
advanceTimeBy(SENDING_QUEUE_RETRY_DELAY)
assert(setEnableSendingQueueLambda)
.isCalledExactly(3)
.withSequence(
listOf(value(true)),
listOf(value(false)),
listOf(value(true)),
)
}
}

79
appnav/src/test/kotlin/io/element/android/appnav/loggedin/SendingQueueTest.kt

@ -1,79 +0,0 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.element.android.appnav.loggedin
import io.element.android.features.networkmonitor.api.NetworkStatus
import io.element.android.features.networkmonitor.test.FakeNetworkMonitor
import io.element.android.libraries.matrix.test.FakeMatrixClient
import io.element.android.tests.testutils.lambda.assert
import io.element.android.tests.testutils.lambda.lambdaRecorder
import io.element.android.tests.testutils.lambda.value
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runTest
import org.junit.Test
@OptIn(ExperimentalCoroutinesApi::class) class SendingQueueTest {
private val matrixClient = FakeMatrixClient()
private val networkMonitor = FakeNetworkMonitor()
private val sut = SendingQueue(matrixClient, networkMonitor)
@Test
fun `test network status online and sending queue is disabled`() = runTest {
val sendingQueueStatusFlow = MutableStateFlow(false)
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow
matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda
sut.launchIn(backgroundScope)
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
sendingQueueStatusFlow.value = true
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
assert(setEnableSendingQueueLambda)
.isCalledExactly(2)
.withSequence(
listOf(value(true)),
listOf(value(true))
)
}
@Test
fun `test network status getting offline and online`() = runTest {
val sendingQueueStatusFlow = MutableStateFlow(true)
val setEnableSendingQueueLambda = lambdaRecorder { _: Boolean -> }
matrixClient.sendingQueueStatusFlow = sendingQueueStatusFlow
matrixClient.setSendingQueueEnabledLambda = setEnableSendingQueueLambda
sut.launchIn(backgroundScope)
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
networkMonitor.connectivity.value = NetworkStatus.Offline
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
networkMonitor.connectivity.value = NetworkStatus.Online
advanceTimeBy(SENDING_QUEUE_MAX_RETRY_DELAY)
assert(setEnableSendingQueueLambda)
.isCalledExactly(3)
.withSequence(
listOf(value(true)),
listOf(value(false)),
listOf(value(true)),
)
}
}

8
libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/MatrixClient.kt

@ -114,11 +114,11 @@ interface MatrixClient : Closeable {
* so it's required to manually re-enable it as soon as * so it's required to manually re-enable it as soon as
* connectivity is back on the device. * connectivity is back on the device.
*/ */
suspend fun setSendingQueueEnabled(enabled: Boolean) suspend fun setAllSendQueuesEnabled(enabled: Boolean)
/** /**
* Returns the current status of the sending queue as a [StateFlow]. * Returns a flow of room IDs that have send queue being disabled.
* If true, the sending queue is enabled. * This flow will emit a new value whenever the send queue is disabled for a room.
*/ */
fun sendingQueueStatus(): StateFlow<Boolean> fun sendQueueDisabledFlow(): Flow<RoomId>
} }

2
libraries/matrix/api/src/main/kotlin/io/element/android/libraries/matrix/api/room/MatrixRoom.kt

@ -335,5 +335,7 @@ interface MatrixRoom : Closeable {
*/ */
suspend fun sendCallNotificationIfNeeded(): Result<Unit> suspend fun sendCallNotificationIfNeeded(): Result<Unit>
suspend fun setSendQueueEnabled(enabled: Boolean)
override fun close() = destroy() override fun close() = destroy()
} }

21
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/RustMatrixClient.kt

@ -95,10 +95,11 @@ import kotlinx.coroutines.withTimeout
import org.matrix.rustcomponents.sdk.BackupState import org.matrix.rustcomponents.sdk.BackupState
import org.matrix.rustcomponents.sdk.Client import org.matrix.rustcomponents.sdk.Client
import org.matrix.rustcomponents.sdk.ClientDelegate import org.matrix.rustcomponents.sdk.ClientDelegate
import org.matrix.rustcomponents.sdk.ClientException
import org.matrix.rustcomponents.sdk.IgnoredUsersListener import org.matrix.rustcomponents.sdk.IgnoredUsersListener
import org.matrix.rustcomponents.sdk.NotificationProcessSetup import org.matrix.rustcomponents.sdk.NotificationProcessSetup
import org.matrix.rustcomponents.sdk.PowerLevels import org.matrix.rustcomponents.sdk.PowerLevels
import org.matrix.rustcomponents.sdk.SendingQueueStatusListener import org.matrix.rustcomponents.sdk.SendQueueRoomErrorListener
import org.matrix.rustcomponents.sdk.TaskHandle import org.matrix.rustcomponents.sdk.TaskHandle
import org.matrix.rustcomponents.sdk.use import org.matrix.rustcomponents.sdk.use
import timber.log.Timber import timber.log.Timber
@ -554,20 +555,18 @@ class RustMatrixClient(
}.distinctUntilChanged() }.distinctUntilChanged()
} }
override suspend fun setSendingQueueEnabled(enabled: Boolean) = withContext(sessionDispatcher) { override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = withContext(sessionDispatcher) {
Timber.i("setSendingQueueEnabled($enabled)") Timber.i("setAllSendQueuesEnabled($enabled)")
client.enableSendingQueue(enabled) client.enableAllSendQueues(enabled)
} }
override fun sendingQueueStatus(): StateFlow<Boolean> = mxCallbackFlow { override fun sendQueueDisabledFlow(): Flow<RoomId> = mxCallbackFlow {
client.subscribeToSendingQueueStatus(object : SendingQueueStatusListener { client.subscribeToSendQueueStatus(object : SendQueueRoomErrorListener {
override fun onValue(newValue: Boolean) { override fun onError(roomId: String, error: ClientException) {
channel.trySend(newValue) trySend(RoomId(roomId))
} }
}) })
} }.buffer(Channel.UNLIMITED)
.buffer(Channel.UNLIMITED)
.stateIn(sessionCoroutineScope, started = SharingStarted.Eagerly, initialValue = true)
private suspend fun File.getCacheSize( private suspend fun File.getCacheSize(
includeCryptoDb: Boolean = false, includeCryptoDb: Boolean = false,

6
libraries/matrix/impl/src/main/kotlin/io/element/android/libraries/matrix/impl/room/RustMatrixRoom.kt

@ -84,6 +84,7 @@ import org.matrix.rustcomponents.sdk.WidgetCapabilities
import org.matrix.rustcomponents.sdk.WidgetCapabilitiesProvider import org.matrix.rustcomponents.sdk.WidgetCapabilitiesProvider
import org.matrix.rustcomponents.sdk.getElementCallRequiredPermissions import org.matrix.rustcomponents.sdk.getElementCallRequiredPermissions
import org.matrix.rustcomponents.sdk.use import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import uniffi.matrix_sdk.RoomPowerLevelChanges import uniffi.matrix_sdk.RoomPowerLevelChanges
import java.io.File import java.io.File
import org.matrix.rustcomponents.sdk.Room as InnerRoom import org.matrix.rustcomponents.sdk.Room as InnerRoom
@ -594,6 +595,11 @@ class RustMatrixRoom(
innerRoom.sendCallNotificationIfNeeded() innerRoom.sendCallNotificationIfNeeded()
} }
override suspend fun setSendQueueEnabled(enabled: Boolean) = withContext(roomDispatcher) {
Timber.d("setSendQueuesEnabled: $enabled")
innerRoom.enableSendQueue(enabled)
}
private fun createTimeline( private fun createTimeline(
timeline: InnerTimeline, timeline: InnerTimeline,
isLive: Boolean, isLive: Boolean,

10
libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/FakeMatrixClient.kt

@ -54,8 +54,10 @@ import kotlinx.collections.immutable.ImmutableList
import kotlinx.collections.immutable.persistentListOf import kotlinx.collections.immutable.persistentListOf
import kotlinx.collections.immutable.toImmutableList import kotlinx.collections.immutable.toImmutableList
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.TestScope
import java.util.Optional import java.util.Optional
@ -300,12 +302,12 @@ class FakeMatrixClient(
override fun getRoomInfoFlow(roomId: RoomId) = getRoomInfoFlowLambda(roomId) override fun getRoomInfoFlow(roomId: RoomId) = getRoomInfoFlowLambda(roomId)
var setSendingQueueEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean -> var setAllSendQueuesEnabledLambda = lambdaRecorder(ensureNeverCalled = true) { _: Boolean ->
// no-op // no-op
} }
override suspend fun setSendingQueueEnabled(enabled: Boolean) = setSendingQueueEnabledLambda(enabled) override suspend fun setAllSendQueuesEnabled(enabled: Boolean) = setAllSendQueuesEnabledLambda(enabled)
var sendingQueueStatusFlow = MutableStateFlow(true) var sendQueueDisabledFlow = emptyFlow<RoomId>()
override fun sendingQueueStatus(): StateFlow<Boolean> = sendingQueueStatusFlow override fun sendQueueDisabledFlow(): Flow<RoomId> = sendQueueDisabledFlow
} }

3
libraries/matrix/test/src/main/kotlin/io/element/android/libraries/matrix/test/room/FakeMatrixRoom.kt

@ -525,6 +525,9 @@ class FakeMatrixRoom(
return sendCallNotificationIfNeededResult() return sendCallNotificationIfNeededResult()
} }
var setSendQueueEnabledLambda = { _: Boolean -> }
override suspend fun setSendQueueEnabled(enabled: Boolean) = setSendQueueEnabledLambda(enabled)
override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result<MatrixWidgetDriver> = getWidgetDriverResult override fun getWidgetDriver(widgetSettings: MatrixWidgetSettings): Result<MatrixWidgetDriver> = getWidgetDriverResult
fun givenRoomMembersState(state: MatrixRoomMembersState) { fun givenRoomMembersState(state: MatrixRoomMembersState) {

Loading…
Cancel
Save