@ -23,7 +23,6 @@ import io.element.android.libraries.matrix.api.core.ProgressCallback
@@ -23,7 +23,6 @@ import io.element.android.libraries.matrix.api.core.ProgressCallback
import io.element.android.libraries.matrix.api.core.RoomId
import io.element.android.libraries.matrix.api.core.SessionId
import io.element.android.libraries.matrix.api.core.UserId
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.media.AudioInfo
import io.element.android.libraries.matrix.api.media.FileInfo
import io.element.android.libraries.matrix.api.media.ImageInfo
@ -32,17 +31,19 @@ import io.element.android.libraries.matrix.api.room.MatrixRoom
@@ -32,17 +31,19 @@ import io.element.android.libraries.matrix.api.room.MatrixRoom
import io.element.android.libraries.matrix.api.room.MatrixRoomMembersState
import io.element.android.libraries.matrix.api.room.MessageEventType
import io.element.android.libraries.matrix.api.room.StateEventType
import io.element.android.libraries.matrix.api.room.location.AssetType
import io.element.android.libraries.matrix.api.room.roomMembers
import io.element.android.libraries.matrix.api.timeline.MatrixTimeline
import io.element.android.libraries.matrix.api.timeline.item.event.EventType
import io.element.android.libraries.matrix.impl.core.toProgressWatcher
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.media.map
import io.element.android.libraries.matrix.impl.room.location.toInner
import io.element.android.libraries.matrix.impl.timeline.RustMatrixTimeline
import io.element.android.libraries.matrix.impl.timeline.backPaginationStatusFlow
import io.element.android.libraries.matrix.impl.timeline.timelineDiffFlow
import io.element.android.services.toolbox.api.systemclock.SystemClock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
@ -62,6 +63,7 @@ import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
@@ -62,6 +63,7 @@ import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
import timber.log.Timber
import java.io.File
@OptIn ( ExperimentalCoroutinesApi :: class )
class RustMatrixRoom (
override val sessionId : SessionId ,
private val roomListItem : RoomListItem ,
@ -74,6 +76,11 @@ class RustMatrixRoom(
@@ -74,6 +76,11 @@ class RustMatrixRoom(
override val roomId = RoomId ( innerRoom . id ( ) )
// Create a dispatcher for all room methods...
private val roomDispatcher = coroutineDispatchers . io . limitedParallelism ( 32 )
//...except getMember methods as it could quickly fill the roomDispatcher...
private val roomMembersDispatcher = coroutineDispatchers . io . limitedParallelism ( 8 )
private val roomCoroutineScope = sessionCoroutineScope . childScope ( coroutineDispatchers . main , " RoomScope- $roomId " )
private val _membersStateFlow = MutableStateFlow < MatrixRoomMembersState > ( MatrixRoomMembersState . Unknown )
private val isInit = MutableStateFlow ( false )
@ -83,7 +90,7 @@ class RustMatrixRoom(
@@ -83,7 +90,7 @@ class RustMatrixRoom(
matrixRoom = this ,
innerRoom = innerRoom ,
roomCoroutineScope = roomCoroutineScope ,
coroutineDispatchers = coroutineDispatchers
dispatcher = roomDispatcher
)
}
@ -105,7 +112,7 @@ class RustMatrixRoom(
@@ -105,7 +112,7 @@ class RustMatrixRoom(
timelineLimit = null
)
roomListItem . subscribe ( settings )
roomCoroutineScope . launch ( coroutineDispatchers . computation ) {
roomCoroutineScope . launch ( roomDispatcher ) {
innerRoom . timelineDiffFlow { initialList ->
_timeline . postItems ( initialList )
} . onEach {
@ -175,7 +182,7 @@ class RustMatrixRoom(
@@ -175,7 +182,7 @@ class RustMatrixRoom(
override val activeMemberCount : Long
get ( ) = innerRoom . activeMembersCount ( ) . toLong ( )
override suspend fun updateMembers ( ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun updateMembers ( ) : Result < Unit > = withContext ( roomMembersDispatcher ) {
val currentState = _membersStateFlow . value
val currentMembers = currentState . roomMembers ( )
_membersStateFlow . value = MatrixRoomMembersState . Pending ( prevRoomMembers = currentMembers )
@ -189,20 +196,20 @@ class RustMatrixRoom(
@@ -189,20 +196,20 @@ class RustMatrixRoom(
}
override suspend fun userDisplayName ( userId : UserId ) : Result < String ? > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . memberDisplayName ( userId . value )
}
}
override suspend fun userAvatarUrl ( userId : UserId ) : Result < String ? > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . memberAvatarUrl ( userId . value )
}
}
override suspend fun sendMessage ( message : String ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun sendMessage ( message : String ) : Result < Unit > = withContext ( roomDispatcher ) {
val transactionId = genTransactionId ( )
messageEventContentFromMarkdown ( message ) . use { content ->
runCatching {
@ -211,7 +218,7 @@ class RustMatrixRoom(
@@ -211,7 +218,7 @@ class RustMatrixRoom(
}
}
override suspend fun editMessage ( originalEventId : EventId ? , transactionId : String ? , message : String ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun editMessage ( originalEventId : EventId ? , transactionId : String ? , message : String ) : Result < Unit > = withContext ( roomDispatcher ) {
if ( originalEventId != null ) {
runCatching {
innerRoom . edit ( /* TODO use content */ message , originalEventId . value , transactionId )
@ -224,7 +231,7 @@ class RustMatrixRoom(
@@ -224,7 +231,7 @@ class RustMatrixRoom(
}
}
override suspend fun replyMessage ( eventId : EventId , message : String ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun replyMessage ( eventId : EventId , message : String ) : Result < Unit > = withContext ( roomDispatcher ) {
val transactionId = genTransactionId ( )
// val content = messageEventContentFromMarkdown(message)
runCatching {
@ -232,50 +239,50 @@ class RustMatrixRoom(
@@ -232,50 +239,50 @@ class RustMatrixRoom(
}
}
override suspend fun redactEvent ( eventId : EventId , reason : String ? ) = withContext ( coroutineDispatchers . io ) {
override suspend fun redactEvent ( eventId : EventId , reason : String ? ) = withContext ( roomDispatcher ) {
val transactionId = genTransactionId ( )
runCatching {
innerRoom . redact ( eventId . value , reason , transactionId )
}
}
override suspend fun leave ( ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun leave ( ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . leave ( )
}
}
override suspend fun acceptInvitation ( ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun acceptInvitation ( ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . acceptInvitation ( )
}
}
override suspend fun rejectInvitation ( ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun rejectInvitation ( ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . rejectInvitation ( )
}
}
override suspend fun inviteUserById ( id : UserId ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun inviteUserById ( id : UserId ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . inviteUserById ( id . value )
}
}
override suspend fun canInvite ( ) : Result < Boolean > = withContext ( coroutineDispatchers . io ) {
override suspend fun canInvite ( ) : Result < Boolean > = withContext ( roomMembersDispatcher ) {
runCatching {
innerRoom . member ( sessionId . value ) . use ( RoomMember :: canInvite )
}
}
override suspend fun canSendStateEvent ( type : StateEventType ) : Result < Boolean > = withContext ( coroutineDispatchers . io ) {
override suspend fun canSendStateEvent ( type : StateEventType ) : Result < Boolean > = withContext ( roomMembersDispatcher ) {
runCatching {
innerRoom . member ( sessionId . value ) . use { it . canSendState ( type . map ( ) ) }
}
}
override suspend fun canSendEvent ( type : MessageEventType ) : Result < Boolean > = withContext ( coroutineDispatchers . io ) {
override suspend fun canSendEvent ( type : MessageEventType ) : Result < Boolean > = withContext ( roomMembersDispatcher ) {
runCatching {
innerRoom . member ( sessionId . value ) . use { it . canSendMessage ( type . map ( ) ) }
}
@ -305,13 +312,13 @@ class RustMatrixRoom(
@@ -305,13 +312,13 @@ class RustMatrixRoom(
}
}
override suspend fun toggleReaction ( emoji : String , eventId : EventId ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun toggleReaction ( emoji : String , eventId : EventId ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . toggleReaction ( key = emoji , eventId = eventId . value )
}
}
override suspend fun forwardEvent ( eventId : EventId , roomIds : List < RoomId > ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun forwardEvent ( eventId : EventId , roomIds : List < RoomId > ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
roomContentForwarder . forward ( fromRoom = innerRoom , eventId = eventId , toRoomIds = roomIds )
} . onFailure {
@ -320,14 +327,14 @@ class RustMatrixRoom(
@@ -320,14 +327,14 @@ class RustMatrixRoom(
}
override suspend fun retrySendMessage ( transactionId : String ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . retrySend ( transactionId )
}
}
override suspend fun cancelSend ( transactionId : String ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . cancelSend ( transactionId )
}
@ -335,40 +342,40 @@ class RustMatrixRoom(
@@ -335,40 +342,40 @@ class RustMatrixRoom(
@OptIn ( ExperimentalUnsignedTypes :: class )
override suspend fun updateAvatar ( mimeType : String , data : ByteArray ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . uploadAvatar ( mimeType , data . toUByteArray ( ) . toList ( ) )
}
}
override suspend fun removeAvatar ( ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . removeAvatar ( )
}
}
override suspend fun setName ( name : String ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . setName ( name )
}
}
override suspend fun setTopic ( topic : String ) : Result < Unit > =
withContext ( coroutineDispatchers . io ) {
withContext ( roomDispatcher ) {
runCatching {
innerRoom . setTopic ( topic )
}
}
private suspend fun fetchMembers ( ) = withContext ( coroutineDispatchers . io ) {
private suspend fun fetchMembers ( ) = withContext ( roomDispatcher ) {
runCatching {
innerRoom . fetchMembers ( )
}
}
override suspend fun reportContent ( eventId : EventId , reason : String , blockUserId : UserId ? ) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
override suspend fun reportContent ( eventId : EventId , reason : String , blockUserId : UserId ? ) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . reportContent ( eventId = eventId . value , score = null , reason = reason )
if ( blockUserId != null ) {
@ -383,7 +390,7 @@ class RustMatrixRoom(
@@ -383,7 +390,7 @@ class RustMatrixRoom(
description : String ? ,
zoomLevel : Int ? ,
assetType : AssetType ? ,
) : Result < Unit > = withContext ( coroutineDispatchers . io ) {
) : Result < Unit > = withContext ( roomDispatcher ) {
runCatching {
innerRoom . sendLocation (
body = body ,