|
|
|
@ -31,6 +31,7 @@ import io.element.android.libraries.matrix.impl.timeline.postprocessor.LoadingIn
@@ -31,6 +31,7 @@ import io.element.android.libraries.matrix.impl.timeline.postprocessor.LoadingIn
|
|
|
|
|
import io.element.android.libraries.matrix.impl.timeline.postprocessor.RoomBeginningPostProcessor |
|
|
|
|
import io.element.android.libraries.matrix.impl.timeline.postprocessor.TimelineEncryptedHistoryPostProcessor |
|
|
|
|
import io.element.android.services.toolbox.api.systemclock.SystemClock |
|
|
|
|
import kotlinx.coroutines.CancellationException |
|
|
|
|
import kotlinx.coroutines.CompletableDeferred |
|
|
|
|
import kotlinx.coroutines.CoroutineDispatcher |
|
|
|
|
import kotlinx.coroutines.CoroutineScope |
|
|
|
@ -38,15 +39,13 @@ import kotlinx.coroutines.coroutineScope
@@ -38,15 +39,13 @@ import kotlinx.coroutines.coroutineScope
|
|
|
|
|
import kotlinx.coroutines.ensureActive |
|
|
|
|
import kotlinx.coroutines.flow.Flow |
|
|
|
|
import kotlinx.coroutines.flow.MutableStateFlow |
|
|
|
|
import kotlinx.coroutines.flow.SharingStarted |
|
|
|
|
import kotlinx.coroutines.flow.StateFlow |
|
|
|
|
import kotlinx.coroutines.flow.combine |
|
|
|
|
import kotlinx.coroutines.flow.distinctUntilChanged |
|
|
|
|
import kotlinx.coroutines.flow.flowOf |
|
|
|
|
import kotlinx.coroutines.flow.getAndUpdate |
|
|
|
|
import kotlinx.coroutines.flow.launchIn |
|
|
|
|
import kotlinx.coroutines.flow.map |
|
|
|
|
import kotlinx.coroutines.flow.onEach |
|
|
|
|
import kotlinx.coroutines.flow.stateIn |
|
|
|
|
import kotlinx.coroutines.launch |
|
|
|
|
import kotlinx.coroutines.withContext |
|
|
|
|
import org.matrix.rustcomponents.sdk.TimelineDiff |
|
|
|
@ -58,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean
@@ -58,6 +57,7 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|
|
|
|
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline |
|
|
|
|
|
|
|
|
|
private const val INITIAL_MAX_SIZE = 50 |
|
|
|
|
private const val PAGINATION_SIZE = 50 |
|
|
|
|
|
|
|
|
|
class RustTimeline( |
|
|
|
|
private val inner: InnerTimeline, |
|
|
|
@ -105,6 +105,14 @@ class RustTimeline(
@@ -105,6 +105,14 @@ class RustTimeline(
|
|
|
|
|
timelineItemFactory = timelineItemFactory, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
private val backPaginationStatus = MutableStateFlow( |
|
|
|
|
Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
private val forwardPaginationStatus = MutableStateFlow( |
|
|
|
|
Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = !isLive) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
init { |
|
|
|
|
roomCoroutineScope.launch(dispatcher) { |
|
|
|
|
inner.timelineDiffFlow { initialList -> |
|
|
|
@ -130,22 +138,34 @@ class RustTimeline(
@@ -130,22 +138,34 @@ class RustTimeline(
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private fun updatePaginationStatus(direction: Timeline.PaginationDirection, update: (Timeline.PaginationStatus)->Timeline.PaginationStatus){ |
|
|
|
|
when (direction) { |
|
|
|
|
Timeline.PaginationDirection.BACKWARDS -> backPaginationStatus.getAndUpdate(update) |
|
|
|
|
Timeline.PaginationDirection.FORWARDS -> forwardPaginationStatus.getAndUpdate(update) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
override suspend fun paginate(direction: Timeline.PaginationDirection): Result<Boolean> { |
|
|
|
|
initLatch.await() |
|
|
|
|
return runCatching { |
|
|
|
|
if (!canPaginate(direction)) throw TimelineException.CannotPaginate |
|
|
|
|
updatePaginationStatus(direction) { it.copy(isPaginating = true) } |
|
|
|
|
when (direction) { |
|
|
|
|
Timeline.PaginationDirection.BACKWARDS -> inner.paginateBackwards(50u) |
|
|
|
|
Timeline.PaginationDirection.FORWARDS -> inner.paginateForwards(50u) |
|
|
|
|
Timeline.PaginationDirection.BACKWARDS -> inner.paginateBackwards(PAGINATION_SIZE.toUShort()) |
|
|
|
|
Timeline.PaginationDirection.FORWARDS -> inner.paginateForwards(PAGINATION_SIZE.toUShort()) |
|
|
|
|
} |
|
|
|
|
}.onFailure { error -> |
|
|
|
|
updatePaginationStatus(direction) { it.copy(isPaginating = false) } |
|
|
|
|
if (error is CancellationException) { |
|
|
|
|
throw error |
|
|
|
|
} |
|
|
|
|
if (error is TimelineException.CannotPaginate) { |
|
|
|
|
Timber.d("Can't paginate $direction on room ${matrixRoom.roomId} with paginationStatus: ${backPaginationStatus.value}") |
|
|
|
|
} else { |
|
|
|
|
Timber.e(error, "Error paginating $direction on room ${matrixRoom.roomId}") |
|
|
|
|
} |
|
|
|
|
}.onSuccess { |
|
|
|
|
Timber.v("Success paginating $direction for room ${matrixRoom.roomId}") |
|
|
|
|
}.onSuccess { hasReachedEnd -> |
|
|
|
|
updatePaginationStatus(direction) { it.copy(isPaginating = false, hasMoreToLoad = !hasReachedEnd) } |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -164,20 +184,6 @@ class RustTimeline(
@@ -164,20 +184,6 @@ class RustTimeline(
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private val backPaginationStatus: StateFlow<Timeline.PaginationStatus> = inner |
|
|
|
|
.backPaginationStatusFlow() |
|
|
|
|
.map() |
|
|
|
|
.stateIn(roomCoroutineScope, SharingStarted.Eagerly, Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true)) |
|
|
|
|
|
|
|
|
|
private val forwardPaginationStatus: StateFlow<Timeline.PaginationStatus> = |
|
|
|
|
when (isLive) { |
|
|
|
|
true -> MutableStateFlow(Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = false)) |
|
|
|
|
false -> inner |
|
|
|
|
.forwardPaginationStatusFlow() |
|
|
|
|
.map() |
|
|
|
|
.stateIn(roomCoroutineScope, SharingStarted.Eagerly, Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
override val timelineItems: Flow<List<MatrixTimelineItem>> = combine( |
|
|
|
|
_timelineItems, |
|
|
|
|
backPaginationStatus.map { it.hasMoreToLoad }.distinctUntilChanged(), |
|
|
|
|