From ef74e54fea44a55bf4b72595197a85c6c41cc88f Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 7 Nov 2022 18:42:07 +0100 Subject: [PATCH] Add chunk operator on flow --- .../x/core/data/flow/TimingOperators.kt | 86 +++++++++++++++++++ .../x/matrix/room/RoomSummaryDataSource.kt | 8 +- .../x/matrix/timeline/MatrixTimeline.kt | 10 ++- 3 files changed, 99 insertions(+), 5 deletions(-) diff --git a/libraries/core/src/main/java/io/element/android/x/core/data/flow/TimingOperators.kt b/libraries/core/src/main/java/io/element/android/x/core/data/flow/TimingOperators.kt index cfa9f146c1..67e187b746 100644 --- a/libraries/core/src/main/java/io/element/android/x/core/data/flow/TimingOperators.kt +++ b/libraries/core/src/main/java/io/element/android/x/core/data/flow/TimingOperators.kt @@ -1,2 +1,88 @@ package io.element.android.x.core.data.flow +import android.os.SystemClock +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.selects.select + +@ExperimentalCoroutinesApi +fun Flow.chunk(durationInMillis: Long): Flow> { + require(durationInMillis > 0) { "Duration should be greater than 0" } + return flow { + coroutineScope { + val events = ArrayList() + val ticker = fixedPeriodTicker(durationInMillis) + try { + val upstreamValues = produce(capacity = Channel.CONFLATED) { + collect { value -> send(value) } + } + while (isActive) { + var hasTimedOut = false + select { + upstreamValues.onReceive { + events.add(it) + } + ticker.onReceive { + hasTimedOut = true + } + } + if (hasTimedOut && events.isNotEmpty()) { + emit(events.toList()) + events.clear() + } + } + } catch (e: ClosedReceiveChannelException) { + // drain remaining events + if (events.isNotEmpty()) emit(events.toList()) + } finally { + ticker.cancel() + } + } + } +} + +@ExperimentalCoroutinesApi +fun Flow.throttleFirst(windowDuration: Long): Flow = flow { + var windowStartTime = SystemClock.elapsedRealtime() + var emitted = false + collect { value -> + val currentTime = SystemClock.elapsedRealtime() + val delta = currentTime - windowStartTime + if (delta >= windowDuration) { + windowStartTime += delta / windowDuration * windowDuration + emitted = false + } + if (!emitted) { + emit(value) + emitted = true + } + } +} + +@ExperimentalCoroutinesApi +fun tickerFlow(scope: CoroutineScope, delayMillis: Long, initialDelayMillis: Long = delayMillis): Flow { + return scope.fixedPeriodTicker(delayMillis, initialDelayMillis).consumeAsFlow() +} + +@ExperimentalCoroutinesApi +private fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { + require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } + require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } + return produce(capacity = 0) { + delay(initialDelayMillis) + while (true) { + channel.send(Unit) + delay(delayMillis) + } + } +} diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt index 96fa943527..9191c9673d 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/room/RoomSummaryDataSource.kt @@ -1,6 +1,7 @@ package io.element.android.x.matrix.room import io.element.android.x.core.data.CoroutineDispatchers +import io.element.android.x.core.data.flow.chunk import io.element.android.x.matrix.room.message.RoomMessageFactory import io.element.android.x.matrix.sync.roomListDiff import io.element.android.x.matrix.sync.state @@ -34,9 +35,12 @@ internal class RustRoomSummaryDataSource( fun startSync(){ slidingSyncView.roomListDiff() - .onEach { diff -> + .chunk(100) + .onEach { diffs -> updateRoomSummaries { - applyDiff(diff) + diffs.forEach { + applyDiff(it) + } } }.launchIn(coroutineScope) diff --git a/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt b/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt index 6a6dceb30f..e7c06c4af3 100644 --- a/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt +++ b/libraries/matrix/src/main/java/io/element/android/x/matrix/timeline/MatrixTimeline.kt @@ -1,6 +1,7 @@ package io.element.android.x.matrix.timeline import io.element.android.x.core.data.CoroutineDispatchers +import io.element.android.x.core.data.flow.chunk import io.element.android.x.matrix.core.EventId import io.element.android.x.matrix.room.MatrixRoom import io.element.android.x.matrix.room.timelineDiff @@ -42,9 +43,12 @@ class MatrixTimeline( private fun diffFlow(): Flow { return room.timelineDiff() - .onEach { timelineDiff -> + .chunk(100) + .onEach { timelineDiffs -> updateTimelineItems { - applyDiff(timelineDiff) + timelineDiffs.onEach { + applyDiff(it) + } } }.map { } } @@ -107,7 +111,7 @@ class MatrixTimeline( room.addTimelineListener(timelineListener) } - fun dispose(){ + fun dispose() { room.removeTimeline() }