Browse Source

Preload stream chunks

master
Eduard Kuzmenko 3 years ago
parent
commit
3aaf6ddd22
  1. 12
      src/components/popups/avatar.ts
  2. 32
      src/helpers/blob.ts
  3. 79
      src/lib/cacheStorage.ts
  4. 15
      src/lib/filemanager.ts
  5. 103
      src/lib/mtproto/apiFileManager.ts
  6. 14
      src/lib/mtproto/mtproto.worker.ts
  7. 19
      src/lib/mtproto/mtprotoworker.ts
  8. 19
      src/lib/serviceWorker/index.service.ts
  9. 125
      src/lib/serviceWorker/stream.ts

12
src/components/popups/avatar.ts

@ -9,6 +9,7 @@ import resizeableImage from "../../lib/cropper";
import PopupElement from "."; import PopupElement from ".";
import { ripple } from "../ripple"; import { ripple } from "../ripple";
import { _i18n } from "../../lib/langPack"; import { _i18n } from "../../lib/langPack";
import { readBlobAsDataURL } from "../../helpers/blob";
export default class PopupAvatar extends PopupElement { export default class PopupAvatar extends PopupElement {
private cropContainer: HTMLElement; private cropContainer: HTMLElement;
@ -49,11 +50,8 @@ export default class PopupAvatar extends PopupElement {
if(!file) { if(!file) {
return; return;
} }
const reader = new FileReader(); readBlobAsDataURL(file).then(contents => {
reader.onload = (e) => {
const contents = e.target.result as string;
this.image = new Image(); this.image = new Image();
this.cropContainer.append(this.image); this.cropContainer.append(this.image);
this.image.src = contents; this.image.src = contents;
@ -67,9 +65,7 @@ export default class PopupAvatar extends PopupElement {
this.cropper = resizeableImage(this.image, this.canvas); this.cropper = resizeableImage(this.image, this.canvas);
this.input.value = ''; this.input.value = '';
}; };
}; });
reader.readAsDataURL(file);
}, false); }, false);
this.btnSubmit = document.createElement('button'); this.btnSubmit = document.createElement('button');

32
src/helpers/blob.ts

@ -9,16 +9,32 @@
* https://github.com/zhukov/webogram/blob/master/LICENSE * https://github.com/zhukov/webogram/blob/master/LICENSE
*/ */
export const readBlobAsText = (blob: Blob) => { export function readBlobAs(blob: Blob, method: 'readAsText'): Promise<string>;
return new Promise<string>(resolve => { export function readBlobAs(blob: Blob, method: 'readAsDataURL'): Promise<string>;
export function readBlobAs(blob: Blob, method: 'readAsArrayBuffer'): Promise<ArrayBuffer>;
export function readBlobAs(blob: Blob, method: 'readAsArrayBuffer' | 'readAsText' | 'readAsDataURL'): Promise<any> {
return new Promise<any>((resolve) => {
const reader = new FileReader(); const reader = new FileReader();
reader.addEventListener('loadend', (e) => { reader.addEventListener('loadend', (e) => resolve(e.target.result));
// @ts-ignore reader[method](blob);
resolve(e.srcElement.result);
});
reader.readAsText(blob);
}); });
}; }
export function readBlobAsText(blob: Blob) {
return readBlobAs(blob, 'readAsText');
}
export function readBlobAsDataURL(blob: Blob) {
return readBlobAs(blob, 'readAsDataURL');
}
export function readBlobAsArrayBuffer(blob: Blob) {
return readBlobAs(blob, 'readAsArrayBuffer');
}
export function readBlobAsUint8Array(blob: Blob) {
return readBlobAsArrayBuffer(blob).then(buffer => new Uint8Array(buffer));
}
export function blobConstruct(blobParts: any, mimeType: string = ''): Blob { export function blobConstruct(blobParts: any, mimeType: string = ''): Blob {
let blob; let blob;

79
src/lib/cacheStorage.ts

@ -10,16 +10,17 @@ import FileManager from './filemanager';
//import { MOUNT_CLASS_TO } from './mtproto/mtproto_config'; //import { MOUNT_CLASS_TO } from './mtproto/mtproto_config';
//import { logger } from './polyfill'; //import { logger } from './polyfill';
export type CacheStorageDbName = 'cachedFiles' | 'cachedStreamChunks' | 'cachedAssets';
export default class CacheStorageController { export default class CacheStorageController {
private static STORAGES: CacheStorageController[] = []; private static STORAGES: CacheStorageController[] = [];
//public dbName = 'cachedFiles';
private openDbPromise: Promise<Cache>; private openDbPromise: Promise<Cache>;
private useStorage = true; private useStorage = true;
//private log: ReturnType<typeof logger> = logger('CS'); //private log: ReturnType<typeof logger> = logger('CS');
constructor(private dbName: string) { constructor(private dbName: CacheStorageDbName) {
if(Modes.test) { if(Modes.test) {
this.dbName += '_test'; this.dbName += '_test';
} }
@ -33,71 +34,38 @@ export default class CacheStorageController {
} }
private openDatabase(): Promise<Cache> { private openDatabase(): Promise<Cache> {
if(this.openDbPromise) { return this.openDbPromise ?? (this.openDbPromise = caches.open(this.dbName));
return this.openDbPromise;
}
return this.openDbPromise = caches.open(this.dbName);
} }
public delete(entryName: string) { public delete(entryName: string) {
return this.timeoutOperation((cache) => { return this.timeoutOperation((cache) => cache.delete('/' + entryName));
return cache.delete('/' + entryName);
});
} }
public deleteAll() { public deleteAll() {
return caches.delete(this.dbName); return caches.delete(this.dbName);
} }
public save(entryName: string, response: Response) { public get(entryName: string) {
if(!this.useStorage) return Promise.reject('STORAGE_OFFLINE'); return this.timeoutOperation((cache) => cache.match('/' + entryName));
return this.timeoutOperation((cache) => {
return cache.put('/' + entryName, response);
});
} }
public saveFile(fileName: string, blob: Blob | Uint8Array) { public save(entryName: string, response: Response) {
if(!this.useStorage) return Promise.reject('STORAGE_OFFLINE'); return this.timeoutOperation((cache) => cache.put('/' + entryName, response));
//return Promise.resolve(blobConstruct([blob]));
if(!(blob instanceof Blob)) {
blob = blobConstruct(blob) as Blob;
}
const response = new Response(blob, {
headers: {
'Content-Length': '' + blob.size
}
});
return this.save(fileName, response).then(() => {
return blob as Blob;
});
} }
/* public getBlobSize(blob: any) {
return blob.size || blob.byteLength || blob.length;
} */
public getFile(fileName: string, method: 'blob' | 'json' | 'text' = 'blob'): Promise<any> { public getFile(fileName: string, method: 'blob' | 'json' | 'text' = 'blob'): Promise<any> {
if(!this.useStorage) return Promise.reject('STORAGE_OFFLINE');
/* if(method === 'blob') { /* if(method === 'blob') {
return Promise.reject(); return Promise.reject();
} */ } */
// const str = `get fileName: ${fileName}`; // const str = `get fileName: ${fileName}`;
// console.time(str); // console.time(str);
return this.timeoutOperation(async(cache) => { return this.get(fileName).then((response) => {
const response = await cache.match('/' + fileName); if(!response) {
if(!response || !cache) {
//console.warn('getFile:', response, fileName); //console.warn('getFile:', response, fileName);
throw 'NO_ENTRY_FOUND'; throw 'NO_ENTRY_FOUND';
} }
const promise = response[method](); const promise = response[method]();
// promise.then(() => { // promise.then(() => {
// console.timeEnd(str); // console.timeEnd(str);
@ -106,7 +74,26 @@ export default class CacheStorageController {
}); });
} }
private timeoutOperation<T>(callback: (cache: Cache) => Promise<T>) { public saveFile(fileName: string, blob: Blob | Uint8Array) {
//return Promise.resolve(blobConstruct([blob]));
if(!(blob instanceof Blob)) {
blob = blobConstruct(blob) as Blob;
}
const response = new Response(blob, {
headers: {
'Content-Length': '' + blob.size
}
});
return this.save(fileName, response).then(() => blob as Blob);
}
public timeoutOperation<T>(callback: (cache: Cache) => Promise<T>) {
if(!this.useStorage) {
return Promise.reject('STORAGE_OFFLINE');
}
return new Promise<T>(async(resolve, reject) => { return new Promise<T>(async(resolve, reject) => {
let rejected = false; let rejected = false;
const timeout = setTimeout(() => { const timeout = setTimeout(() => {
@ -118,6 +105,8 @@ export default class CacheStorageController {
try { try {
const cache = await this.openDatabase(); const cache = await this.openDatabase();
if(!cache) { if(!cache) {
this.useStorage = false;
this.openDbPromise = undefined;
throw 'no cache?'; throw 'no cache?';
} }

15
src/lib/filemanager.ts

@ -9,7 +9,7 @@
* https://github.com/zhukov/webogram/blob/master/LICENSE * https://github.com/zhukov/webogram/blob/master/LICENSE
*/ */
import { blobConstruct } from "../helpers/blob"; import { blobConstruct, readBlobAsUint8Array } from "../helpers/blob";
export class FileManager { export class FileManager {
public blobSupported = true; public blobSupported = true;
@ -28,17 +28,8 @@ export class FileManager {
public write(fileWriter: ReturnType<FileManager['getFakeFileWriter']>, bytes: Uint8Array | Blob | string): Promise<void> { public write(fileWriter: ReturnType<FileManager['getFakeFileWriter']>, bytes: Uint8Array | Blob | string): Promise<void> {
if(bytes instanceof Blob) { // is file bytes if(bytes instanceof Blob) { // is file bytes
return new Promise((resolve, reject) => { return readBlobAsUint8Array(bytes).then(arr => {
let fileReader = new FileReader(); return fileWriter.write(arr);
fileReader.onload = function(event) {
let arrayBuffer = event.target.result as ArrayBuffer;
let arr = new Uint8Array(arrayBuffer);
fileWriter.write(arr).then(resolve, reject);
};
fileReader.readAsArrayBuffer(bytes);
}); });
} else { } else {
return fileWriter.write(bytes); return fileWriter.write(bytes);

103
src/lib/mtproto/apiFileManager.ts

@ -11,6 +11,7 @@
import { MOUNT_CLASS_TO } from "../../config/debug"; import { MOUNT_CLASS_TO } from "../../config/debug";
import Modes from "../../config/modes"; import Modes from "../../config/modes";
import { readBlobAsArrayBuffer } from "../../helpers/blob";
import { CancellablePromise, deferredPromise } from "../../helpers/cancellablePromise"; import { CancellablePromise, deferredPromise } from "../../helpers/cancellablePromise";
import { notifyAll, notifySomeone } from "../../helpers/context"; import { notifyAll, notifySomeone } from "../../helpers/context";
import { getFileNameByLocation } from "../../helpers/fileName"; import { getFileNameByLocation } from "../../helpers/fileName";
@ -489,73 +490,59 @@ export class ApiFileManager {
for(let offset = 0; offset < fileSize; offset += partSize) { for(let offset = 0; offset < fileSize; offset += partSize) {
const part = _part++; // 0, 1 const part = _part++; // 0, 1
yield self.downloadRequest('upload', id, () => { yield self.downloadRequest('upload', id, () => {
return new Promise<void>((uploadResolve, uploadReject) => { const blob = file.slice(offset, offset + partSize);
const reader = new FileReader();
const blob = file.slice(offset, offset + partSize);
reader.onloadend = (e) => {
if(canceled) {
uploadReject({type: 'UPLOAD_CANCELED'});
return;
}
if(e.target.readyState !== FileReader.DONE) {
self.log.error('wrong readyState!');
uploadReject({type: 'WRONG_READY_STATE'});
return;
}
let buffer = e.target.result as ArrayBuffer;
self.debug && self.log('Upload file part, isBig:', isBigFile, part, buffer.byteLength, new Uint8Array(buffer).length, new Uint8Array(buffer).slice().length);
/* const u = new Uint8Array(buffer.byteLength); return readBlobAsArrayBuffer(blob).then(buffer => {
for(let i = 0; i < u.length; ++i) { if(canceled) {
//u[i] = Math.random() * 255 | 0; throw {type: 'UPLOAD_CANCELED'};
u[i] = 0; }
}
buffer = u.buffer; */ self.debug && self.log('Upload file part, isBig:', isBigFile, part, buffer.byteLength, new Uint8Array(buffer).length, new Uint8Array(buffer).slice().length);
/* const u = new Uint8Array(buffer.byteLength);
for(let i = 0; i < u.length; ++i) {
//u[i] = Math.random() * 255 | 0;
u[i] = 0;
}
buffer = u.buffer; */
/* setTimeout(() => { /* setTimeout(() => {
doneParts++; doneParts++;
uploadResolve(); uploadResolve();
//////this.log('Progress', doneParts * partSize / fileSize); //////this.log('Progress', doneParts * partSize / fileSize);
self.log('done part', part, doneParts); self.log('done part', part, doneParts);
deferred.notify({done: doneParts * partSize, total: fileSize}); deferred.notify({done: doneParts * partSize, total: fileSize});
if(doneParts >= totalParts) { if(doneParts >= totalParts) {
deferred.resolve(resultInputFile); deferred.resolve(resultInputFile);
resolved = true; resolved = true;
} }
}, 1250); }, 1250);
return; */ return; */
apiManager.invokeApi(method, { return apiManager.invokeApi(method, {
file_id: fileId, file_id: fileId,
file_part: part, file_part: part,
file_total_parts: totalParts, file_total_parts: totalParts,
bytes: buffer/* new Uint8Array(buffer) */ bytes: buffer/* new Uint8Array(buffer) */
} as any, { } as any, {
//startMaxLength: partSize + 256, //startMaxLength: partSize + 256,
fileUpload: true fileUpload: true
}).then((result) => { }).then((result) => {
doneParts++; doneParts++;
uploadResolve();
//////this.log('Progress', doneParts * partSize / fileSize); //////this.log('Progress', doneParts * partSize / fileSize);
deferred.notify({done: doneParts * partSize, total: fileSize}); deferred.notify({done: doneParts * partSize, total: fileSize});
if(doneParts >= totalParts) { if(doneParts >= totalParts) {
deferred.resolve(resultInputFile); deferred.resolve(resultInputFile);
resolved = true; resolved = true;
} }
}, errorHandler); }, errorHandler);
};
reader.readAsArrayBuffer(blob);
}); });
}, activeDelta).catch(errorHandler); }, activeDelta).catch(errorHandler);
} }

14
src/lib/mtproto/mtproto.worker.ts

@ -20,6 +20,7 @@ import sessionStorage from '../sessionStorage';
import { LocalStorageProxyTask } from '../localStorage'; import { LocalStorageProxyTask } from '../localStorage';
import { WebpConvertTask } from '../webp/webpWorkerController'; import { WebpConvertTask } from '../webp/webpWorkerController';
import { socketsProxied } from './transports/socketProxied'; import { socketsProxied } from './transports/socketProxied';
import { ToggleStorageTask } from './mtprotoworker';
let webpSupported = false; let webpSupported = false;
export const isWebpSupported = () => { export const isWebpSupported = () => {
@ -95,6 +96,12 @@ const taskListeners = {
forceReconnect: () => { forceReconnect: () => {
networkerFactory.forceReconnect(); networkerFactory.forceReconnect();
}, },
toggleStorage: (task: ToggleStorageTask) => {
const enabled = task.payload;
// AppStorage.toggleStorage(enabled);
CacheStorageController.toggleStorage(enabled);
}
}; };
const onMessage = async(e: any) => { const onMessage = async(e: any) => {
@ -153,13 +160,6 @@ const onMessage = async(e: any) => {
break; break;
} }
case 'toggleStorage': {
const enabled = task.args[0];
// AppStorage.toggleStorage(enabled);
CacheStorageController.toggleStorage(enabled);
break;
}
case 'setLanguage': case 'setLanguage':
case 'startAll': case 'startAll':
case 'stopAll': { case 'stopAll': {

19
src/lib/mtproto/mtprotoworker.ts

@ -6,7 +6,7 @@
import type { LocalStorageProxyTask, LocalStorageProxyTaskResponse } from '../localStorage'; import type { LocalStorageProxyTask, LocalStorageProxyTaskResponse } from '../localStorage';
//import type { LocalStorageProxyDeleteTask, LocalStorageProxySetTask } from '../storage'; //import type { LocalStorageProxyDeleteTask, LocalStorageProxySetTask } from '../storage';
import type { InvokeApiOptions } from '../../types'; import type { InvokeApiOptions, WorkerTaskVoidTemplate } from '../../types';
import type { MethodDeclMap } from '../../layer'; import type { MethodDeclMap } from '../../layer';
import MTProtoWorker from 'worker-loader!./mtproto.worker'; import MTProtoWorker from 'worker-loader!./mtproto.worker';
//import './mtproto.worker'; //import './mtproto.worker';
@ -29,6 +29,7 @@ import appRuntimeManager from '../appManagers/appRuntimeManager';
import { SocketProxyTask } from './transports/socketProxied'; import { SocketProxyTask } from './transports/socketProxied';
import telegramMeWebManager from './telegramMeWebManager'; import telegramMeWebManager from './telegramMeWebManager';
import { pause } from '../../helpers/schedulers'; import { pause } from '../../helpers/schedulers';
import { CacheStorageDbName } from '../cacheStorage';
type Task = { type Task = {
taskId: number, taskId: number,
@ -45,9 +46,15 @@ type HashOptions = {
[queryJSON: string]: HashResult [queryJSON: string]: HashResult
}; };
export interface ToggleStorageTask extends WorkerTaskVoidTemplate {
type: 'toggleStorage',
payload: boolean
};
export class ApiManagerProxy extends CryptoWorkerMethods { export class ApiManagerProxy extends CryptoWorkerMethods {
public worker: /* Window */Worker; public worker: /* Window */Worker;
public postMessage: (...args: any[]) => void; public postMessage: (...args: any[]) => void;
public postSWMessage: (...args: any[]) => void = () => {};
private afterMessageIdTemp = 0; private afterMessageIdTemp = 0;
private taskId = 0; private taskId = 0;
@ -101,6 +108,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
this.registerServiceWorker(); this.registerServiceWorker();
this.addTaskListener('clear', () => { this.addTaskListener('clear', () => {
const toClear: CacheStorageDbName[] = ['cachedFiles', 'cachedStreamChunks'];
Promise.all([ Promise.all([
AppStorage.toggleStorage(false), AppStorage.toggleStorage(false),
sessionStorage.clear(), sessionStorage.clear(),
@ -108,7 +116,8 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
telegramMeWebManager.setAuthorized(false), telegramMeWebManager.setAuthorized(false),
pause(3000) pause(3000)
]), ]),
webPushApiManager.forceUnsubscribe() webPushApiManager.forceUnsubscribe(),
Promise.all(toClear.map(cacheName => caches.delete(cacheName)))
]).finally(() => { ]).finally(() => {
appRuntimeManager.reload(); appRuntimeManager.reload();
}); });
@ -220,6 +229,8 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
this.log('SW statechange', e); this.log('SW statechange', e);
}); });
this.postSWMessage = worker.controller.postMessage.bind(worker.controller);
/// #if MTPROTO_SW /// #if MTPROTO_SW
const controller = worker.controller || registration.installing || registration.waiting || registration.active; const controller = worker.controller || registration.installing || registration.waiting || registration.active;
this.onWorkerFirstMessage(controller); this.onWorkerFirstMessage(controller);
@ -539,7 +550,9 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
} }
public toggleStorage(enabled: boolean) { public toggleStorage(enabled: boolean) {
return this.performTaskWorkerVoid('toggleStorage', enabled); const task: ToggleStorageTask = {type: 'toggleStorage', payload: enabled};
this.postMessage(task);
this.postSWMessage(task);
} }
public stopAll() { public stopAll() {

19
src/lib/serviceWorker/index.service.ts

@ -8,26 +8,28 @@
import '../mtproto/mtproto.worker'; import '../mtproto/mtproto.worker';
/// #endif /// #endif
//import CacheStorageController from '../cacheStorage'; //import CacheStorageController from '../cacheStorage';
import type { WorkerTaskTemplate, WorkerTaskVoidTemplate } from '../../types'; import type { Modify, WorkerTaskTemplate, WorkerTaskVoidTemplate } from '../../types';
import type { InputFileLocation, UploadFile } from '../../layer'; import type { InputFileLocation, UploadFile } from '../../layer';
import type { WebPushApiManager } from '../mtproto/webPushApiManager'; import type { WebPushApiManager } from '../mtproto/webPushApiManager';
import type { PushNotificationObject } from './push'; import type { PushNotificationObject } from './push';
import type { ToggleStorageTask } from '../mtproto/mtprotoworker';
import { logger, LogTypes } from '../logger'; import { logger, LogTypes } from '../logger';
import { CancellablePromise } from '../../helpers/cancellablePromise'; import { CancellablePromise } from '../../helpers/cancellablePromise';
import { CACHE_ASSETS_NAME, requestCache } from './cache'; import { CACHE_ASSETS_NAME, requestCache } from './cache';
import onStreamFetch from './stream'; import onStreamFetch from './stream';
import { closeAllNotifications, onPing } from './push'; import { closeAllNotifications, onPing } from './push';
import CacheStorageController from '../cacheStorage';
export const log = logger('SW', LogTypes.Error | LogTypes.Debug | LogTypes.Log | LogTypes.Warn); export const log = logger('SW', LogTypes.Error | LogTypes.Debug | LogTypes.Log | LogTypes.Warn);
const ctx = self as any as ServiceWorkerGlobalScope; const ctx = self as any as ServiceWorkerGlobalScope;
export const deferredPromises: {[taskId: number]: CancellablePromise<any>} = {}; export const deferredPromises: {[taskId: string]: CancellablePromise<any>} = {};
export interface RequestFilePartTask extends WorkerTaskTemplate { export interface RequestFilePartTask extends Modify<WorkerTaskTemplate, {id: string}> {
type: 'requestFilePart', type: 'requestFilePart',
payload: [number, InputFileLocation, number, number] payload: [number, InputFileLocation, number, number]
}; };
export interface RequestFilePartTaskResponse extends WorkerTaskTemplate { export interface RequestFilePartTaskResponse extends Modify<WorkerTaskTemplate, {id: string}> {
type: 'requestFilePart', type: 'requestFilePart',
payload?: UploadFile.uploadFile, payload?: UploadFile.uploadFile,
originalPayload?: RequestFilePartTask['payload'] originalPayload?: RequestFilePartTask['payload']
@ -55,7 +57,7 @@ export interface ServiceWorkerPushClickTask extends WorkerTaskVoidTemplate {
payload: PushNotificationObject payload: PushNotificationObject
}; };
export type ServiceWorkerTask = RequestFilePartTaskResponse | ServiceWorkerPingTask | ServiceWorkerNotificationsClearTask; export type ServiceWorkerTask = RequestFilePartTaskResponse | ServiceWorkerPingTask | ServiceWorkerNotificationsClearTask | ToggleStorageTask;
/// #if !MTPROTO_SW /// #if !MTPROTO_SW
const taskListeners: { const taskListeners: {
@ -77,6 +79,9 @@ const taskListeners: {
} }
delete deferredPromises[task.id]; delete deferredPromises[task.id];
},
toggleStorage: (task: ToggleStorageTask) => {
CacheStorageController.toggleStorage(task.payload);
} }
}; };
ctx.addEventListener('message', (e) => { ctx.addEventListener('message', (e) => {
@ -89,7 +94,7 @@ ctx.addEventListener('message', (e) => {
/// #endif /// #endif
//const cacheStorage = new CacheStorageController('cachedAssets'); //const cacheStorage = new CacheStorageController('cachedAssets');
let taskId = 0; /* let taskId = 0;
export function getTaskId() { export function getTaskId() {
return taskId; return taskId;
@ -97,7 +102,7 @@ export function getTaskId() {
export function incrementTaskId() { export function incrementTaskId() {
return taskId++; return taskId++;
} } */
const onFetch = (event: FetchEvent): void => { const onFetch = (event: FetchEvent): void => {
if(event.request.url.indexOf(location.origin + '/') === 0 && event.request.url.match(/\.(js|css|jpe?g|json|wasm|png|mp3|svg|tgs|ico|woff2?|ttf|webmanifest?)(?:\?.*)?$/)) { if(event.request.url.indexOf(location.origin + '/') === 0 && event.request.url.match(/\.(js|css|jpe?g|json|wasm|png|mp3|svg|tgs|ico|woff2?|ttf|webmanifest?)(?:\?.*)?$/)) {

125
src/lib/serviceWorker/stream.ts

@ -4,15 +4,52 @@
* https://github.com/morethanwords/tweb/blob/master/LICENSE * https://github.com/morethanwords/tweb/blob/master/LICENSE
*/ */
import { deferredPromise } from "../../helpers/cancellablePromise"; import { readBlobAsUint8Array } from "../../helpers/blob";
import { CancellablePromise, deferredPromise } from "../../helpers/cancellablePromise";
import { notifySomeone } from "../../helpers/context"; import { notifySomeone } from "../../helpers/context";
import debounce from "../../helpers/schedulers/debounce"; import debounce from "../../helpers/schedulers/debounce";
import { isSafari } from "../../helpers/userAgent"; import { isSafari } from "../../helpers/userAgent";
import { InputFileLocation, UploadFile } from "../../layer"; import { InputFileLocation, UploadFile } from "../../layer";
import CacheStorageController from "../cacheStorage";
import { DownloadOptions } from "../mtproto/apiFileManager"; import { DownloadOptions } from "../mtproto/apiFileManager";
import { RequestFilePartTask, deferredPromises, incrementTaskId } from "./index.service"; import { RequestFilePartTask, deferredPromises, log } from "./index.service";
import timeout from "./timeout"; import timeout from "./timeout";
const cacheStorage = new CacheStorageController('cachedStreamChunks');
const CHUNK_TTL = 86400;
const CHUNK_CACHED_TIME_HEADER = 'Time-Cached';
const clearOldChunks = () => {
return cacheStorage.timeoutOperation((cache) => {
return cache.keys().then(requests => {
const filtered: Map<StreamId, Request> = new Map();
const timestamp = Date.now() / 1000 | 0;
for(const request of requests) {
const match = request.url.match(/\/(\d+?)\?/);
if(match && !filtered.has(match[1])) {
filtered.set(match[1], request);
}
}
const promises: Promise<any>[] = [];
for(const [id, request] of filtered) {
const promise = cache.match(request).then((response) => {
if((+response.headers.get(CHUNK_CACHED_TIME_HEADER) + CHUNK_TTL) <= timestamp) {
log('will delete stream chunk:', id);
return cache.delete(request, {ignoreSearch: true, ignoreVary: true});
}
});
promises.push(promise);
}
return Promise.all(promises);
});
});
};
setInterval(clearOldChunks, 1800e3);
type StreamRange = [number, number]; type StreamRange = [number, number];
type StreamId = string; type StreamId = string;
const streams: Map<StreamId, Stream> = new Map(); const streams: Map<StreamId, Stream> = new Map();
@ -20,6 +57,7 @@ class Stream {
private destroyDebounced: () => void; private destroyDebounced: () => void;
private id: StreamId; private id: StreamId;
private limitPart: number; private limitPart: number;
private loadedOffsets: Set<number> = new Set();
constructor(private info: DownloadOptions) { constructor(private info: DownloadOptions) {
this.id = Stream.getId(info); this.id = Stream.getId(info);
@ -27,24 +65,85 @@ class Stream {
// ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится // ! если грузить очень большое видео чанками по 512Кб в мобильном Safari, то стрим не запустится
this.limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT; this.limitPart = info.size > (75 * 1024 * 1024) ? STREAM_CHUNK_UPPER_LIMIT : STREAM_CHUNK_MIDDLE_LIMIT;
this.destroyDebounced = debounce(this.destroy, 15000, false, true); this.destroyDebounced = debounce(this.destroy, 150000, false, true);
} }
private destroy = () => { private destroy = () => {
streams.delete(this.id); streams.delete(this.id);
}; };
private requestFilePart(alignedOffset: number, limit: number) { private requestFilePartFromWorker(alignedOffset: number, limit: number, fromPreload = false) {
const task: RequestFilePartTask = { const task: Omit<RequestFilePartTask, 'id'> = {
type: 'requestFilePart', type: 'requestFilePart',
id: incrementTaskId(),
payload: [this.info.dcId, this.info.location, alignedOffset, limit] payload: [this.info.dcId, this.info.location, alignedOffset, limit]
}; };
const taskId = JSON.stringify(task);
(task as RequestFilePartTask).id = taskId;
let deferred = deferredPromises[taskId] as CancellablePromise<UploadFile.uploadFile>;
if(deferred) {
return deferred.then(uploadFile => uploadFile.bytes);
}
notifySomeone(task); notifySomeone(task);
this.loadedOffsets.add(alignedOffset);
const deferred = deferredPromises[task.id] = deferredPromise<UploadFile.uploadFile>(); deferred = deferredPromises[taskId] = deferredPromise<UploadFile.uploadFile>();
return deferred; const bytesPromise = deferred.then(uploadFile => uploadFile.bytes);
this.saveChunkToCache(bytesPromise, alignedOffset, limit);
!fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15));
return bytesPromise;
}
private requestFilePartFromCache(alignedOffset: number, limit: number, fromPreload?: boolean) {
const key = this.getChunkKey(alignedOffset, limit);
return cacheStorage.getFile(key).then((blob: Blob) => {
return fromPreload ? new Uint8Array() : readBlobAsUint8Array(blob);
}, (error) => {
if(error === 'NO_ENTRY_FOUND') {
return;
}
});
}
private requestFilePart(alignedOffset: number, limit: number, fromPreload?: boolean) {
return this.requestFilePartFromCache(alignedOffset, limit, fromPreload).then(bytes => {
return bytes || this.requestFilePartFromWorker(alignedOffset, limit, fromPreload);
});
}
private saveChunkToCache(deferred: Promise<Uint8Array>, alignedOffset: number, limit: number) {
return deferred.then(bytes => {
const key = this.getChunkKey(alignedOffset, limit);
const response = new Response(bytes, {
headers: {
'Content-Length': '' + bytes.length,
'Content-Type': 'application/octet-stream',
[CHUNK_CACHED_TIME_HEADER]: '' + (Date.now() / 1000 | 0)
}
});
return cacheStorage.save(key, response);
});
}
private preloadChunks(offset: number, end: number) {
if(end > this.info.size) {
end = this.info.size;
}
for(; offset < end; offset += this.limitPart) {
if(this.loadedOffsets.has(offset)) {
continue;
}
this.loadedOffsets.add(offset);
this.requestFilePart(offset, this.limitPart, true);
}
} }
public requestRange(range: StreamRange) { public requestRange(range: StreamRange) {
@ -66,9 +165,7 @@ class Stream {
const limit = end && end < this.limitPart ? alignLimit(end - offset + 1) : this.limitPart; const limit = end && end < this.limitPart ? alignLimit(end - offset + 1) : this.limitPart;
const alignedOffset = alignOffset(offset, limit); const alignedOffset = alignOffset(offset, limit);
return this.requestFilePart(alignedOffset, limit).then(result => { return this.requestFilePart(alignedOffset, limit).then(ab => {
let ab = result.bytes as Uint8Array;
//log.debug('[stream] requestFilePart result:', result); //log.debug('[stream] requestFilePart result:', result);
const headers: Record<string, string> = { const headers: Record<string, string> = {
@ -96,11 +193,15 @@ class Stream {
}); });
} }
private getChunkKey(alignedOffset: number, limit: number) {
return this.id + '?offset=' + alignedOffset + '&limit=' + limit;
}
public static get(info: DownloadOptions) { public static get(info: DownloadOptions) {
return streams.get(this.getId(info)) ?? new Stream(info); return streams.get(this.getId(info)) ?? new Stream(info);
} }
public static getId(info: DownloadOptions) { private static getId(info: DownloadOptions) {
return (info.location as InputFileLocation.inputDocumentFileLocation).id; return (info.location as InputFileLocation.inputDocumentFileLocation).id;
} }
} }

Loading…
Cancel
Save