From 9198ed384d56b57d5e423a955400c1cd3fc8c616 Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Tue, 29 Jun 2021 19:23:16 +0300 Subject: [PATCH] Refactor stream --- src/helpers/schedulers.ts | 30 +----- src/helpers/schedulers/debounce.ts | 31 ++++++ src/lib/serviceWorker/stream.ts | 159 +++++++++++++++++------------ 3 files changed, 129 insertions(+), 91 deletions(-) create mode 100644 src/helpers/schedulers/debounce.ts diff --git a/src/helpers/schedulers.ts b/src/helpers/schedulers.ts index 9a0d0e83..9c641b72 100644 --- a/src/helpers/schedulers.ts +++ b/src/helpers/schedulers.ts @@ -6,36 +6,12 @@ // * Jolly Cobra's schedulers import { AnyToVoidFunction, NoneToVoidFunction } from "../types"; +import _debounce from './schedulers/debounce'; //type Scheduler = typeof requestAnimationFrame | typeof onTickEnd | typeof runNow; -export function debounce( - fn: F, - ms: number, - shouldRunFirst = true, - shouldRunLast = true, -) { - let waitingTimeout: number | null = null; - - return (...args: Parameters) => { - if(waitingTimeout) { - clearTimeout(waitingTimeout); - waitingTimeout = null; - } else if(shouldRunFirst) { - // @ts-ignore - fn(...args); - } - - waitingTimeout = setTimeout(() => { - if(shouldRunLast) { - // @ts-ignore - fn(...args); - } - - waitingTimeout = null; - }, ms) as any; - }; -} +const debounce = _debounce; +export {debounce}; export function throttle( fn: F, diff --git a/src/helpers/schedulers/debounce.ts b/src/helpers/schedulers/debounce.ts new file mode 100644 index 00000000..3aaf87cf --- /dev/null +++ b/src/helpers/schedulers/debounce.ts @@ -0,0 +1,31 @@ +// * Jolly Cobra's schedulers + +import { AnyToVoidFunction } from "../../types"; + +export default function debounce( + fn: F, + ms: number, + shouldRunFirst = true, + shouldRunLast = true, +) { + let waitingTimeout: number | null = null; + + return (...args: Parameters) => { + if(waitingTimeout) { + clearTimeout(waitingTimeout); + waitingTimeout = null; + } else if(shouldRunFirst) { + // @ts-ignore + fn(...args); + } + + waitingTimeout = setTimeout(() => { + if(shouldRunLast) { + // @ts-ignore + fn(...args); + } + + waitingTimeout = null; + }, ms) as any; + }; +} diff --git a/src/lib/serviceWorker/stream.ts b/src/lib/serviceWorker/stream.ts index 839e2309..60c83dd4 100644 --- a/src/lib/serviceWorker/stream.ts +++ b/src/lib/serviceWorker/stream.ts @@ -6,88 +6,119 @@ import { deferredPromise } from "../../helpers/cancellablePromise"; import { notifySomeone } from "../../helpers/context"; +import debounce from "../../helpers/schedulers/debounce"; import { isSafari } from "../../helpers/userAgent"; -import { UploadFile } from "../../layer"; +import { InputFileLocation, UploadFile } from "../../layer"; import { DownloadOptions } from "../mtproto/apiFileManager"; import { RequestFilePartTask, deferredPromises, incrementTaskId } from "./index.service"; import timeout from "./timeout"; -export default function onStreamFetch(event: FetchEvent, params: string) { - const range = parseRange(event.request.headers.get('Range')); - let [offset, end] = range; +type StreamRange = [number, number]; +type StreamId = string; +const streams: Map = new Map(); +class Stream { + private destroyDebounced: () => void; + private id: StreamId; + private limitPart: number; + + constructor(private info: DownloadOptions) { + this.id = Stream.getId(info); + streams.set(this.id, this); + + // ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится + this.limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT; + this.destroyDebounced = debounce(this.destroy, 15000, false, true); + } - const info: DownloadOptions = JSON.parse(decodeURIComponent(params)); - //const fileName = getFileNameByLocation(info.location); + private destroy = () => { + streams.delete(this.id); + }; + + private requestFilePart(alignedOffset: number, limit: number) { + const task: RequestFilePartTask = { + type: 'requestFilePart', + id: incrementTaskId(), + payload: [this.info.dcId, this.info.location, alignedOffset, limit] + }; + + notifySomeone(task); + + const deferred = deferredPromises[task.id] = deferredPromise(); + return deferred; + } - // ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится - const limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT; + public requestRange(range: StreamRange) { + this.destroyDebounced(); - /* if(info.size > limitPart && isSafari && offset === limitPart) { - //end = info.size - 1; - //offset = info.size - 1 - limitPart; - offset = info.size - (info.size % limitPart); - } */ + const possibleResponse = responseForSafariFirstRange(range, this.info.mimeType, this.info.size); + if(possibleResponse) { + return possibleResponse; + } - //log.debug('[stream]', url, offset, end); + const [offset, end] = range; - event.respondWith(Promise.race([ - timeout(45 * 1000), + /* if(info.size > limitPart && isSafari && offset === limitPart) { + //end = info.size - 1; + //offset = info.size - 1 - limitPart; + offset = info.size - (info.size % limitPart); + } */ + + const limit = end && end < this.limitPart ? alignLimit(end - offset + 1) : this.limitPart; + const alignedOffset = alignOffset(offset, limit); + + return this.requestFilePart(alignedOffset, limit).then(result => { + let ab = result.bytes as Uint8Array; + + //log.debug('[stream] requestFilePart result:', result); + + const headers: Record = { + 'Accept-Ranges': 'bytes', + 'Content-Range': `bytes ${alignedOffset}-${alignedOffset + ab.byteLength - 1}/${this.info.size || '*'}`, + 'Content-Length': `${ab.byteLength}`, + }; + + if(this.info.mimeType) headers['Content-Type'] = this.info.mimeType; - new Promise((resolve, reject) => { - // safari workaround - const possibleResponse = responseForSafariFirstRange(range, info.mimeType, info.size); - if(possibleResponse) { - return resolve(possibleResponse); + if(isSafari) { + ab = ab.slice(offset - alignedOffset, end - alignedOffset + 1); + headers['Content-Range'] = `bytes ${offset}-${offset + ab.byteLength - 1}/${this.info.size || '*'}`; + headers['Content-Length'] = `${ab.byteLength}`; } - const limit = end && end < limitPart ? alignLimit(end - offset + 1) : limitPart; - const alignedOffset = alignOffset(offset, limit); + // simulate slow connection + //setTimeout(() => { + return new Response(ab, { + status: 206, + statusText: 'Partial Content', + headers, + }); + //}, 2.5e3); + }); + } - //log.debug('[stream] requestFilePart:', /* info.dcId, info.location, */ alignedOffset, limit); + public static get(info: DownloadOptions) { + return streams.get(this.getId(info)) ?? new Stream(info); + } - const task: RequestFilePartTask = { - type: 'requestFilePart', - id: incrementTaskId(), - payload: [info.dcId, info.location, alignedOffset, limit] - }; + public static getId(info: DownloadOptions) { + return (info.location as InputFileLocation.inputDocumentFileLocation).id; + } +} - - const deferred = deferredPromises[task.id] = deferredPromise(); - deferred.then(result => { - let ab = result.bytes as Uint8Array; - - //log.debug('[stream] requestFilePart result:', result); - - const headers: Record = { - 'Accept-Ranges': 'bytes', - 'Content-Range': `bytes ${alignedOffset}-${alignedOffset + ab.byteLength - 1}/${info.size || '*'}`, - 'Content-Length': `${ab.byteLength}`, - }; - - if(info.mimeType) headers['Content-Type'] = info.mimeType; - - if(isSafari) { - ab = ab.slice(offset - alignedOffset, end - alignedOffset + 1); - headers['Content-Range'] = `bytes ${offset}-${offset + ab.byteLength - 1}/${info.size || '*'}`; - headers['Content-Length'] = `${ab.byteLength}`; - } - - // simulate slow connection - //setTimeout(() => { - resolve(new Response(ab, { - status: 206, - statusText: 'Partial Content', - headers, - })); - //}, 2.5e3); - }).catch(err => {}); - - notifySomeone(task); - }) +export default function onStreamFetch(event: FetchEvent, params: string) { + const range = parseRange(event.request.headers.get('Range')); + const info: DownloadOptions = JSON.parse(decodeURIComponent(params)); + const stream = Stream.get(info); + + //log.debug('[stream]', url, offset, end); + + event.respondWith(Promise.race([ + timeout(45 * 1000), + stream.requestRange(range) ])); } -function responseForSafariFirstRange(range: [number, number], mimeType: string, size: number): Response { +function responseForSafariFirstRange(range: StreamRange, mimeType: string, size: number): Response { if(range[0] === 0 && range[1] === 1) { return new Response(new Uint8Array(2).buffer, { status: 206, @@ -112,7 +143,7 @@ const STREAM_CHUNK_MIDDLE_LIMIT = 512 * 1024; const STREAM_CHUNK_UPPER_LIMIT = 1024 * 1024; const SMALLEST_CHUNK_LIMIT = 512 * 4; -function parseRange(header: string): [number, number] { +function parseRange(header: string): StreamRange { if(!header) return [0, 0]; const [, chunks] = header.split('='); const ranges = chunks.split(', ');