Browse Source

Refactor stream

master
Eduard Kuzmenko 3 years ago
parent
commit
9198ed384d
  1. 30
      src/helpers/schedulers.ts
  2. 31
      src/helpers/schedulers/debounce.ts
  3. 111
      src/lib/serviceWorker/stream.ts

30
src/helpers/schedulers.ts

@ -6,36 +6,12 @@
// * Jolly Cobra's schedulers // * Jolly Cobra's schedulers
import { AnyToVoidFunction, NoneToVoidFunction } from "../types"; import { AnyToVoidFunction, NoneToVoidFunction } from "../types";
import _debounce from './schedulers/debounce';
//type Scheduler = typeof requestAnimationFrame | typeof onTickEnd | typeof runNow; //type Scheduler = typeof requestAnimationFrame | typeof onTickEnd | typeof runNow;
export function debounce<F extends AnyToVoidFunction>( const debounce = _debounce;
fn: F, export {debounce};
ms: number,
shouldRunFirst = true,
shouldRunLast = true,
) {
let waitingTimeout: number | null = null;
return (...args: Parameters<F>) => {
if(waitingTimeout) {
clearTimeout(waitingTimeout);
waitingTimeout = null;
} else if(shouldRunFirst) {
// @ts-ignore
fn(...args);
}
waitingTimeout = setTimeout(() => {
if(shouldRunLast) {
// @ts-ignore
fn(...args);
}
waitingTimeout = null;
}, ms) as any;
};
}
export function throttle<F extends AnyToVoidFunction>( export function throttle<F extends AnyToVoidFunction>(
fn: F, fn: F,

31
src/helpers/schedulers/debounce.ts

@ -0,0 +1,31 @@
// * Jolly Cobra's schedulers
import { AnyToVoidFunction } from "../../types";
export default function debounce<F extends AnyToVoidFunction>(
fn: F,
ms: number,
shouldRunFirst = true,
shouldRunLast = true,
) {
let waitingTimeout: number | null = null;
return (...args: Parameters<F>) => {
if(waitingTimeout) {
clearTimeout(waitingTimeout);
waitingTimeout = null;
} else if(shouldRunFirst) {
// @ts-ignore
fn(...args);
}
waitingTimeout = setTimeout(() => {
if(shouldRunLast) {
// @ts-ignore
fn(...args);
}
waitingTimeout = null;
}, ms) as any;
};
}

111
src/lib/serviceWorker/stream.ts

@ -6,88 +6,119 @@
import { deferredPromise } from "../../helpers/cancellablePromise"; import { deferredPromise } from "../../helpers/cancellablePromise";
import { notifySomeone } from "../../helpers/context"; import { notifySomeone } from "../../helpers/context";
import debounce from "../../helpers/schedulers/debounce";
import { isSafari } from "../../helpers/userAgent"; import { isSafari } from "../../helpers/userAgent";
import { UploadFile } from "../../layer"; import { InputFileLocation, UploadFile } from "../../layer";
import { DownloadOptions } from "../mtproto/apiFileManager"; import { DownloadOptions } from "../mtproto/apiFileManager";
import { RequestFilePartTask, deferredPromises, incrementTaskId } from "./index.service"; import { RequestFilePartTask, deferredPromises, incrementTaskId } from "./index.service";
import timeout from "./timeout"; import timeout from "./timeout";
export default function onStreamFetch(event: FetchEvent, params: string) { type StreamRange = [number, number];
const range = parseRange(event.request.headers.get('Range')); type StreamId = string;
let [offset, end] = range; const streams: Map<StreamId, Stream> = new Map();
class Stream {
private destroyDebounced: () => void;
private id: StreamId;
private limitPart: number;
const info: DownloadOptions = JSON.parse(decodeURIComponent(params)); constructor(private info: DownloadOptions) {
//const fileName = getFileNameByLocation(info.location); this.id = Stream.getId(info);
streams.set(this.id, this);
// ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится // ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится
const limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT; this.limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT;
this.destroyDebounced = debounce(this.destroy, 15000, false, true);
/* if(info.size > limitPart && isSafari && offset === limitPart) {
//end = info.size - 1;
//offset = info.size - 1 - limitPart;
offset = info.size - (info.size % limitPart);
} */
//log.debug('[stream]', url, offset, end);
event.respondWith(Promise.race([
timeout(45 * 1000),
new Promise<Response>((resolve, reject) => {
// safari workaround
const possibleResponse = responseForSafariFirstRange(range, info.mimeType, info.size);
if(possibleResponse) {
return resolve(possibleResponse);
} }
const limit = end && end < limitPart ? alignLimit(end - offset + 1) : limitPart; private destroy = () => {
const alignedOffset = alignOffset(offset, limit); streams.delete(this.id);
};
//log.debug('[stream] requestFilePart:', /* info.dcId, info.location, */ alignedOffset, limit);
private requestFilePart(alignedOffset: number, limit: number) {
const task: RequestFilePartTask = { const task: RequestFilePartTask = {
type: 'requestFilePart', type: 'requestFilePart',
id: incrementTaskId(), id: incrementTaskId(),
payload: [info.dcId, info.location, alignedOffset, limit] payload: [this.info.dcId, this.info.location, alignedOffset, limit]
}; };
notifySomeone(task);
const deferred = deferredPromises[task.id] = deferredPromise<UploadFile.uploadFile>(); const deferred = deferredPromises[task.id] = deferredPromise<UploadFile.uploadFile>();
deferred.then(result => { return deferred;
}
public requestRange(range: StreamRange) {
this.destroyDebounced();
const possibleResponse = responseForSafariFirstRange(range, this.info.mimeType, this.info.size);
if(possibleResponse) {
return possibleResponse;
}
const [offset, end] = range;
/* if(info.size > limitPart && isSafari && offset === limitPart) {
//end = info.size - 1;
//offset = info.size - 1 - limitPart;
offset = info.size - (info.size % limitPart);
} */
const limit = end && end < this.limitPart ? alignLimit(end - offset + 1) : this.limitPart;
const alignedOffset = alignOffset(offset, limit);
return this.requestFilePart(alignedOffset, limit).then(result => {
let ab = result.bytes as Uint8Array; let ab = result.bytes as Uint8Array;
//log.debug('[stream] requestFilePart result:', result); //log.debug('[stream] requestFilePart result:', result);
const headers: Record<string, string> = { const headers: Record<string, string> = {
'Accept-Ranges': 'bytes', 'Accept-Ranges': 'bytes',
'Content-Range': `bytes ${alignedOffset}-${alignedOffset + ab.byteLength - 1}/${info.size || '*'}`, 'Content-Range': `bytes ${alignedOffset}-${alignedOffset + ab.byteLength - 1}/${this.info.size || '*'}`,
'Content-Length': `${ab.byteLength}`, 'Content-Length': `${ab.byteLength}`,
}; };
if(info.mimeType) headers['Content-Type'] = info.mimeType; if(this.info.mimeType) headers['Content-Type'] = this.info.mimeType;
if(isSafari) { if(isSafari) {
ab = ab.slice(offset - alignedOffset, end - alignedOffset + 1); ab = ab.slice(offset - alignedOffset, end - alignedOffset + 1);
headers['Content-Range'] = `bytes ${offset}-${offset + ab.byteLength - 1}/${info.size || '*'}`; headers['Content-Range'] = `bytes ${offset}-${offset + ab.byteLength - 1}/${this.info.size || '*'}`;
headers['Content-Length'] = `${ab.byteLength}`; headers['Content-Length'] = `${ab.byteLength}`;
} }
// simulate slow connection // simulate slow connection
//setTimeout(() => { //setTimeout(() => {
resolve(new Response(ab, { return new Response(ab, {
status: 206, status: 206,
statusText: 'Partial Content', statusText: 'Partial Content',
headers, headers,
})); });
//}, 2.5e3); //}, 2.5e3);
}).catch(err => {}); });
}
notifySomeone(task); public static get(info: DownloadOptions) {
}) return streams.get(this.getId(info)) ?? new Stream(info);
}
public static getId(info: DownloadOptions) {
return (info.location as InputFileLocation.inputDocumentFileLocation).id;
}
}
export default function onStreamFetch(event: FetchEvent, params: string) {
const range = parseRange(event.request.headers.get('Range'));
const info: DownloadOptions = JSON.parse(decodeURIComponent(params));
const stream = Stream.get(info);
//log.debug('[stream]', url, offset, end);
event.respondWith(Promise.race([
timeout(45 * 1000),
stream.requestRange(range)
])); ]));
} }
function responseForSafariFirstRange(range: [number, number], mimeType: string, size: number): Response { function responseForSafariFirstRange(range: StreamRange, mimeType: string, size: number): Response {
if(range[0] === 0 && range[1] === 1) { if(range[0] === 0 && range[1] === 1) {
return new Response(new Uint8Array(2).buffer, { return new Response(new Uint8Array(2).buffer, {
status: 206, status: 206,
@ -112,7 +143,7 @@ const STREAM_CHUNK_MIDDLE_LIMIT = 512 * 1024;
const STREAM_CHUNK_UPPER_LIMIT = 1024 * 1024; const STREAM_CHUNK_UPPER_LIMIT = 1024 * 1024;
const SMALLEST_CHUNK_LIMIT = 512 * 4; const SMALLEST_CHUNK_LIMIT = 512 * 4;
function parseRange(header: string): [number, number] { function parseRange(header: string): StreamRange {
if(!header) return [0, 0]; if(!header) return [0, 0];
const [, chunks] = header.split('='); const [, chunks] = header.split('=');
const ranges = chunks.split(', '); const ranges = chunks.split(', ');

Loading…
Cancel
Save