From 2874032b3a59b9d3dd70afb936806e13550ed674 Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Fri, 5 Mar 2021 19:22:22 +0400 Subject: [PATCH] tmp --- src/config/modes.ts | 7 +- src/helpers/context.ts | 2 +- src/lib/mtproto/mtproto.service.ts | 15 +- src/lib/mtproto/mtproto.worker.ts | 80 ++------ src/lib/mtproto/mtprotoworker.ts | 283 +++++++++++++++-------------- 5 files changed, 175 insertions(+), 212 deletions(-) diff --git a/src/config/modes.ts b/src/config/modes.ts index 6a9240aa..2633dd36 100644 --- a/src/config/modes.ts +++ b/src/config/modes.ts @@ -3,7 +3,12 @@ const Modes = { debug: location.search.indexOf('debug=1') > 0, http: false, //location.search.indexOf('http=1') > 0, ssl: true, // location.search.indexOf('ssl=1') > 0 || location.protocol === 'https:' && location.search.indexOf('ssl=0') === -1, - multipleConnections: true + multipleConnections: true, + asServiceWorker: false }; +/// #if MTPROTO_SW +Modes.asServiceWorker = true; +/// #endif + export default Modes; diff --git a/src/helpers/context.ts b/src/helpers/context.ts index 09a6215e..cadd0959 100644 --- a/src/helpers/context.ts +++ b/src/helpers/context.ts @@ -29,4 +29,4 @@ const notifyWorker = (...args: any[]) => { const noop = () => {}; export const notifySomeone = isServiceWorker ? notifyServiceWorker.bind(null, false) : (isWebWorker ? notifyWorker : noop); -export const notifyAll = isServiceWorker ? notifyServiceWorker.bind(null, true) : (isWebWorker ? notifyWorker : noop); \ No newline at end of file +export const notifyAll = isServiceWorker ? notifyServiceWorker.bind(null, true) : (isWebWorker ? notifyWorker : noop); diff --git a/src/lib/mtproto/mtproto.service.ts b/src/lib/mtproto/mtproto.service.ts index 5c8b3a88..3cae8772 100644 --- a/src/lib/mtproto/mtproto.service.ts +++ b/src/lib/mtproto/mtproto.service.ts @@ -1,3 +1,6 @@ +/// #if MTPROTO_SW +import './mtproto.worker'; +/// #endif import { isSafari } from '../../helpers/userAgent'; import { logger, LogLevels } from '../logger'; import type { DownloadOptions } from './apiFileManager'; @@ -6,11 +9,12 @@ import { notifySomeone } from '../../helpers/context'; import type { InputFileLocation, FileLocation, UploadFile } from '../../layer'; import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise'; -const log = logger('SW', LogLevels.error/* | LogLevels.debug | LogLevels.log */); +const log = logger('SW', LogLevels.error | LogLevels.debug | LogLevels.log | LogLevels.warn); const ctx = self as any as ServiceWorkerGlobalScope; const deferredPromises: {[taskId: number]: CancellablePromise} = {}; +/// #if !MTPROTO_SW ctx.addEventListener('message', (e) => { const task = e.data as ServiceWorkerTaskResponse; const promise = deferredPromises[task.id]; @@ -23,6 +27,7 @@ ctx.addEventListener('message', (e) => { delete deferredPromises[task.id]; }); +/// #endif let taskId = 0; @@ -132,10 +137,7 @@ const onChangeState = () => { ctx.onfetch = onFetch; }; -/** - * Service Worker Installation - */ -ctx.addEventListener('install', (event: ExtendableEvent) => { +ctx.addEventListener('install', (event) => { log('installing'); /* initCache(); @@ -146,9 +148,6 @@ ctx.addEventListener('install', (event: ExtendableEvent) => { event.waitUntil(ctx.skipWaiting()); // Activate worker immediately }); -/** - * Service Worker Activation - */ ctx.addEventListener('activate', (event) => { log('activating', ctx); diff --git a/src/lib/mtproto/mtproto.worker.ts b/src/lib/mtproto/mtproto.worker.ts index 4c41c33c..eddfbaec 100644 --- a/src/lib/mtproto/mtproto.worker.ts +++ b/src/lib/mtproto/mtproto.worker.ts @@ -5,51 +5,10 @@ import apiManager from "./apiManager"; import cryptoWorker from "../crypto/cryptoworker"; import networkerFactory from "./networkerFactory"; import apiFileManager from './apiFileManager'; -//import { logger, LogLevels } from '../logger'; import type { ServiceWorkerTask, ServiceWorkerTaskResponse } from './mtproto.service'; import { ctx } from '../../helpers/userAgent'; import { socketsProxied } from './dcConfigurator'; - -//const log = logger('DW', LogLevels.error); - -//console.error('INCLUDE !!!', new Error().stack); - -/* function isObject(object: any) { - return typeof(object) === 'object' && object !== null; -} */ - -/* function fillTransfer(transfer: any, obj: any) { - if(!obj) return; - - if(obj instanceof ArrayBuffer) { - transfer.add(obj); - } else if(obj.buffer && obj.buffer instanceof ArrayBuffer) { - transfer.add(obj.buffer); - } else if(isObject(obj)) { - for(var i in obj) { - fillTransfer(transfer, obj[i]); - } - } else if(Array.isArray(obj)) { - obj.forEach(value => { - fillTransfer(transfer, value); - }); - } -} */ - -function respond(...args: any[]) { - // отключил для всего потому что не успел пофиксить transfer detached - //if(isSafari(self)/* || true */) { - // @ts-ignore - ctx.postMessage(...args); - /* } else { - var transfer = new Set(); - fillTransfer(transfer, arguments); - - //console.log('reply', transfer, [...transfer]); - ctx.postMessage(...arguments, [...transfer]); - //console.log('reply', transfer, [...transfer]); - } */ -} +import { notifyAll } from '../../helpers/context'; let webpSupported = false; export const isWebpSupported = () => { @@ -57,30 +16,18 @@ export const isWebpSupported = () => { }; networkerFactory.setUpdatesProcessor((obj) => { - respond({update: obj}); + notifyAll({update: obj}); }); networkerFactory.onConnectionStatusChange = (status) => { - respond({type: 'connectionStatusChange', payload: status}); + notifyAll({type: 'connectionStatusChange', payload: status}); }; -/* ctx.onerror = (error) => { - console.error('error:', error); -}; - -ctx.onunhandledrejection = (error) => { - console.error('onunhandledrejection:', error); -}; */ - const onMessage = async(e: any) => { try { const task = e.data; const taskId = task.taskId; - //log.debug('got message:', taskId, task); - - //debugger; - if(task.type === 'convertWebp') { const {fileName, bytes} = task.payload; const deferred = apiFileManager.webpConvertPromises[fileName]; @@ -105,7 +52,7 @@ const onMessage = async(e: any) => { responseTask.error = err; } - respond(responseTask); + notifyAll(responseTask); return; } else if(task.type === 'webpSupport') { webpSupported = task.payload; @@ -134,7 +81,7 @@ const onMessage = async(e: any) => { case 'gzipUncompress': // @ts-ignore return cryptoWorker[task.task].apply(cryptoWorker, task.args).then(result => { - respond({taskId, result}); + notifyAll({taskId, result}); }); case 'setQueueId': @@ -152,9 +99,9 @@ const onMessage = async(e: any) => { result = await result; } - respond({taskId, result}); + notifyAll({taskId, result}); } catch(error) { - respond({taskId, error}); + notifyAll({taskId, error}); } break; @@ -163,7 +110,7 @@ const onMessage = async(e: any) => { case 'getNetworker': { // @ts-ignore apiManager[task.task].apply(apiManager, task.args).finally(() => { - respond({taskId, result: null}); + notifyAll({taskId, result: null}); }); break; @@ -177,10 +124,12 @@ const onMessage = async(e: any) => { if(result instanceof Promise) { result = await result; } + + //console.log(notifyAll); - respond({taskId, result}); + notifyAll({taskId, result}); } catch(error) { - respond({taskId, error}); + notifyAll({taskId, error}); } //throw new Error('Unknown task: ' + task.task); @@ -192,7 +141,6 @@ const onMessage = async(e: any) => { } }; -ctx.addEventListener('message', onMessage); - //console.log('[WORKER] Will send ready', Date.now() / 1000); -ctx.postMessage('ready'); +ctx.addEventListener('message', onMessage); +notifyAll('ready'); diff --git a/src/lib/mtproto/mtprotoworker.ts b/src/lib/mtproto/mtprotoworker.ts index d6816474..c07ec30f 100644 --- a/src/lib/mtproto/mtprotoworker.ts +++ b/src/lib/mtproto/mtprotoworker.ts @@ -23,8 +23,6 @@ type Task = { args: any[] }; -const USE_WORKER_AS_WORKER = true; - type HashResult = { hash: number, result: any @@ -66,7 +64,10 @@ export class ApiManagerProxy extends CryptoWorkerMethods { this.log('constructor'); this.registerServiceWorker(); + + /// #if !MTPROTO_SW this.registerWorker(); + /// #endif } public isServiceWorkerOnline() { @@ -75,25 +76,29 @@ export class ApiManagerProxy extends CryptoWorkerMethods { private registerServiceWorker() { if(!('serviceWorker' in navigator)) return; - + navigator.serviceWorker.register('./sw.js', {scope: './'}).then(registration => { + this.log('SW registered', registration); this.isSWRegistered = true; + + const sw = registration.installing || registration.waiting || registration.active; + sw.addEventListener('statechange', (e) => { + this.log('SW statechange', e); + }); + + const controller = navigator.serviceWorker.controller || registration.installing || registration.waiting || registration.active; + this.onWorkerFirstMessage(controller); }, (err) => { this.isSWRegistered = false; this.log.error('SW registration failed!', err); appDocsManager.onServiceWorkerFail(); }); - navigator.serviceWorker.ready.then((registration) => { - this.log('set SW'); - this.releasePending(); - - if(!USE_WORKER_AS_WORKER) { - this.postMessage = navigator.serviceWorker.controller.postMessage.bind(navigator.serviceWorker.controller); - } + /* navigator.serviceWorker.ready.then((registration) => { + this.log('set SW', navigator.serviceWorker); //registration.update(); - }); + }); */ navigator.serviceWorker.addEventListener('controllerchange', () => { this.log.warn('controllerchange'); @@ -104,9 +109,9 @@ export class ApiManagerProxy extends CryptoWorkerMethods { }); }); - /** - * Message resolver - */ + /// #if MTPROTO_SW + navigator.serviceWorker.addEventListener('message', this.onWorkerMessage); + /// #else navigator.serviceWorker.addEventListener('message', (e) => { const task: ServiceWorkerTask = e.data; if(!isObject(task)) { @@ -115,144 +120,150 @@ export class ApiManagerProxy extends CryptoWorkerMethods { this.postMessage(task); }); + /// #endif navigator.serviceWorker.addEventListener('messageerror', (e) => { this.log.error('SW messageerror:', e); }); } + private onWorkerFirstMessage(worker: any) { + if(!this.worker) { + this.worker = worker; + this.log('set webWorker'); + + this.postMessage = this.worker.postMessage.bind(this.worker); + + const isWebpSupported = webpWorkerController.isWebpSupported(); + this.log('WebP supported:', isWebpSupported); + this.postMessage({type: 'webpSupport', payload: isWebpSupported}); + + this.releasePending(); + } + } + + private onWorkerMessage = (e: MessageEvent) => { + this.log('got message from worker:', e.data); + + const task = e.data; + + if(!isObject(task)) { + return; + } + + if(task.update) { + if(this.updatesProcessor) { + this.updatesProcessor(task.update); + } + } else if(task.progress) { + rootScope.broadcast('download_progress', task.progress); + } else if(task.type === 'reload') { + location.reload(); + } else if(task.type === 'connectionStatusChange') { + rootScope.broadcast('connection_status_change', task.payload); + } else if(task.type === 'convertWebp') { + webpWorkerController.postMessage(task); + } else if((task as ServiceWorkerTaskResponse).type === 'requestFilePart') { + const _task = task as ServiceWorkerTaskResponse; + + if(_task.error) { + const onError = (error: ApiError) => { + if(error?.type === 'FILE_REFERENCE_EXPIRED') { + // @ts-ignore + const bytes = _task.originalPayload[1].file_reference; + referenceDatabase.refreshReference(bytes).then(() => { + // @ts-ignore + _task.originalPayload[1].file_reference = referenceDatabase.getReferenceByLink(bytes); + const newTask: ServiceWorkerTask = { + type: _task.type, + id: _task.id, + payload: _task.originalPayload + }; + + this.postMessage(newTask); + }).catch(onError); + } else { + navigator.serviceWorker.controller.postMessage(task); + } + }; + + onError(_task.error); + } else { + navigator.serviceWorker.controller.postMessage(task); + } + } else if(task.type === 'socketProxy') { + const socketTask = task.payload; + const id = socketTask.id; + //console.log('socketProxy', socketTask, id); + + if(socketTask.type === 'send') { + const socket = this.sockets.get(id); + socket.send(socketTask.payload); + } else if(socketTask.type === 'close') { + const socket = this.sockets.get(id); + socket.close(); + } else if(socketTask.type === 'setup') { + const socket = new Socket(socketTask.payload.dcId, socketTask.payload.url, socketTask.payload.logSuffix); + + const onOpen = () => { + //console.log('socketProxy onOpen'); + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'open', + id + } + }); + }; + const onClose = () => { + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'close', + id + } + }); + + socket.removeListener('open', onOpen); + socket.removeListener('close', onClose); + socket.removeListener('message', onMessage); + this.sockets.delete(id); + }; + const onMessage = (buffer: ArrayBuffer) => { + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'message', + id, + payload: buffer + } + }); + }; + + socket.addListener('open', onOpen); + socket.addListener('close', onClose); + socket.addListener('message', onMessage); + this.sockets.set(id, socket); + } + } else if(task.hasOwnProperty('result') || task.hasOwnProperty('error')) { + this.finalizeTask(task.taskId, task.result, task.error); + } + }; + + /// #if !MTPROTO_SW private registerWorker() { //return; const worker = new MTProtoWorker(); //const worker = window; - worker.addEventListener('message', (e) => { - if(!this.worker) { - this.worker = worker as any; - this.log('set webWorker'); - - if(USE_WORKER_AS_WORKER) { - this.postMessage = this.worker.postMessage.bind(this.worker); - } - - const isWebpSupported = webpWorkerController.isWebpSupported(); - this.log('WebP supported:', isWebpSupported); - this.postMessage({type: 'webpSupport', payload: isWebpSupported}); - - this.releasePending(); - } - - //this.log('got message from worker:', e.data); - - const task = e.data; - - if(!isObject(task)) { - return; - } - - if(task.update) { - if(this.updatesProcessor) { - this.updatesProcessor(task.update); - } - } else if(task.progress) { - rootScope.broadcast('download_progress', task.progress); - } else if(task.type === 'reload') { - location.reload(); - } else if(task.type === 'connectionStatusChange') { - rootScope.broadcast('connection_status_change', task.payload); - } else if(task.type === 'convertWebp') { - webpWorkerController.postMessage(task); - } else if((task as ServiceWorkerTaskResponse).type === 'requestFilePart') { - const _task = task as ServiceWorkerTaskResponse; - - if(_task.error) { - const onError = (error: ApiError) => { - if(error?.type === 'FILE_REFERENCE_EXPIRED') { - // @ts-ignore - const bytes = _task.originalPayload[1].file_reference; - referenceDatabase.refreshReference(bytes).then(() => { - // @ts-ignore - _task.originalPayload[1].file_reference = referenceDatabase.getReferenceByLink(bytes); - const newTask: ServiceWorkerTask = { - type: _task.type, - id: _task.id, - payload: _task.originalPayload - }; - - this.postMessage(newTask); - }).catch(onError); - } else { - navigator.serviceWorker.controller.postMessage(task); - } - }; - - onError(_task.error); - } else { - navigator.serviceWorker.controller.postMessage(task); - } - } else if(task.type === 'socketProxy') { - const socketTask = task.payload; - const id = socketTask.id; - //console.log('socketProxy', socketTask, id); - - if(socketTask.type === 'send') { - const socket = this.sockets.get(id); - socket.send(socketTask.payload); - } else if(socketTask.type === 'close') { - const socket = this.sockets.get(id); - socket.close(); - } else if(socketTask.type === 'setup') { - const socket = new Socket(socketTask.payload.dcId, socketTask.payload.url, socketTask.payload.logSuffix); - - const onOpen = () => { - //console.log('socketProxy onOpen'); - this.postMessage({ - type: 'socketProxy', - payload: { - type: 'open', - id - } - }); - }; - const onClose = () => { - this.postMessage({ - type: 'socketProxy', - payload: { - type: 'close', - id - } - }); - - socket.removeListener('open', onOpen); - socket.removeListener('close', onClose); - socket.removeListener('message', onMessage); - this.sockets.delete(id); - }; - const onMessage = (buffer: ArrayBuffer) => { - this.postMessage({ - type: 'socketProxy', - payload: { - type: 'message', - id, - payload: buffer - } - }); - }; - - socket.addListener('open', onOpen); - socket.addListener('close', onClose); - socket.addListener('message', onMessage); - this.sockets.set(id, socket); - } - } else if(task.hasOwnProperty('result') || task.hasOwnProperty('error')) { - this.finalizeTask(task.taskId, task.result, task.error); - } - }); + worker.addEventListener('message', this.onWorkerFirstMessage.bind(this, worker), {once: true}); + worker.addEventListener('message', this.onWorkerMessage); worker.addEventListener('error', (err) => { this.log.error('WORKER ERROR', err); }); } + /// #endif private finalizeTask(taskId: number, result: any, error: any) { const deferred = this.awaiting[taskId];