Browse Source

Refactor service worker's message port

Fix message port memory leak
master
Eduard Kuzmenko 2 years ago
parent
commit
446949daa6
  1. 2
      src/helpers/context.ts
  2. 17
      src/helpers/listenMessagePort.ts
  3. 4
      src/lib/crypto/cryptoMessagePort.ts
  4. 35
      src/lib/mtproto/mtproto.worker.ts
  5. 6
      src/lib/mtproto/mtprotoMessagePort.ts
  6. 114
      src/lib/mtproto/mtprotoworker.ts
  7. 113
      src/lib/mtproto/superMessagePort.ts
  8. 32
      src/lib/mtproto/webPushApiManager.ts
  9. 134
      src/lib/serviceWorker/index.service.ts
  10. 26
      src/lib/serviceWorker/push.ts
  11. 52
      src/lib/serviceWorker/serviceMessagePort.ts
  12. 91
      src/lib/serviceWorker/stream.ts

2
src/helpers/context.ts

@ -15,6 +15,8 @@ export const getWindowClients = () => { @@ -15,6 +15,8 @@ export const getWindowClients = () => {
.matchAll({includeUncontrolled: false, type: 'window'});
};
export const getLastWindowClient = () => getWindowClients().then((windowClients) => windowClients.slice(-1)[0]);
const postMessage = (listener: WindowClient | DedicatedWorkerGlobalScope, ...args: any[]) => {
try {
// @ts-ignore

17
src/helpers/listenMessagePort.ts

@ -4,24 +4,27 @@ @@ -4,24 +4,27 @@
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import type SuperMessagePort from "../lib/mtproto/superMessagePort";
import ctx from "../environment/ctx";
import SuperMessagePort from "../lib/mtproto/superMessagePort";
export default function listenMessagePort(
messagePort: SuperMessagePort<any, any, any>,
onConnect?: (source: MessageEventSource) => void,
onDisconnect?: (source: MessageEventSource) => void
) {
const attachPort = (s: any) => {
messagePort.attachPort(s);
onConnect && onConnect(s);
const attachPort = (listenPort: any, sendPort: any) => {
messagePort.attachListenPort(listenPort);
sendPort && messagePort.attachSendPort(sendPort);
onConnect?.(listenPort);
};
onDisconnect && messagePort.setOnPortDisconnect(onDisconnect);
messagePort.setOnPortDisconnect(onDisconnect);
if(typeof(SharedWorkerGlobalScope) !== 'undefined') {
(ctx as any as SharedWorkerGlobalScope).addEventListener('connect', (e) => attachPort(e.source));
(ctx as any as SharedWorkerGlobalScope).addEventListener('connect', (e) => attachPort(e.source, e.source));
} else if(typeof(ServiceWorkerGlobalScope) !== 'undefined') {
attachPort(ctx, null);
} else {
attachPort(ctx);
attachPort(ctx, ctx);
}
}

4
src/lib/crypto/cryptoMessagePort.ts

@ -16,6 +16,10 @@ type CryptoEvent = { @@ -16,6 +16,10 @@ type CryptoEvent = {
};
export class CryptoMessagePort<Master extends boolean = false> extends SuperMessagePort<CryptoEvent, CryptoEvent, Master> {
constructor() {
super('CRYPTO');
}
public invokeCrypto<T extends keyof CryptoMethods>(method: T, ...args: Parameters<CryptoMethods[T]>): Promise<Awaited<ReturnType<CryptoMethods[T]>>> {
const payload = {method, args};
const listeners = this.listeners['invoke'];

35
src/lib/mtproto/mtproto.worker.ts

@ -20,12 +20,19 @@ import { logger } from '../logger'; @@ -20,12 +20,19 @@ import { logger } from '../logger';
import { State } from '../../config/state';
import toggleStorages from '../../helpers/toggleStorages';
import appTabsManager from '../appManagers/appTabsManager';
import ServiceMessagePort from '../serviceWorker/serviceMessagePort';
import callbackify from '../../helpers/callbackify';
let _isServiceWorkerOnline = true;
export function isServiceWorkerOnline() {
return _isServiceWorkerOnline;
}
let serviceMessagePort: ServiceMessagePort<true>, _serviceMessagePort: MessagePort;
export function getServiceMessagePort() {
return _isServiceWorkerOnline ? serviceMessagePort : undefined;
}
const log = logger('MTPROTO');
// let haveState = false;
@ -73,6 +80,28 @@ port.addMultipleEventsListeners({ @@ -73,6 +80,28 @@ port.addMultipleEventsListeners({
_isServiceWorkerOnline = online;
},
serviceWorkerPort: (payload, source, event) => {
if(serviceMessagePort) {
serviceMessagePort.detachPort(_serviceMessagePort);
_serviceMessagePort = undefined;
} else {
serviceMessagePort = new ServiceMessagePort();
serviceMessagePort.addMultipleEventsListeners({
requestFilePart: (payload) => {
return callbackify(appManagersManager.getManagers(), (managers) => {
const {docId, dcId, offset, limit} = payload;
return managers.appDocsManager.requestDocPart(docId, dcId, offset, limit);
});
}
});
}
// * port can be undefined in the future
if(_serviceMessagePort = event.ports[0]) {
serviceMessagePort.attachPort(_serviceMessagePort);
}
},
createObjectURL: (blob) => {
return URL.createObjectURL(blob);
},
@ -99,8 +128,14 @@ appManagersManager.start(); @@ -99,8 +128,14 @@ appManagersManager.start();
appManagersManager.getManagers();
appTabsManager.start();
// let sentHello = false;
listenMessagePort(port, (source) => {
appTabsManager.addTab(source);
// if(!sentHello) {
// port.invokeVoid('hello', undefined, source);
// sentHello = true;
// }
}, (source) => {
appTabsManager.deleteTab(source);
});

6
src/lib/mtproto/mtprotoMessagePort.ts

@ -27,6 +27,7 @@ export default class MTProtoMessagePort<Master extends boolean = true> extends S @@ -27,6 +27,7 @@ export default class MTProtoMessagePort<Master extends boolean = true> extends S
manager: (payload: MTProtoManagerTaskPayload) => any,
toggleStorages: (payload: {enabled: boolean, clearWrite: boolean}) => ReturnType<typeof toggleStorages>,
serviceWorkerOnline: (online: boolean) => void,
serviceWorkerPort: (payload: void, source: MessageEventSource, event: MessageEvent) => void,
cryptoPort: (payload: void, source: MessageEventSource, event: MessageEvent) => void,
createObjectURL: (blob: Blob) => string,
tabState: (payload: TabState, source: MessageEventSource) => void,
@ -35,12 +36,13 @@ export default class MTProtoMessagePort<Master extends boolean = true> extends S @@ -35,12 +36,13 @@ export default class MTProtoMessagePort<Master extends boolean = true> extends S
convertOpus: (payload: {fileName: string, bytes: Uint8Array}) => Promise<Uint8Array>,
localStorageProxy: (payload: LocalStorageProxyTask['payload']) => Promise<any>,
mirror: (payload: MirrorTaskPayload) => void,
notificationBuild: (payload: NotificationBuildTaskPayload) => void
notificationBuild: (payload: NotificationBuildTaskPayload) => void,
// hello: () => void
} & MTProtoBroadcastEvent, Master> {
private static INSTANCE: MTProtoMessagePort;
constructor() {
super();
super('MTPROTO');
MTProtoMessagePort.INSTANCE = this;

114
src/lib/mtproto/mtprotoworker.ts

@ -4,8 +4,7 @@ @@ -4,8 +4,7 @@
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import type { RequestFilePartTask, RequestFilePartTaskResponse, ServiceWorkerTask } from '../serviceWorker/index.service';
import type { Awaited, WorkerTaskVoidTemplate } from '../../types';
import type { Awaited } from '../../types';
import type { CacheStorageDbName } from '../cacheStorage';
import type { State } from '../../config/state';
import type { Message, MessagePeerReaction, PeerNotifySettings } from '../../layer';
@ -18,7 +17,6 @@ import webPushApiManager from './webPushApiManager'; @@ -18,7 +17,6 @@ import webPushApiManager from './webPushApiManager';
import appRuntimeManager from '../appManagers/appRuntimeManager';
import telegramMeWebManager from './telegramMeWebManager';
import pause from '../../helpers/schedulers/pause';
import isObject from '../../helpers/object/isObject';
import ENVIRONMENT from '../../environment';
import loadState from '../appManagers/utils/state/loadState';
import opusDecodeController from '../opusDecodeController';
@ -28,11 +26,7 @@ import SuperMessagePort from './superMessagePort'; @@ -28,11 +26,7 @@ import SuperMessagePort from './superMessagePort';
import IS_SHARED_WORKER_SUPPORTED from '../../environment/sharedWorkerSupport';
import toggleStorages from '../../helpers/toggleStorages';
import idleController from '../../helpers/idleController';
export interface ToggleStorageTask extends WorkerTaskVoidTemplate {
type: 'toggleStorages',
payload: {enabled: boolean, clearWrite: boolean}
};
import ServiceMessagePort from '../serviceWorker/serviceMessagePort';
export type Mirrors = {
state: State
@ -57,10 +51,8 @@ export type TabState = { @@ -57,10 +51,8 @@ export type TabState = {
};
class ApiManagerProxy extends MTProtoMessagePort {
private worker: /* Window */Worker;
private isSWRegistered: boolean;
// private worker: /* Window */Worker;
// private sockets: Map<number, Socket> = new Map();
private taskListenersSW: {[taskType: string]: (task: any) => void};
private mirrors: Mirrors;
public newVersion: string;
@ -68,11 +60,12 @@ class ApiManagerProxy extends MTProtoMessagePort { @@ -68,11 +60,12 @@ class ApiManagerProxy extends MTProtoMessagePort {
private tabState: TabState;
public serviceMessagePort: ServiceMessagePort<true>;
private lastServiceWorker: ServiceWorker;
constructor() {
super();
this.isSWRegistered = true;
this.taskListenersSW = {};
this.mirrors = {} as any;
this.tabState = {
chatPeerIds: [],
@ -200,78 +193,80 @@ class ApiManagerProxy extends MTProtoMessagePort { @@ -200,78 +193,80 @@ class ApiManagerProxy extends MTProtoMessagePort {
// this.sendState();
}
private registerServiceWorker() {
if(!('serviceWorker' in navigator)) return;
// ! I hate webpack - it won't load it by using worker.register, only navigator.serviceWork will do it.
const worker = navigator.serviceWorker;
private attachServiceWorker(serviceWorker: ServiceWorker) {
this.lastServiceWorker && this.serviceMessagePort.detachPort(this.lastServiceWorker);
this.serviceMessagePort.attachSendPort(this.lastServiceWorker = serviceWorker);
this.serviceMessagePort.invokeVoid('hello', undefined);
}
private _registerServiceWorker() {
navigator.serviceWorker.register(
/* webpackChunkName: "sw" */
new URL('../serviceWorker/index.service', import.meta.url),
{scope: './'}
).then((registration) => {
this.log('SW registered', registration);
this.isSWRegistered = true;
// ! doubtful fix for hard refresh
if(registration.active && !navigator.serviceWorker.controller) {
return registration.unregister().then(() => {
window.location.reload();
});
}
const sw = registration.installing || registration.waiting || registration.active;
sw.addEventListener('statechange', (e) => {
this.log('SW statechange', e);
});
//this.postSWMessage = worker.controller.postMessage.bind(worker.controller);
const controller = navigator.serviceWorker.controller || registration.installing || registration.waiting || registration.active;
this.attachServiceWorker(controller);
/// #if MTPROTO_SW
const controller = worker.controller || registration.installing || registration.waiting || registration.active;
this.onWorkerFirstMessage(controller);
/// #endif
}, (err) => {
this.isSWRegistered = false;
this.log.error('SW registration failed!', err);
this.invokeVoid('serviceWorkerOnline', false);
});
}
private registerServiceWorker() {
if(!('serviceWorker' in navigator)) return;
this.serviceMessagePort = new ServiceMessagePort<true>();
// this.addMultipleEventsListeners({
// hello: () => {
// // this.serviceMessagePort.invokeVoid('port', undefined);
// }
// });
// ! I hate webpack - it won't load it by using worker.register, only navigator.serviceWorker will do it.
const worker = navigator.serviceWorker;
this._registerServiceWorker();
worker.addEventListener('controllerchange', () => {
this.log.warn('controllerchange');
worker.controller.addEventListener('error', (e) => {
const controller = worker.controller;
this.attachServiceWorker(controller);
controller.addEventListener('error', (e) => {
this.log.error('controller error:', e);
});
});
/// #if MTPROTO_SW
this.attachListenPort(worker);
// this.s();
/// #else
worker.addEventListener('message', (e) => {
const task: ServiceWorkerTask = e.data;
if(!isObject(task)) {
return;
}
const callback = this.taskListenersSW[task.type];
if(callback) {
callback(task);
this.serviceMessagePort.attachListenPort(worker);
this.serviceMessagePort.addMultipleEventsListeners({
port: (payload, source, event) => {
this.invokeVoid('serviceWorkerPort', undefined, undefined, [event.ports[0]]);
}
});
this.addServiceWorkerTaskListener('requestFilePart', (task: RequestFilePartTask) => {
const responseTask: RequestFilePartTaskResponse = {
type: task.type,
id: task.id
};
const {docId, dcId, offset, limit} = task.payload;
rootScope.managers.appDocsManager.requestDocPart(docId, dcId, offset, limit)
.then((uploadFile) => {
responseTask.payload = uploadFile;
this.postSWMessage(responseTask);
}, (err) => {
responseTask.originalPayload = task.payload;
responseTask.error = err;
this.postSWMessage(responseTask);
});
});
/// #endif
worker.addEventListener('messageerror', (e) => {
@ -334,16 +329,10 @@ class ApiManagerProxy extends MTProtoMessagePort { @@ -334,16 +329,10 @@ class ApiManagerProxy extends MTProtoMessagePort {
});
}
public postSWMessage(message: any) {
if(navigator.serviceWorker.controller) {
navigator.serviceWorker.controller.postMessage(message);
}
}
private onWorkerFirstMessage(worker: any) {
this.log('set webWorker');
this.worker = worker;
// this.worker = worker;
/// #if MTPROTO_SW
this.attachSendPort(worker);
/// #else
@ -351,10 +340,6 @@ class ApiManagerProxy extends MTProtoMessagePort { @@ -351,10 +340,6 @@ class ApiManagerProxy extends MTProtoMessagePort {
/// #endif
}
public addServiceWorkerTaskListener(name: keyof ApiManagerProxy['taskListenersSW'], callback: ApiManagerProxy['taskListenersSW'][typeof name]) {
this.taskListenersSW[name] = callback;
}
private loadState() {
return Promise.all([
loadState().then((stateResult) => {
@ -384,8 +369,7 @@ class ApiManagerProxy extends MTProtoMessagePort { @@ -384,8 +369,7 @@ class ApiManagerProxy extends MTProtoMessagePort {
public async toggleStorages(enabled: boolean, clearWrite: boolean) {
await toggleStorages(enabled, clearWrite);
this.invoke('toggleStorages', {enabled, clearWrite});
const task: ToggleStorageTask = {type: 'toggleStorages', payload: {enabled, clearWrite}};
this.postSWMessage(task);
this.serviceMessagePort.invokeVoid('toggleStorages', {enabled, clearWrite});
}
public async getMirror<T extends keyof Mirrors>(name: T) {

113
src/lib/mtproto/superMessagePort.ts

@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
import DEBUG from "../../config/debug";
import ctx from "../../environment/ctx";
import indexOfAndSplice from "../../helpers/array/indexOfAndSplice";
import { IS_SERVICE_WORKER, IS_WORKER, notifyAll } from "../../helpers/context";
import { IS_WORKER } from "../../helpers/context";
import EventListenerBase from "../../helpers/eventListenerBase";
import { Awaited, WorkerTaskTemplate, WorkerTaskVoidTemplate } from "../../types";
import { logger } from "../logger";
@ -57,7 +57,11 @@ interface CloseTask extends SuperMessagePortTask { @@ -57,7 +57,11 @@ interface CloseTask extends SuperMessagePortTask {
type: 'close'
}
type Task = InvokeTask | ResultTask | AckTask | PingTask | PongTask | BatchTask | CloseTask;
// interface OpenTask extends SuperMessagePortTask {
// type: 'open'
// }
type Task = InvokeTask | ResultTask | AckTask | PingTask | PongTask | BatchTask | CloseTask/* | OpenTask */;
type TaskMap = {
[type in Task as type['type']]?: (task: Extract<Task, type>, source: MessageEventSource, event: MessageEvent<any>) => void | Promise<any>
};
@ -75,7 +79,10 @@ export type AckedResult<T> = { @@ -75,7 +79,10 @@ export type AckedResult<T> = {
// };
type ListenPort = WindowProxy | MessagePort | ServiceWorker | Worker | ServiceWorkerContainer;
type SendPort = WindowProxy | MessagePort | ServiceWorker | Worker;
type SendPort = Pick<MessageEventSource, 'postMessage'>/* WindowProxy | MessagePort | ServiceWorker | Worker */;
export type MessageListenPort = ListenPort;
export type MessageSendPort = SendPort;
type ListenerCallback = (payload: any, source: MessageEventSource, event: MessageEvent<any>) => any;
type Listeners = Record<string, ListenerCallback>;
@ -99,7 +106,8 @@ export default class SuperMessagePort< @@ -99,7 +106,8 @@ export default class SuperMessagePort<
[id: number]: {
resolve: any,
reject: any,
taskType: string
taskType: string,
port?: SendPort
}
};
protected pending: Map<SendPort, Task[]>;
@ -111,30 +119,18 @@ export default class SuperMessagePort< @@ -111,30 +119,18 @@ export default class SuperMessagePort<
protected processTaskMap: TaskMap;
protected onPortDisconnect: (source: MessageEventSource) => void;
// protected onPortConnect: (source: MessageEventSource) => void;
constructor() {
constructor(logSuffix?: string) {
super(false);
this.processTaskMap = {
result: this.processResultTask,
ack: this.processAckTask,
invoke: this.processInvokeTask,
ping: this.processPingTask,
pong: this.processPongTask,
close: this.processCloseTask
};
}
public _constructor() {
super._constructor(false);
this.listenPorts = [];
this.sendPorts = [];
this.pingResolves = new Map();
this.taskId = 0;
this.awaiting = {};
this.pending = new Map();
this.log = logger('MP');
this.log = logger('MP' + (logSuffix ? '-' + logSuffix : ''));
this.debug = DEBUG;
if(typeof(window) !== 'undefined') {
@ -143,12 +139,26 @@ export default class SuperMessagePort< @@ -143,12 +139,26 @@ export default class SuperMessagePort<
this.postMessage(undefined, task);
});
}
this.processTaskMap = {
result: this.processResultTask,
ack: this.processAckTask,
invoke: this.processInvokeTask,
ping: this.processPingTask,
pong: this.processPongTask,
close: this.processCloseTask,
// open: this.processOpenTask
};
}
public setOnPortDisconnect(callback: (source: MessageEventSource) => void) {
this.onPortDisconnect = callback;
}
// public setOnPortConnect(callback: (source: MessageEventSource) => void) {
// this.onPortConnect = callback;
// }
public attachPort(port: MessageEventSource) {
this.attachListenPort(port);
this.attachSendPort(port);
@ -160,14 +170,17 @@ export default class SuperMessagePort< @@ -160,14 +170,17 @@ export default class SuperMessagePort<
}
public attachSendPort(port: SendPort) {
this.log.warn('attaching port');
this.log.warn('attaching send port');
if((port as MessagePort).start) {
(port as MessagePort).start();
}
(port as MessagePort).start?.();
this.sendPorts.push(port);
// this.sendPing(port);
// const task = this.createTask('open', undefined);
// this.postMessage(port, task);
this.releasePending();
}
// ! Can't rely on ping because timers can be suspended
@ -207,17 +220,25 @@ export default class SuperMessagePort< @@ -207,17 +220,25 @@ export default class SuperMessagePort<
// }, timeout);
// }
protected detachPort(port: SendPort) {
public detachPort(port: ListenPort) {
this.log.warn('disconnecting port');
port.removeEventListener('message', this.onMessage as any);
indexOfAndSplice(this.listenPorts, port);
indexOfAndSplice(this.sendPorts, port);
if((port as MessagePort).close) {
(port as MessagePort).close();
}
indexOfAndSplice(this.sendPorts, port as any);
port.removeEventListener?.('message', this.onMessage as any);
(port as MessagePort).close?.();
this.onPortDisconnect && this.onPortDisconnect(port as any);
this.onPortDisconnect?.(port as any);
const error = new Error('PORT_DISCONNECTED');
for(const id in this.awaiting) {
const task = this.awaiting[id];
if(task.port === port) {
task.reject(error);
delete this.awaiting[id];
}
}
}
protected postMessage(port: SendPort | SendPort[], task: Task) {
@ -248,7 +269,7 @@ export default class SuperMessagePort< @@ -248,7 +269,7 @@ export default class SuperMessagePort<
protected /* async */ releasePending() {
//return;
if(!this.listenPorts.length || this.releasingPending) {
if(/* !this.listenPorts.length || !this.sendPorts.length || */this.releasingPending) {
return;
}
@ -276,6 +297,10 @@ export default class SuperMessagePort< @@ -276,6 +297,10 @@ export default class SuperMessagePort<
// });
const tasks = portTasks;
const ports = port ? [port] : this.sendPorts;
if(!ports.length) {
return;
}
tasks.forEach((task) => {
// if(task.type === 'batch') {
@ -283,19 +308,20 @@ export default class SuperMessagePort< @@ -283,19 +308,20 @@ export default class SuperMessagePort<
// }
try {
if(IS_SERVICE_WORKER) {
notifyAll(task);
} else {
this.postMessage(port, task);
}
// if(IS_SERVICE_WORKER && !port) {
// notifyAll(task);
// } else {
this.postMessage(ports, task);
// }
} catch(err) {
this.log.error('postMessage error:', err, task, port);
this.log.error('postMessage error:', err, task, ports);
}
});
this.pending.delete(port);
});
this.debug && this.log.debug('released tasks');
this.pending.clear();
this.releasingPending = false;
}
@ -353,6 +379,10 @@ export default class SuperMessagePort< @@ -353,6 +379,10 @@ export default class SuperMessagePort<
};
previousResolve(ret);
if(payload.cached) {
delete this.awaiting[payload.taskId];
}
};
protected processPingTask = (task: PingTask, source: MessageEventSource, event: MessageEvent) => {
@ -371,6 +401,11 @@ export default class SuperMessagePort< @@ -371,6 +401,11 @@ export default class SuperMessagePort<
this.detachPort(source);
};
// * it's just an 'open' callback, DO NOT attach port from here
// protected processOpenTask = (task: OpenTask, source: MessageEventSource, event: MessageEvent) => {
// this.onPortConnect?.(source);
// };
protected processInvokeTask = async(task: InvokeTask, source: MessageEventSource, event: MessageEvent) => {
const id = task.id;
const innerTask = task.payload;
@ -481,7 +516,7 @@ export default class SuperMessagePort< @@ -481,7 +516,7 @@ export default class SuperMessagePort<
let task: InvokeTask;
const promise = new Promise<Awaited<ReturnType<Send[T]>>>((resolve, reject) => {
task = this.createInvokeTask(type as string, payload, withAck, undefined, transfer);
this.awaiting[task.id] = {resolve, reject, taskType: type as string};
this.awaiting[task.id] = {resolve, reject, taskType: type as string, port};
this.pushTask(task, port);
});

32
src/lib/mtproto/webPushApiManager.ts

@ -9,7 +9,9 @@ @@ -9,7 +9,9 @@
* https://github.com/zhukov/webogram/blob/master/LICENSE
*/
import type { ServiceWorkerNotificationsClearTask, ServiceWorkerPingTask, ServiceWorkerPushClickTask } from "../serviceWorker/index.service";
import type { PushNotificationObject } from "../serviceWorker/push";
import type { ServicePushPingTaskPayload } from "../serviceWorker/serviceMessagePort";
import type { NotificationSettings } from "../appManagers/uiNotificationsManager";
import { MOUNT_CLASS_TO } from "../../config/debug";
import { logger } from "../logger";
import apiManagerProxy from "./mtprotoworker";
@ -17,10 +19,8 @@ import I18n, { LangPackKey } from "../langPack"; @@ -17,10 +19,8 @@ import I18n, { LangPackKey } from "../langPack";
import { IS_MOBILE } from "../../environment/userAgent";
import appRuntimeManager from "../appManagers/appRuntimeManager";
import copy from "../../helpers/object/copy";
import type { NotificationSettings } from "../appManagers/uiNotificationsManager";
import singleInstance from "./singleInstance";
import EventListenerBase from "../../helpers/eventListenerBase";
import type { PushNotificationObject } from "../serviceWorker/push";
export type PushSubscriptionNotifyType = 'init' | 'subscribe' | 'unsubscribe';
export type PushSubscriptionNotifyEvent = `push_${PushSubscriptionNotifyType}`;
@ -170,8 +170,8 @@ export class WebPushApiManager extends EventListenerBase<{ @@ -170,8 +170,8 @@ export class WebPushApiManager extends EventListenerBase<{
this.settings.baseUrl = (location.href || '').replace(/#.*$/, '');
const lang: ServiceWorkerPingTask['payload']['lang'] = {} as any;
const ACTIONS_LANG_MAP: Record<keyof ServiceWorkerPingTask['payload']['lang'], LangPackKey> = {
const lang: ServicePushPingTaskPayload['lang'] = {} as any;
const ACTIONS_LANG_MAP: Record<keyof ServicePushPingTaskPayload['lang'], LangPackKey> = {
push_action_mute1d: IS_MOBILE ? 'PushNotification.Action.Mute1d.Mobile' : 'PushNotification.Action.Mute1d',
push_action_settings: IS_MOBILE ? 'PushNotification.Action.Settings.Mobile' : 'PushNotification.Action.Settings',
push_message_nopreview: 'PushNotification.Message.NoPreview'
@ -181,16 +181,11 @@ export class WebPushApiManager extends EventListenerBase<{ @@ -181,16 +181,11 @@ export class WebPushApiManager extends EventListenerBase<{
lang[action as keyof typeof ACTIONS_LANG_MAP] = I18n.format(ACTIONS_LANG_MAP[action as keyof typeof ACTIONS_LANG_MAP], true);
}
const task: ServiceWorkerPingTask = {
type: 'ping',
payload: {
localNotifications: this.localNotificationsAvailable,
lang: lang,
settings: this.settings
}
};
apiManagerProxy.postSWMessage(task);
apiManagerProxy.serviceMessagePort.invokeVoid('pushPing', {
localNotifications: this.localNotificationsAvailable,
lang: lang,
settings: this.settings
});
this.isAliveTO = setTimeout(this.isAliveNotify, 10000);
}
@ -206,8 +201,7 @@ export class WebPushApiManager extends EventListenerBase<{ @@ -206,8 +201,7 @@ export class WebPushApiManager extends EventListenerBase<{
return;
}
const task: ServiceWorkerNotificationsClearTask = {type: 'notifications_clear'};
apiManagerProxy.postSWMessage(task);
apiManagerProxy.serviceMessagePort.invokeVoid('notificationsClear', undefined);
}
public setUpServiceWorkerChannel() {
@ -215,13 +209,13 @@ export class WebPushApiManager extends EventListenerBase<{ @@ -215,13 +209,13 @@ export class WebPushApiManager extends EventListenerBase<{
return;
}
apiManagerProxy.addServiceWorkerTaskListener('push_click', (task: ServiceWorkerPushClickTask) => {
apiManagerProxy.serviceMessagePort.addEventListener('pushClick', (payload) => {
if(singleInstance.deactivatedReason) {
appRuntimeManager.reload();
return;
}
this.dispatchEvent('push_notification_click', task.payload);
this.dispatchEvent('push_notification_click', payload);
});
navigator.serviceWorker.ready.then(this.isAliveNotify);

134
src/lib/serviceWorker/index.service.ts

@ -8,115 +8,85 @@ @@ -8,115 +8,85 @@
import '../mtproto/mtproto.worker';
/// #endif
import type { Modify, WorkerTaskTemplate, WorkerTaskVoidTemplate } from '../../types';
import type { WebPushApiManager } from '../mtproto/webPushApiManager';
import type { PushNotificationObject } from './push';
import type { ToggleStorageTask } from '../mtproto/mtprotoworker';
import type { MyUploadFile } from '../mtproto/apiFileManager';
import { logger, LogTypes } from '../logger';
import { CancellablePromise } from '../../helpers/cancellablePromise';
import { CACHE_ASSETS_NAME, requestCache } from './cache';
import onStreamFetch from './stream';
import { closeAllNotifications, onPing } from './push';
import CacheStorageController from '../cacheStorage';
import { IS_SAFARI } from '../../environment/userAgent';
import ServiceMessagePort from './serviceMessagePort';
import listenMessagePort from '../../helpers/listenMessagePort';
import { getWindowClients } from '../../helpers/context';
import { MessageSendPort } from '../mtproto/superMessagePort';
export const log = logger('SW', LogTypes.Error | LogTypes.Debug | LogTypes.Log | LogTypes.Warn);
const ctx = self as any as ServiceWorkerGlobalScope;
export const deferredPromises: Map<WindowClient['id'], {[taskId: string]: CancellablePromise<any>}> = new Map();
export interface RequestFilePartTask extends Modify<WorkerTaskTemplate, {id: string}> {
type: 'requestFilePart',
payload: {
docId: DocId,
dcId: number,
offset: number,
limit: number
}
};
export interface RequestFilePartTaskResponse extends Modify<WorkerTaskTemplate, {id: string}> {
type: 'requestFilePart',
payload?: MyUploadFile,
originalPayload?: RequestFilePartTask['payload']
/// #if !MTPROTO_SW
let _mtprotoMessagePort: MessagePort;
export const getMtprotoMessagePort = () => _mtprotoMessagePort;
const sendMessagePort = (source: MessageSendPort) => {
const channel = new MessageChannel();
serviceMessagePort.attachPort(_mtprotoMessagePort = channel.port1);
serviceMessagePort.invokeVoid('port', undefined, source, [channel.port2]);
};
export interface ServiceWorkerPingTask extends WorkerTaskVoidTemplate {
type: 'ping',
payload: {
localNotifications: boolean,
lang: {
push_action_mute1d: string
push_action_settings: string
push_message_nopreview: string
},
settings: WebPushApiManager['settings']
const sendMessagePortIfNeeded = (source: MessageSendPort) => {
if(!connectedWindows && !_mtprotoMessagePort) {
sendMessagePort(source);
}
};
export interface ServiceWorkerNotificationsClearTask extends WorkerTaskVoidTemplate {
type: 'notifications_clear'
};
const onWindowConnected = (source: MessageSendPort) => {
sendMessagePortIfNeeded(source);
export interface ServiceWorkerPushClickTask extends WorkerTaskVoidTemplate {
type: 'push_click',
payload: PushNotificationObject
++connectedWindows;
log('window connected');
};
export type ServiceWorkerTask = RequestFilePartTaskResponse | ServiceWorkerPingTask | ServiceWorkerNotificationsClearTask | ToggleStorageTask;
export const serviceMessagePort = new ServiceMessagePort<false>();
serviceMessagePort.addMultipleEventsListeners({
notificationsClear: closeAllNotifications,
/// #if !MTPROTO_SW
const taskListeners: {
[type in ServiceWorkerTask['type']]: (task: any, event: ExtendableMessageEvent) => void
} = {
notifications_clear: () => {
closeAllNotifications();
},
ping: (task: ServiceWorkerPingTask, event) => {
onPing(task, event);
toggleStorages: ({enabled, clearWrite}) => {
CacheStorageController.toggleStorage(enabled, clearWrite);
},
requestFilePart: (task: RequestFilePartTaskResponse, e: ExtendableMessageEvent) => {
const windowClient = e.source as WindowClient;
const promises = deferredPromises.get(windowClient.id);
if(!promises) {
return;
}
const promise = promises[task.id];
if(promise) {
if(task.error) {
promise.reject(task.error);
} else {
promise.resolve(task.payload);
}
delete promises[task.id];
}
pushPing: (payload, source) => {
onPing(payload, source);
},
toggleStorages: (task: ToggleStorageTask) => {
const {enabled, clearWrite} = task.payload;
CacheStorageController.toggleStorage(enabled, clearWrite);
}
};
ctx.addEventListener('message', (e) => {
const task = e.data as ServiceWorkerTask;
const callback = taskListeners[task.type];
if(callback) {
callback(task, e);
hello: (payload, source) => {
onWindowConnected(source);
}
});
/// #endif
//const cacheStorage = new CacheStorageController('cachedAssets');
/* let taskId = 0;
// * service worker can be killed, so won't get 'hello' event
getWindowClients().then((windowClients) => {
windowClients.forEach((windowClient) => {
onWindowConnected(windowClient);
});
});
let connectedWindows = 0;
listenMessagePort(serviceMessagePort, undefined, (source) => {
if(source === _mtprotoMessagePort) {
return;
}
export function getTaskId() {
return taskId;
}
log('window disconnected');
connectedWindows = Math.max(0, connectedWindows - 1);
if(!connectedWindows) {
log.warn('no windows left');
export function incrementTaskId() {
return taskId++;
} */
if(_mtprotoMessagePort) {
serviceMessagePort.detachPort(_mtprotoMessagePort);
_mtprotoMessagePort = undefined;
}
}
});
/// #endif
const onFetch = (event: FetchEvent): void => {
/// #if !DEBUG

26
src/lib/serviceWorker/push.ts

@ -14,7 +14,8 @@ import DATABASE_STATE from "../../config/databases/state"; @@ -14,7 +14,8 @@ import DATABASE_STATE from "../../config/databases/state";
import { IS_FIREFOX } from "../../environment/userAgent";
import deepEqual from "../../helpers/object/deepEqual";
import IDBStorage from "../idb";
import { log, ServiceWorkerPingTask, ServiceWorkerPushClickTask } from "./index.service";
import { log, serviceMessagePort } from "./index.service";
import { ServicePushPingTaskPayload } from "./serviceMessagePort";
const ctx = self as any as ServiceWorkerGlobalScope;
const defaultBaseUrl = location.protocol + '//' + location.hostname + location.pathname.split('/').slice(0, -1).join('/') + '/';
@ -96,8 +97,8 @@ class SomethingGetter<T extends Database<any>, Storage extends Record<string, an @@ -96,8 +97,8 @@ class SomethingGetter<T extends Database<any>, Storage extends Record<string, an
type PushStorage = {
push_mute_until: number,
push_lang: Partial<ServiceWorkerPingTask['payload']['lang']>
push_settings: Partial<ServiceWorkerPingTask['payload']['settings']>
push_lang: Partial<ServicePushPingTaskPayload['lang']>
push_settings: Partial<ServicePushPingTaskPayload['settings']>
};
const getter = new SomethingGetter<typeof DATABASE_STATE, PushStorage>(DATABASE_STATE, 'session', {
@ -192,12 +193,12 @@ ctx.addEventListener('notificationclick', (event) => { @@ -192,12 +193,12 @@ ctx.addEventListener('notificationclick', (event) => {
type: 'window'
}).then((clientList) => {
data.action = action;
pendingNotification = {type: 'push_click', payload: data};
pendingNotification = data;
for(let i = 0; i < clientList.length; i++) {
const client = clientList[i];
if('focus' in client) {
client.focus();
client.postMessage(pendingNotification);
serviceMessagePort.invokeVoid('pushClick', pendingNotification, client);
pendingNotification = undefined;
return;
}
@ -218,7 +219,7 @@ ctx.addEventListener('notificationclick', (event) => { @@ -218,7 +219,7 @@ ctx.addEventListener('notificationclick', (event) => {
ctx.addEventListener('notificationclose', onCloseNotification);
let notifications: Set<Notification> = new Set();
let pendingNotification: ServiceWorkerPushClickTask;
let pendingNotification: PushNotificationObject;
function pushToNotifications(notification: Notification) {
if(!notifications.has(notification)) {
notifications.add(notification);
@ -311,7 +312,7 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu @@ -311,7 +312,7 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu
return notificationPromise.then((event) => {
// @ts-ignore
if(event && event.notification) {
if(event?.notification) {
// @ts-ignore
pushToNotifications(event.notification);
}
@ -320,14 +321,9 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu @@ -320,14 +321,9 @@ function fireNotification(obj: PushNotificationObject, settings: PushStorage['pu
});
}
export function onPing(task: ServiceWorkerPingTask, event: ExtendableMessageEvent) {
const client = event.ports && event.ports[0] || event.source;
const payload = task.payload;
if(pendingNotification &&
client &&
'postMessage' in client) {
client.postMessage(pendingNotification, []);
export function onPing(payload: ServicePushPingTaskPayload, source?: MessageEventSource) {
if(pendingNotification && source) {
serviceMessagePort.invokeVoid('pushClick', pendingNotification, source);
pendingNotification = undefined;
}

52
src/lib/serviceWorker/serviceMessagePort.ts

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
/*
* https://github.com/morethanwords/tweb
* Copyright (C) 2019-2021 Eduard Kuzmenko
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import type { WebPushApiManager } from "../mtproto/webPushApiManager";
import type { PushNotificationObject } from "./push";
import type { MyUploadFile } from "../mtproto/apiFileManager";
import SuperMessagePort from "../mtproto/superMessagePort";
import { MOUNT_CLASS_TO } from "../../config/debug";
export type ServicePushPingTaskPayload = {
localNotifications: boolean,
lang: {
push_action_mute1d: string
push_action_settings: string
push_message_nopreview: string
},
settings: WebPushApiManager['settings']
};
export type ServiceRequestFilePartTaskPayload = {
docId: DocId,
dcId: number,
offset: number,
limit: number
};
export type ServiceEvent = {
port: (payload: void, source: MessageEventSource, event: MessageEvent) => void
};
export default class ServiceMessagePort<Master extends boolean = false> extends SuperMessagePort<{
// from main thread to service worker
notificationsClear: () => void,
toggleStorages: (payload: {enabled: boolean, clearWrite: boolean}) => void,
pushPing: (payload: ServicePushPingTaskPayload, source: MessageEventSource, event: MessageEvent) => void,
hello: (payload: void, source: MessageEventSource, event: MessageEvent) => void
}, {
// to main thread
pushClick: (payload: PushNotificationObject) => void,
// to mtproto worker
requestFilePart: (payload: ServiceRequestFilePartTaskPayload) => Promise<MyUploadFile> | MyUploadFile
} & ServiceEvent, Master> {
constructor() {
super('SERVICE');
MOUNT_CLASS_TO && (MOUNT_CLASS_TO.serviceMessagePort = this);
}
}

91
src/lib/serviceWorker/stream.ts

@ -6,17 +6,19 @@ @@ -6,17 +6,19 @@
import readBlobAsUint8Array from "../../helpers/blob/readBlobAsUint8Array";
import deferredPromise, { CancellablePromise } from "../../helpers/cancellablePromise";
import { getWindowClients } from "../../helpers/context";
import debounce from "../../helpers/schedulers/debounce";
import { InputFileLocation, UploadFile } from "../../layer";
import { InputFileLocation } from "../../layer";
import CacheStorageController from "../cacheStorage";
import { DownloadOptions } from "../mtproto/apiFileManager";
import { RequestFilePartTask, deferredPromises, log } from "./index.service";
import { DownloadOptions, MyUploadFile } from "../mtproto/apiFileManager";
import { getMtprotoMessagePort, log, serviceMessagePort } from "./index.service";
import { ServiceRequestFilePartTaskPayload } from "./serviceMessagePort";
import timeout from "./timeout";
const deferredPromises: Map<MessagePort, {[taskId: string]: CancellablePromise<MyUploadFile>}> = new Map();
const cacheStorage = new CacheStorageController('cachedStreamChunks');
const CHUNK_TTL = 86400;
const CHUNK_CACHED_TIME_HEADER = 'Time-Cached';
const USE_CACHE = false;
const clearOldChunks = () => {
return cacheStorage.timeoutOperation((cache) => {
@ -49,18 +51,17 @@ const clearOldChunks = () => { @@ -49,18 +51,17 @@ const clearOldChunks = () => {
setInterval(clearOldChunks, 1800e3);
setInterval(() => {
getWindowClients().then((clients) => {
for(const [clientId, promises] of deferredPromises) {
if(!clients.find((client) => client.id === clientId)) {
for(const taskId in promises) {
const promise = promises[taskId];
promise.reject();
}
deferredPromises.delete(clientId);
const mtprotoMessagePort = getMtprotoMessagePort();
for(const [messagePort, promises] of deferredPromises) {
if(messagePort !== mtprotoMessagePort) {
for(const taskId in promises) {
const promise = promises[taskId];
promise.reject();
}
deferredPromises.delete(messagePort);
}
});
}
}, 120e3);
type StreamRange = [number, number];
@ -86,54 +87,56 @@ class Stream { @@ -86,54 +87,56 @@ class Stream {
};
private async requestFilePartFromWorker(alignedOffset: number, limit: number, fromPreload = false) {
const task: Omit<RequestFilePartTask, 'id'> = {
type: 'requestFilePart',
payload: {
docId: this.id,
dcId: this.info.dcId,
offset: alignedOffset,
limit
}
const payload: ServiceRequestFilePartTaskPayload = {
docId: this.id,
dcId: this.info.dcId,
offset: alignedOffset,
limit
};
const taskId = JSON.stringify(task);
(task as RequestFilePartTask).id = taskId;
const windowClient = await getWindowClients().then((clients) => {
if(!clients.length) {
return;
}
return clients.find((client) => deferredPromises.has(client.id)) || clients[0];
});
if(!windowClient) {
throw new Error('no window');
}
const taskId = JSON.stringify(payload);
let promises = deferredPromises.get(windowClient.id);
const mtprotoMessagePort = getMtprotoMessagePort();
let promises = deferredPromises.get(mtprotoMessagePort);
if(!promises) {
deferredPromises.set(windowClient.id, promises = {});
deferredPromises.set(mtprotoMessagePort, promises = {});
}
let deferred = promises[taskId] as CancellablePromise<UploadFile.uploadFile>;
let deferred = promises[taskId];
if(deferred) {
return deferred.then((uploadFile) => uploadFile.bytes);
}
windowClient.postMessage(task);
this.loadedOffsets.add(alignedOffset);
deferred = promises[taskId] = deferredPromise<UploadFile.uploadFile>();
deferred = promises[taskId] = deferredPromise();
serviceMessagePort.invoke('requestFilePart', payload, undefined, mtprotoMessagePort)
.then(deferred.resolve, deferred.reject).finally(() => {
if(promises[taskId] === deferred) {
delete promises[taskId];
if(!Object.keys(promises).length) {
deferredPromises.delete(mtprotoMessagePort);
}
}
});
const bytesPromise = deferred.then((uploadFile) => uploadFile.bytes);
this.saveChunkToCache(bytesPromise, alignedOffset, limit);
!fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15));
if(USE_CACHE) {
this.saveChunkToCache(bytesPromise, alignedOffset, limit);
!fromPreload && this.preloadChunks(alignedOffset, alignedOffset + (this.limitPart * 15));
}
return bytesPromise;
}
private requestFilePartFromCache(alignedOffset: number, limit: number, fromPreload?: boolean) {
if(!USE_CACHE) {
return Promise.resolve();
}
const key = this.getChunkKey(alignedOffset, limit);
return cacheStorage.getFile(key).then((blob: Blob) => {
return fromPreload ? new Uint8Array() : readBlobAsUint8Array(blob);

Loading…
Cancel
Save