From 446949daa6f51593d16dc22fbbc6168a2bbb28cb Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Mon, 1 Aug 2022 10:02:57 +0200 Subject: [PATCH] Refactor service worker's message port Fix message port memory leak --- src/helpers/context.ts | 2 + src/helpers/listenMessagePort.ts | 17 ++- src/lib/crypto/cryptoMessagePort.ts | 4 + src/lib/mtproto/mtproto.worker.ts | 35 +++++ src/lib/mtproto/mtprotoMessagePort.ts | 6 +- src/lib/mtproto/mtprotoworker.ts | 114 +++++++---------- src/lib/mtproto/superMessagePort.ts | 113 +++++++++++------ src/lib/mtproto/webPushApiManager.ts | 32 ++--- src/lib/serviceWorker/index.service.ts | 134 ++++++++------------ src/lib/serviceWorker/push.ts | 26 ++-- src/lib/serviceWorker/serviceMessagePort.ts | 52 ++++++++ src/lib/serviceWorker/stream.ts | 91 ++++++------- 12 files changed, 353 insertions(+), 273 deletions(-) create mode 100644 src/lib/serviceWorker/serviceMessagePort.ts diff --git a/src/helpers/context.ts b/src/helpers/context.ts index 323553c4..7d0106b8 100644 --- a/src/helpers/context.ts +++ b/src/helpers/context.ts @@ -15,6 +15,8 @@ export const getWindowClients = () => { .matchAll({includeUncontrolled: false, type: 'window'}); }; +export const getLastWindowClient = () => getWindowClients().then((windowClients) => windowClients.slice(-1)[0]); + const postMessage = (listener: WindowClient | DedicatedWorkerGlobalScope, ...args: any[]) => { try { // @ts-ignore diff --git a/src/helpers/listenMessagePort.ts b/src/helpers/listenMessagePort.ts index 088fd0a9..5dc01718 100644 --- a/src/helpers/listenMessagePort.ts +++ b/src/helpers/listenMessagePort.ts @@ -4,24 +4,27 @@ * https://github.com/morethanwords/tweb/blob/master/LICENSE */ +import type SuperMessagePort from "../lib/mtproto/superMessagePort"; import ctx from "../environment/ctx"; -import SuperMessagePort from "../lib/mtproto/superMessagePort"; export default function listenMessagePort( messagePort: SuperMessagePort, onConnect?: (source: MessageEventSource) => void, onDisconnect?: (source: MessageEventSource) => void ) { - const attachPort = (s: any) => { - messagePort.attachPort(s); - onConnect && onConnect(s); + const attachPort = (listenPort: any, sendPort: any) => { + messagePort.attachListenPort(listenPort); + sendPort && messagePort.attachSendPort(sendPort); + onConnect?.(listenPort); }; - onDisconnect && messagePort.setOnPortDisconnect(onDisconnect); + messagePort.setOnPortDisconnect(onDisconnect); if(typeof(SharedWorkerGlobalScope) !== 'undefined') { - (ctx as any as SharedWorkerGlobalScope).addEventListener('connect', (e) => attachPort(e.source)); + (ctx as any as SharedWorkerGlobalScope).addEventListener('connect', (e) => attachPort(e.source, e.source)); + } else if(typeof(ServiceWorkerGlobalScope) !== 'undefined') { + attachPort(ctx, null); } else { - attachPort(ctx); + attachPort(ctx, ctx); } } diff --git a/src/lib/crypto/cryptoMessagePort.ts b/src/lib/crypto/cryptoMessagePort.ts index dd754aa6..03414c62 100644 --- a/src/lib/crypto/cryptoMessagePort.ts +++ b/src/lib/crypto/cryptoMessagePort.ts @@ -16,6 +16,10 @@ type CryptoEvent = { }; export class CryptoMessagePort extends SuperMessagePort { + constructor() { + super('CRYPTO'); + } + public invokeCrypto(method: T, ...args: Parameters): Promise>> { const payload = {method, args}; const listeners = this.listeners['invoke']; diff --git a/src/lib/mtproto/mtproto.worker.ts b/src/lib/mtproto/mtproto.worker.ts index 4fb54d97..2dca75a5 100644 --- a/src/lib/mtproto/mtproto.worker.ts +++ b/src/lib/mtproto/mtproto.worker.ts @@ -20,12 +20,19 @@ import { logger } from '../logger'; import { State } from '../../config/state'; import toggleStorages from '../../helpers/toggleStorages'; import appTabsManager from '../appManagers/appTabsManager'; +import ServiceMessagePort from '../serviceWorker/serviceMessagePort'; +import callbackify from '../../helpers/callbackify'; let _isServiceWorkerOnline = true; export function isServiceWorkerOnline() { return _isServiceWorkerOnline; } +let serviceMessagePort: ServiceMessagePort, _serviceMessagePort: MessagePort; +export function getServiceMessagePort() { + return _isServiceWorkerOnline ? serviceMessagePort : undefined; +} + const log = logger('MTPROTO'); // let haveState = false; @@ -73,6 +80,28 @@ port.addMultipleEventsListeners({ _isServiceWorkerOnline = online; }, + serviceWorkerPort: (payload, source, event) => { + if(serviceMessagePort) { + serviceMessagePort.detachPort(_serviceMessagePort); + _serviceMessagePort = undefined; + } else { + serviceMessagePort = new ServiceMessagePort(); + serviceMessagePort.addMultipleEventsListeners({ + requestFilePart: (payload) => { + return callbackify(appManagersManager.getManagers(), (managers) => { + const {docId, dcId, offset, limit} = payload; + return managers.appDocsManager.requestDocPart(docId, dcId, offset, limit); + }); + } + }); + } + + // * port can be undefined in the future + if(_serviceMessagePort = event.ports[0]) { + serviceMessagePort.attachPort(_serviceMessagePort); + } + }, + createObjectURL: (blob) => { return URL.createObjectURL(blob); }, @@ -99,8 +128,14 @@ appManagersManager.start(); appManagersManager.getManagers(); appTabsManager.start(); +// let sentHello = false; listenMessagePort(port, (source) => { appTabsManager.addTab(source); + + // if(!sentHello) { + // port.invokeVoid('hello', undefined, source); + // sentHello = true; + // } }, (source) => { appTabsManager.deleteTab(source); }); diff --git a/src/lib/mtproto/mtprotoMessagePort.ts b/src/lib/mtproto/mtprotoMessagePort.ts index 1b9c33be..6ea75302 100644 --- a/src/lib/mtproto/mtprotoMessagePort.ts +++ b/src/lib/mtproto/mtprotoMessagePort.ts @@ -27,6 +27,7 @@ export default class MTProtoMessagePort extends S manager: (payload: MTProtoManagerTaskPayload) => any, toggleStorages: (payload: {enabled: boolean, clearWrite: boolean}) => ReturnType, serviceWorkerOnline: (online: boolean) => void, + serviceWorkerPort: (payload: void, source: MessageEventSource, event: MessageEvent) => void, cryptoPort: (payload: void, source: MessageEventSource, event: MessageEvent) => void, createObjectURL: (blob: Blob) => string, tabState: (payload: TabState, source: MessageEventSource) => void, @@ -35,12 +36,13 @@ export default class MTProtoMessagePort extends S convertOpus: (payload: {fileName: string, bytes: Uint8Array}) => Promise, localStorageProxy: (payload: LocalStorageProxyTask['payload']) => Promise, mirror: (payload: MirrorTaskPayload) => void, - notificationBuild: (payload: NotificationBuildTaskPayload) => void + notificationBuild: (payload: NotificationBuildTaskPayload) => void, + // hello: () => void } & MTProtoBroadcastEvent, Master> { private static INSTANCE: MTProtoMessagePort; constructor() { - super(); + super('MTPROTO'); MTProtoMessagePort.INSTANCE = this; diff --git a/src/lib/mtproto/mtprotoworker.ts b/src/lib/mtproto/mtprotoworker.ts index 4b86aff0..3342044f 100644 --- a/src/lib/mtproto/mtprotoworker.ts +++ b/src/lib/mtproto/mtprotoworker.ts @@ -4,8 +4,7 @@ * https://github.com/morethanwords/tweb/blob/master/LICENSE */ -import type { RequestFilePartTask, RequestFilePartTaskResponse, ServiceWorkerTask } from '../serviceWorker/index.service'; -import type { Awaited, WorkerTaskVoidTemplate } from '../../types'; +import type { Awaited } from '../../types'; import type { CacheStorageDbName } from '../cacheStorage'; import type { State } from '../../config/state'; import type { Message, MessagePeerReaction, PeerNotifySettings } from '../../layer'; @@ -18,7 +17,6 @@ import webPushApiManager from './webPushApiManager'; import appRuntimeManager from '../appManagers/appRuntimeManager'; import telegramMeWebManager from './telegramMeWebManager'; import pause from '../../helpers/schedulers/pause'; -import isObject from '../../helpers/object/isObject'; import ENVIRONMENT from '../../environment'; import loadState from '../appManagers/utils/state/loadState'; import opusDecodeController from '../opusDecodeController'; @@ -28,11 +26,7 @@ import SuperMessagePort from './superMessagePort'; import IS_SHARED_WORKER_SUPPORTED from '../../environment/sharedWorkerSupport'; import toggleStorages from '../../helpers/toggleStorages'; import idleController from '../../helpers/idleController'; - -export interface ToggleStorageTask extends WorkerTaskVoidTemplate { - type: 'toggleStorages', - payload: {enabled: boolean, clearWrite: boolean} -}; +import ServiceMessagePort from '../serviceWorker/serviceMessagePort'; export type Mirrors = { state: State @@ -57,10 +51,8 @@ export type TabState = { }; class ApiManagerProxy extends MTProtoMessagePort { - private worker: /* Window */Worker; - private isSWRegistered: boolean; + // private worker: /* Window */Worker; // private sockets: Map = new Map(); - private taskListenersSW: {[taskType: string]: (task: any) => void}; private mirrors: Mirrors; public newVersion: string; @@ -68,11 +60,12 @@ class ApiManagerProxy extends MTProtoMessagePort { private tabState: TabState; + public serviceMessagePort: ServiceMessagePort; + private lastServiceWorker: ServiceWorker; + constructor() { super(); - this.isSWRegistered = true; - this.taskListenersSW = {}; this.mirrors = {} as any; this.tabState = { chatPeerIds: [], @@ -200,78 +193,80 @@ class ApiManagerProxy extends MTProtoMessagePort { // this.sendState(); } - private registerServiceWorker() { - if(!('serviceWorker' in navigator)) return; - - // ! I hate webpack - it won't load it by using worker.register, only navigator.serviceWork will do it. - const worker = navigator.serviceWorker; + private attachServiceWorker(serviceWorker: ServiceWorker) { + this.lastServiceWorker && this.serviceMessagePort.detachPort(this.lastServiceWorker); + this.serviceMessagePort.attachSendPort(this.lastServiceWorker = serviceWorker); + this.serviceMessagePort.invokeVoid('hello', undefined); + } + + private _registerServiceWorker() { navigator.serviceWorker.register( /* webpackChunkName: "sw" */ new URL('../serviceWorker/index.service', import.meta.url), {scope: './'} ).then((registration) => { this.log('SW registered', registration); - this.isSWRegistered = true; + + // ! doubtful fix for hard refresh + if(registration.active && !navigator.serviceWorker.controller) { + return registration.unregister().then(() => { + window.location.reload(); + }); + } const sw = registration.installing || registration.waiting || registration.active; sw.addEventListener('statechange', (e) => { this.log('SW statechange', e); }); - //this.postSWMessage = worker.controller.postMessage.bind(worker.controller); - + const controller = navigator.serviceWorker.controller || registration.installing || registration.waiting || registration.active; + this.attachServiceWorker(controller); + /// #if MTPROTO_SW - const controller = worker.controller || registration.installing || registration.waiting || registration.active; this.onWorkerFirstMessage(controller); /// #endif }, (err) => { - this.isSWRegistered = false; this.log.error('SW registration failed!', err); this.invokeVoid('serviceWorkerOnline', false); }); + } + + private registerServiceWorker() { + if(!('serviceWorker' in navigator)) return; + + this.serviceMessagePort = new ServiceMessagePort(); + + // this.addMultipleEventsListeners({ + // hello: () => { + // // this.serviceMessagePort.invokeVoid('port', undefined); + // } + // }); + + // ! I hate webpack - it won't load it by using worker.register, only navigator.serviceWorker will do it. + const worker = navigator.serviceWorker; + this._registerServiceWorker(); worker.addEventListener('controllerchange', () => { this.log.warn('controllerchange'); - worker.controller.addEventListener('error', (e) => { + const controller = worker.controller; + this.attachServiceWorker(controller); + + controller.addEventListener('error', (e) => { this.log.error('controller error:', e); }); }); /// #if MTPROTO_SW this.attachListenPort(worker); - // this.s(); /// #else - worker.addEventListener('message', (e) => { - const task: ServiceWorkerTask = e.data; - if(!isObject(task)) { - return; - } - - const callback = this.taskListenersSW[task.type]; - if(callback) { - callback(task); + this.serviceMessagePort.attachListenPort(worker); + this.serviceMessagePort.addMultipleEventsListeners({ + port: (payload, source, event) => { + this.invokeVoid('serviceWorkerPort', undefined, undefined, [event.ports[0]]); } }); - - this.addServiceWorkerTaskListener('requestFilePart', (task: RequestFilePartTask) => { - const responseTask: RequestFilePartTaskResponse = { - type: task.type, - id: task.id - }; - - const {docId, dcId, offset, limit} = task.payload; - rootScope.managers.appDocsManager.requestDocPart(docId, dcId, offset, limit) - .then((uploadFile) => { - responseTask.payload = uploadFile; - this.postSWMessage(responseTask); - }, (err) => { - responseTask.originalPayload = task.payload; - responseTask.error = err; - this.postSWMessage(responseTask); - }); - }); /// #endif worker.addEventListener('messageerror', (e) => { @@ -334,16 +329,10 @@ class ApiManagerProxy extends MTProtoMessagePort { }); } - public postSWMessage(message: any) { - if(navigator.serviceWorker.controller) { - navigator.serviceWorker.controller.postMessage(message); - } - } - private onWorkerFirstMessage(worker: any) { this.log('set webWorker'); - this.worker = worker; + // this.worker = worker; /// #if MTPROTO_SW this.attachSendPort(worker); /// #else @@ -351,10 +340,6 @@ class ApiManagerProxy extends MTProtoMessagePort { /// #endif } - public addServiceWorkerTaskListener(name: keyof ApiManagerProxy['taskListenersSW'], callback: ApiManagerProxy['taskListenersSW'][typeof name]) { - this.taskListenersSW[name] = callback; - } - private loadState() { return Promise.all([ loadState().then((stateResult) => { @@ -384,8 +369,7 @@ class ApiManagerProxy extends MTProtoMessagePort { public async toggleStorages(enabled: boolean, clearWrite: boolean) { await toggleStorages(enabled, clearWrite); this.invoke('toggleStorages', {enabled, clearWrite}); - const task: ToggleStorageTask = {type: 'toggleStorages', payload: {enabled, clearWrite}}; - this.postSWMessage(task); + this.serviceMessagePort.invokeVoid('toggleStorages', {enabled, clearWrite}); } public async getMirror(name: T) { diff --git a/src/lib/mtproto/superMessagePort.ts b/src/lib/mtproto/superMessagePort.ts index ec99bb4e..6afc7ff1 100644 --- a/src/lib/mtproto/superMessagePort.ts +++ b/src/lib/mtproto/superMessagePort.ts @@ -7,7 +7,7 @@ import DEBUG from "../../config/debug"; import ctx from "../../environment/ctx"; import indexOfAndSplice from "../../helpers/array/indexOfAndSplice"; -import { IS_SERVICE_WORKER, IS_WORKER, notifyAll } from "../../helpers/context"; +import { IS_WORKER } from "../../helpers/context"; import EventListenerBase from "../../helpers/eventListenerBase"; import { Awaited, WorkerTaskTemplate, WorkerTaskVoidTemplate } from "../../types"; import { logger } from "../logger"; @@ -57,7 +57,11 @@ interface CloseTask extends SuperMessagePortTask { type: 'close' } -type Task = InvokeTask | ResultTask | AckTask | PingTask | PongTask | BatchTask | CloseTask; +// interface OpenTask extends SuperMessagePortTask { +// type: 'open' +// } + +type Task = InvokeTask | ResultTask | AckTask | PingTask | PongTask | BatchTask | CloseTask/* | OpenTask */; type TaskMap = { [type in Task as type['type']]?: (task: Extract, source: MessageEventSource, event: MessageEvent) => void | Promise }; @@ -75,7 +79,10 @@ export type AckedResult = { // }; type ListenPort = WindowProxy | MessagePort | ServiceWorker | Worker | ServiceWorkerContainer; -type SendPort = WindowProxy | MessagePort | ServiceWorker | Worker; +type SendPort = Pick/* WindowProxy | MessagePort | ServiceWorker | Worker */; + +export type MessageListenPort = ListenPort; +export type MessageSendPort = SendPort; type ListenerCallback = (payload: any, source: MessageEventSource, event: MessageEvent) => any; type Listeners = Record; @@ -99,7 +106,8 @@ export default class SuperMessagePort< [id: number]: { resolve: any, reject: any, - taskType: string + taskType: string, + port?: SendPort } }; protected pending: Map; @@ -111,30 +119,18 @@ export default class SuperMessagePort< protected processTaskMap: TaskMap; protected onPortDisconnect: (source: MessageEventSource) => void; + // protected onPortConnect: (source: MessageEventSource) => void; - constructor() { + constructor(logSuffix?: string) { super(false); - this.processTaskMap = { - result: this.processResultTask, - ack: this.processAckTask, - invoke: this.processInvokeTask, - ping: this.processPingTask, - pong: this.processPongTask, - close: this.processCloseTask - }; - } - - public _constructor() { - super._constructor(false); - this.listenPorts = []; this.sendPorts = []; this.pingResolves = new Map(); this.taskId = 0; this.awaiting = {}; this.pending = new Map(); - this.log = logger('MP'); + this.log = logger('MP' + (logSuffix ? '-' + logSuffix : '')); this.debug = DEBUG; if(typeof(window) !== 'undefined') { @@ -143,12 +139,26 @@ export default class SuperMessagePort< this.postMessage(undefined, task); }); } + + this.processTaskMap = { + result: this.processResultTask, + ack: this.processAckTask, + invoke: this.processInvokeTask, + ping: this.processPingTask, + pong: this.processPongTask, + close: this.processCloseTask, + // open: this.processOpenTask + }; } public setOnPortDisconnect(callback: (source: MessageEventSource) => void) { this.onPortDisconnect = callback; } + // public setOnPortConnect(callback: (source: MessageEventSource) => void) { + // this.onPortConnect = callback; + // } + public attachPort(port: MessageEventSource) { this.attachListenPort(port); this.attachSendPort(port); @@ -160,14 +170,17 @@ export default class SuperMessagePort< } public attachSendPort(port: SendPort) { - this.log.warn('attaching port'); + this.log.warn('attaching send port'); - if((port as MessagePort).start) { - (port as MessagePort).start(); - } + (port as MessagePort).start?.(); this.sendPorts.push(port); // this.sendPing(port); + + // const task = this.createTask('open', undefined); + // this.postMessage(port, task); + + this.releasePending(); } // ! Can't rely on ping because timers can be suspended @@ -207,17 +220,25 @@ export default class SuperMessagePort< // }, timeout); // } - protected detachPort(port: SendPort) { + public detachPort(port: ListenPort) { this.log.warn('disconnecting port'); - port.removeEventListener('message', this.onMessage as any); indexOfAndSplice(this.listenPorts, port); - indexOfAndSplice(this.sendPorts, port); - if((port as MessagePort).close) { - (port as MessagePort).close(); - } + indexOfAndSplice(this.sendPorts, port as any); + + port.removeEventListener?.('message', this.onMessage as any); + (port as MessagePort).close?.(); - this.onPortDisconnect && this.onPortDisconnect(port as any); + this.onPortDisconnect?.(port as any); + + const error = new Error('PORT_DISCONNECTED'); + for(const id in this.awaiting) { + const task = this.awaiting[id]; + if(task.port === port) { + task.reject(error); + delete this.awaiting[id]; + } + } } protected postMessage(port: SendPort | SendPort[], task: Task) { @@ -248,7 +269,7 @@ export default class SuperMessagePort< protected /* async */ releasePending() { //return; - if(!this.listenPorts.length || this.releasingPending) { + if(/* !this.listenPorts.length || !this.sendPorts.length || */this.releasingPending) { return; } @@ -276,6 +297,10 @@ export default class SuperMessagePort< // }); const tasks = portTasks; + const ports = port ? [port] : this.sendPorts; + if(!ports.length) { + return; + } tasks.forEach((task) => { // if(task.type === 'batch') { @@ -283,19 +308,20 @@ export default class SuperMessagePort< // } try { - if(IS_SERVICE_WORKER) { - notifyAll(task); - } else { - this.postMessage(port, task); - } + // if(IS_SERVICE_WORKER && !port) { + // notifyAll(task); + // } else { + this.postMessage(ports, task); + // } } catch(err) { - this.log.error('postMessage error:', err, task, port); + this.log.error('postMessage error:', err, task, ports); } }); + + this.pending.delete(port); }); this.debug && this.log.debug('released tasks'); - this.pending.clear(); this.releasingPending = false; } @@ -353,6 +379,10 @@ export default class SuperMessagePort< }; previousResolve(ret); + + if(payload.cached) { + delete this.awaiting[payload.taskId]; + } }; protected processPingTask = (task: PingTask, source: MessageEventSource, event: MessageEvent) => { @@ -371,6 +401,11 @@ export default class SuperMessagePort< this.detachPort(source); }; + // * it's just an 'open' callback, DO NOT attach port from here + // protected processOpenTask = (task: OpenTask, source: MessageEventSource, event: MessageEvent) => { + // this.onPortConnect?.(source); + // }; + protected processInvokeTask = async(task: InvokeTask, source: MessageEventSource, event: MessageEvent) => { const id = task.id; const innerTask = task.payload; @@ -481,7 +516,7 @@ export default class SuperMessagePort< let task: InvokeTask; const promise = new Promise>>((resolve, reject) => { task = this.createInvokeTask(type as string, payload, withAck, undefined, transfer); - this.awaiting[task.id] = {resolve, reject, taskType: type as string}; + this.awaiting[task.id] = {resolve, reject, taskType: type as string, port}; this.pushTask(task, port); }); diff --git a/src/lib/mtproto/webPushApiManager.ts b/src/lib/mtproto/webPushApiManager.ts index 05854167..e0cc9dae 100644 --- a/src/lib/mtproto/webPushApiManager.ts +++ b/src/lib/mtproto/webPushApiManager.ts @@ -9,7 +9,9 @@ * https://github.com/zhukov/webogram/blob/master/LICENSE */ -import type { ServiceWorkerNotificationsClearTask, ServiceWorkerPingTask, ServiceWorkerPushClickTask } from "../serviceWorker/index.service"; +import type { PushNotificationObject } from "../serviceWorker/push"; +import type { ServicePushPingTaskPayload } from "../serviceWorker/serviceMessagePort"; +import type { NotificationSettings } from "../appManagers/uiNotificationsManager"; import { MOUNT_CLASS_TO } from "../../config/debug"; import { logger } from "../logger"; import apiManagerProxy from "./mtprotoworker"; @@ -17,10 +19,8 @@ import I18n, { LangPackKey } from "../langPack"; import { IS_MOBILE } from "../../environment/userAgent"; import appRuntimeManager from "../appManagers/appRuntimeManager"; import copy from "../../helpers/object/copy"; -import type { NotificationSettings } from "../appManagers/uiNotificationsManager"; import singleInstance from "./singleInstance"; import EventListenerBase from "../../helpers/eventListenerBase"; -import type { PushNotificationObject } from "../serviceWorker/push"; export type PushSubscriptionNotifyType = 'init' | 'subscribe' | 'unsubscribe'; export type PushSubscriptionNotifyEvent = `push_${PushSubscriptionNotifyType}`; @@ -170,8 +170,8 @@ export class WebPushApiManager extends EventListenerBase<{ this.settings.baseUrl = (location.href || '').replace(/#.*$/, ''); - const lang: ServiceWorkerPingTask['payload']['lang'] = {} as any; - const ACTIONS_LANG_MAP: Record = { + const lang: ServicePushPingTaskPayload['lang'] = {} as any; + const ACTIONS_LANG_MAP: Record = { push_action_mute1d: IS_MOBILE ? 'PushNotification.Action.Mute1d.Mobile' : 'PushNotification.Action.Mute1d', push_action_settings: IS_MOBILE ? 'PushNotification.Action.Settings.Mobile' : 'PushNotification.Action.Settings', push_message_nopreview: 'PushNotification.Message.NoPreview' @@ -181,16 +181,11 @@ export class WebPushApiManager extends EventListenerBase<{ lang[action as keyof typeof ACTIONS_LANG_MAP] = I18n.format(ACTIONS_LANG_MAP[action as keyof typeof ACTIONS_LANG_MAP], true); } - const task: ServiceWorkerPingTask = { - type: 'ping', - payload: { - localNotifications: this.localNotificationsAvailable, - lang: lang, - settings: this.settings - } - }; - - apiManagerProxy.postSWMessage(task); + apiManagerProxy.serviceMessagePort.invokeVoid('pushPing', { + localNotifications: this.localNotificationsAvailable, + lang: lang, + settings: this.settings + }); this.isAliveTO = setTimeout(this.isAliveNotify, 10000); } @@ -206,8 +201,7 @@ export class WebPushApiManager extends EventListenerBase<{ return; } - const task: ServiceWorkerNotificationsClearTask = {type: 'notifications_clear'}; - apiManagerProxy.postSWMessage(task); + apiManagerProxy.serviceMessagePort.invokeVoid('notificationsClear', undefined); } public setUpServiceWorkerChannel() { @@ -215,13 +209,13 @@ export class WebPushApiManager extends EventListenerBase<{ return; } - apiManagerProxy.addServiceWorkerTaskListener('push_click', (task: ServiceWorkerPushClickTask) => { + apiManagerProxy.serviceMessagePort.addEventListener('pushClick', (payload) => { if(singleInstance.deactivatedReason) { appRuntimeManager.reload(); return; } - this.dispatchEvent('push_notification_click', task.payload); + this.dispatchEvent('push_notification_click', payload); }); navigator.serviceWorker.ready.then(this.isAliveNotify); diff --git a/src/lib/serviceWorker/index.service.ts b/src/lib/serviceWorker/index.service.ts index 40c38c64..c7e17423 100644 --- a/src/lib/serviceWorker/index.service.ts +++ b/src/lib/serviceWorker/index.service.ts @@ -8,115 +8,85 @@ import '../mtproto/mtproto.worker'; /// #endif -import type { Modify, WorkerTaskTemplate, WorkerTaskVoidTemplate } from '../../types'; -import type { WebPushApiManager } from '../mtproto/webPushApiManager'; -import type { PushNotificationObject } from './push'; -import type { ToggleStorageTask } from '../mtproto/mtprotoworker'; -import type { MyUploadFile } from '../mtproto/apiFileManager'; import { logger, LogTypes } from '../logger'; -import { CancellablePromise } from '../../helpers/cancellablePromise'; import { CACHE_ASSETS_NAME, requestCache } from './cache'; import onStreamFetch from './stream'; import { closeAllNotifications, onPing } from './push'; import CacheStorageController from '../cacheStorage'; import { IS_SAFARI } from '../../environment/userAgent'; +import ServiceMessagePort from './serviceMessagePort'; +import listenMessagePort from '../../helpers/listenMessagePort'; +import { getWindowClients } from '../../helpers/context'; +import { MessageSendPort } from '../mtproto/superMessagePort'; export const log = logger('SW', LogTypes.Error | LogTypes.Debug | LogTypes.Log | LogTypes.Warn); const ctx = self as any as ServiceWorkerGlobalScope; -export const deferredPromises: Map}> = new Map(); - -export interface RequestFilePartTask extends Modify { - type: 'requestFilePart', - payload: { - docId: DocId, - dcId: number, - offset: number, - limit: number - } -}; -export interface RequestFilePartTaskResponse extends Modify { - type: 'requestFilePart', - payload?: MyUploadFile, - originalPayload?: RequestFilePartTask['payload'] +/// #if !MTPROTO_SW +let _mtprotoMessagePort: MessagePort; +export const getMtprotoMessagePort = () => _mtprotoMessagePort; + +const sendMessagePort = (source: MessageSendPort) => { + const channel = new MessageChannel(); + serviceMessagePort.attachPort(_mtprotoMessagePort = channel.port1); + serviceMessagePort.invokeVoid('port', undefined, source, [channel.port2]); }; -export interface ServiceWorkerPingTask extends WorkerTaskVoidTemplate { - type: 'ping', - payload: { - localNotifications: boolean, - lang: { - push_action_mute1d: string - push_action_settings: string - push_message_nopreview: string - }, - settings: WebPushApiManager['settings'] +const sendMessagePortIfNeeded = (source: MessageSendPort) => { + if(!connectedWindows && !_mtprotoMessagePort) { + sendMessagePort(source); } }; -export interface ServiceWorkerNotificationsClearTask extends WorkerTaskVoidTemplate { - type: 'notifications_clear' -}; +const onWindowConnected = (source: MessageSendPort) => { + sendMessagePortIfNeeded(source); -export interface ServiceWorkerPushClickTask extends WorkerTaskVoidTemplate { - type: 'push_click', - payload: PushNotificationObject + ++connectedWindows; + log('window connected'); }; -export type ServiceWorkerTask = RequestFilePartTaskResponse | ServiceWorkerPingTask | ServiceWorkerNotificationsClearTask | ToggleStorageTask; +export const serviceMessagePort = new ServiceMessagePort(); +serviceMessagePort.addMultipleEventsListeners({ + notificationsClear: closeAllNotifications, -/// #if !MTPROTO_SW -const taskListeners: { - [type in ServiceWorkerTask['type']]: (task: any, event: ExtendableMessageEvent) => void -} = { - notifications_clear: () => { - closeAllNotifications(); - }, - ping: (task: ServiceWorkerPingTask, event) => { - onPing(task, event); + toggleStorages: ({enabled, clearWrite}) => { + CacheStorageController.toggleStorage(enabled, clearWrite); }, - requestFilePart: (task: RequestFilePartTaskResponse, e: ExtendableMessageEvent) => { - const windowClient = e.source as WindowClient; - const promises = deferredPromises.get(windowClient.id); - if(!promises) { - return; - } - const promise = promises[task.id]; - if(promise) { - if(task.error) { - promise.reject(task.error); - } else { - promise.resolve(task.payload); - } - - delete promises[task.id]; - } + pushPing: (payload, source) => { + onPing(payload, source); }, - toggleStorages: (task: ToggleStorageTask) => { - const {enabled, clearWrite} = task.payload; - CacheStorageController.toggleStorage(enabled, clearWrite); - } -}; -ctx.addEventListener('message', (e) => { - const task = e.data as ServiceWorkerTask; - const callback = taskListeners[task.type]; - if(callback) { - callback(task, e); + + hello: (payload, source) => { + onWindowConnected(source); } }); -/// #endif -//const cacheStorage = new CacheStorageController('cachedAssets'); -/* let taskId = 0; +// * service worker can be killed, so won't get 'hello' event +getWindowClients().then((windowClients) => { + windowClients.forEach((windowClient) => { + onWindowConnected(windowClient); + }); +}); + +let connectedWindows = 0; +listenMessagePort(serviceMessagePort, undefined, (source) => { + if(source === _mtprotoMessagePort) { + return; + } -export function getTaskId() { - return taskId; -} + log('window disconnected'); + connectedWindows = Math.max(0, connectedWindows - 1); + if(!connectedWindows) { + log.warn('no windows left'); -export function incrementTaskId() { - return taskId++; -} */ + if(_mtprotoMessagePort) { + serviceMessagePort.detachPort(_mtprotoMessagePort); + _mtprotoMessagePort = undefined; + } + } +}); +/// #endif const onFetch = (event: FetchEvent): void => { /// #if !DEBUG diff --git a/src/lib/serviceWorker/push.ts b/src/lib/serviceWorker/push.ts index e98c890e..2edb9939 100644 --- a/src/lib/serviceWorker/push.ts +++ b/src/lib/serviceWorker/push.ts @@ -14,7 +14,8 @@ import DATABASE_STATE from "../../config/databases/state"; import { IS_FIREFOX } from "../../environment/userAgent"; import deepEqual from "../../helpers/object/deepEqual"; import IDBStorage from "../idb"; -import { log, ServiceWorkerPingTask, ServiceWorkerPushClickTask } from "./index.service"; +import { log, serviceMessagePort } from "./index.service"; +import { ServicePushPingTaskPayload } from "./serviceMessagePort"; const ctx = self as any as ServiceWorkerGlobalScope; const defaultBaseUrl = location.protocol + '//' + location.hostname + location.pathname.split('/').slice(0, -1).join('/') + '/'; @@ -96,8 +97,8 @@ class SomethingGetter, Storage extends Record - push_settings: Partial + push_lang: Partial + push_settings: Partial }; const getter = new SomethingGetter(DATABASE_STATE, 'session', { @@ -192,12 +193,12 @@ ctx.addEventListener('notificationclick', (event) => { type: 'window' }).then((clientList) => { data.action = action; - pendingNotification = {type: 'push_click', payload: data}; + pendingNotification = data; for(let i = 0; i < clientList.length; i++) { const client = clientList[i]; if('focus' in client) { client.focus(); - client.postMessage(pendingNotification); + serviceMessagePort.invokeVoid('pushClick', pendingNotification, client); pendingNotification = undefined; return; } @@ -218,7 +219,7 @@ ctx.addEventListener('notificationclick', (event) => { ctx.addEventListener('notificationclose', onCloseNotification); let notifications: Set = new Set(); -let pendingNotification: ServiceWorkerPushClickTask; +let pendingNotification: PushNotificationObject; function pushToNotifications(notification: Notification) { if(!notifications.has(notification)) { notifications.add(notification); @@ -311,7 +312,7 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu return notificationPromise.then((event) => { // @ts-ignore - if(event && event.notification) { + if(event?.notification) { // @ts-ignore pushToNotifications(event.notification); } @@ -320,14 +321,9 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu }); } -export function onPing(task: ServiceWorkerPingTask, event: ExtendableMessageEvent) { - const client = event.ports && event.ports[0] || event.source; - const payload = task.payload; - - if(pendingNotification && - client && - 'postMessage' in client) { - client.postMessage(pendingNotification, []); +export function onPing(payload: ServicePushPingTaskPayload, source?: MessageEventSource) { + if(pendingNotification && source) { + serviceMessagePort.invokeVoid('pushClick', pendingNotification, source); pendingNotification = undefined; } diff --git a/src/lib/serviceWorker/serviceMessagePort.ts b/src/lib/serviceWorker/serviceMessagePort.ts new file mode 100644 index 00000000..92f59720 --- /dev/null +++ b/src/lib/serviceWorker/serviceMessagePort.ts @@ -0,0 +1,52 @@ +/* + * https://github.com/morethanwords/tweb + * Copyright (C) 2019-2021 Eduard Kuzmenko + * https://github.com/morethanwords/tweb/blob/master/LICENSE + */ + +import type { WebPushApiManager } from "../mtproto/webPushApiManager"; +import type { PushNotificationObject } from "./push"; +import type { MyUploadFile } from "../mtproto/apiFileManager"; +import SuperMessagePort from "../mtproto/superMessagePort"; +import { MOUNT_CLASS_TO } from "../../config/debug"; + +export type ServicePushPingTaskPayload = { + localNotifications: boolean, + lang: { + push_action_mute1d: string + push_action_settings: string + push_message_nopreview: string + }, + settings: WebPushApiManager['settings'] +}; + +export type ServiceRequestFilePartTaskPayload = { + docId: DocId, + dcId: number, + offset: number, + limit: number +}; + +export type ServiceEvent = { + port: (payload: void, source: MessageEventSource, event: MessageEvent) => void +}; + +export default class ServiceMessagePort extends SuperMessagePort<{ + // from main thread to service worker + notificationsClear: () => void, + toggleStorages: (payload: {enabled: boolean, clearWrite: boolean}) => void, + pushPing: (payload: ServicePushPingTaskPayload, source: MessageEventSource, event: MessageEvent) => void, + hello: (payload: void, source: MessageEventSource, event: MessageEvent) => void +}, { + // to main thread + pushClick: (payload: PushNotificationObject) => void, + + // to mtproto worker + requestFilePart: (payload: ServiceRequestFilePartTaskPayload) => Promise | MyUploadFile +} & ServiceEvent, Master> { + constructor() { + super('SERVICE'); + + MOUNT_CLASS_TO && (MOUNT_CLASS_TO.serviceMessagePort = this); + } +} diff --git a/src/lib/serviceWorker/stream.ts b/src/lib/serviceWorker/stream.ts index 01d8b0d3..131cedf4 100644 --- a/src/lib/serviceWorker/stream.ts +++ b/src/lib/serviceWorker/stream.ts @@ -6,17 +6,19 @@ import readBlobAsUint8Array from "../../helpers/blob/readBlobAsUint8Array"; import deferredPromise, { CancellablePromise } from "../../helpers/cancellablePromise"; -import { getWindowClients } from "../../helpers/context"; import debounce from "../../helpers/schedulers/debounce"; -import { InputFileLocation, UploadFile } from "../../layer"; +import { InputFileLocation } from "../../layer"; import CacheStorageController from "../cacheStorage"; -import { DownloadOptions } from "../mtproto/apiFileManager"; -import { RequestFilePartTask, deferredPromises, log } from "./index.service"; +import { DownloadOptions, MyUploadFile } from "../mtproto/apiFileManager"; +import { getMtprotoMessagePort, log, serviceMessagePort } from "./index.service"; +import { ServiceRequestFilePartTaskPayload } from "./serviceMessagePort"; import timeout from "./timeout"; +const deferredPromises: Map}> = new Map(); const cacheStorage = new CacheStorageController('cachedStreamChunks'); const CHUNK_TTL = 86400; const CHUNK_CACHED_TIME_HEADER = 'Time-Cached'; +const USE_CACHE = false; const clearOldChunks = () => { return cacheStorage.timeoutOperation((cache) => { @@ -49,18 +51,17 @@ const clearOldChunks = () => { setInterval(clearOldChunks, 1800e3); setInterval(() => { - getWindowClients().then((clients) => { - for(const [clientId, promises] of deferredPromises) { - if(!clients.find((client) => client.id === clientId)) { - for(const taskId in promises) { - const promise = promises[taskId]; - promise.reject(); - } - - deferredPromises.delete(clientId); + const mtprotoMessagePort = getMtprotoMessagePort(); + for(const [messagePort, promises] of deferredPromises) { + if(messagePort !== mtprotoMessagePort) { + for(const taskId in promises) { + const promise = promises[taskId]; + promise.reject(); } + + deferredPromises.delete(messagePort); } - }); + } }, 120e3); type StreamRange = [number, number]; @@ -86,54 +87,56 @@ class Stream { }; private async requestFilePartFromWorker(alignedOffset: number, limit: number, fromPreload = false) { - const task: Omit = { - type: 'requestFilePart', - payload: { - docId: this.id, - dcId: this.info.dcId, - offset: alignedOffset, - limit - } + const payload: ServiceRequestFilePartTaskPayload = { + docId: this.id, + dcId: this.info.dcId, + offset: alignedOffset, + limit }; - const taskId = JSON.stringify(task); - (task as RequestFilePartTask).id = taskId; - - const windowClient = await getWindowClients().then((clients) => { - if(!clients.length) { - return; - } - - return clients.find((client) => deferredPromises.has(client.id)) || clients[0]; - }); - - if(!windowClient) { - throw new Error('no window'); - } + const taskId = JSON.stringify(payload); - let promises = deferredPromises.get(windowClient.id); + const mtprotoMessagePort = getMtprotoMessagePort(); + let promises = deferredPromises.get(mtprotoMessagePort); if(!promises) { - deferredPromises.set(windowClient.id, promises = {}); + deferredPromises.set(mtprotoMessagePort, promises = {}); } - let deferred = promises[taskId] as CancellablePromise; + let deferred = promises[taskId]; if(deferred) { return deferred.then((uploadFile) => uploadFile.bytes); } - windowClient.postMessage(task); this.loadedOffsets.add(alignedOffset); - - deferred = promises[taskId] = deferredPromise(); + + deferred = promises[taskId] = deferredPromise(); + + serviceMessagePort.invoke('requestFilePart', payload, undefined, mtprotoMessagePort) + .then(deferred.resolve, deferred.reject).finally(() => { + if(promises[taskId] === deferred) { + delete promises[taskId]; + + if(!Object.keys(promises).length) { + deferredPromises.delete(mtprotoMessagePort); + } + } + }); + const bytesPromise = deferred.then((uploadFile) => uploadFile.bytes); - this.saveChunkToCache(bytesPromise, alignedOffset, limit); - !fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15)); + if(USE_CACHE) { + this.saveChunkToCache(bytesPromise, alignedOffset, limit); + !fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15)); + } return bytesPromise; } private requestFilePartFromCache(alignedOffset: number, limit: number, fromPreload?: boolean) { + if(!USE_CACHE) { + return Promise.resolve(); + } + const key = this.getChunkKey(alignedOffset, limit); return cacheStorage.getFile(key).then((blob: Blob) => { return fromPreload ? new Uint8Array() : readBlobAsUint8Array(blob);