/* * https://github.com/morethanwords/tweb * Copyright (C) 2019-2021 Eduard Kuzmenko * https://github.com/morethanwords/tweb/blob/master/LICENSE */ import DEBUG from '../../config/debug'; import ctx from '../../environment/ctx'; import indexOfAndSplice from '../../helpers/array/indexOfAndSplice'; import {IS_WORKER} from '../../helpers/context'; import EventListenerBase from '../../helpers/eventListenerBase'; import makeError from '../../helpers/makeError'; import {Awaited, WorkerTaskTemplate, WorkerTaskVoidTemplate} from '../../types'; import {logger} from '../logger'; type SuperMessagePortTask = WorkerTaskTemplate & { transfer?: Transferable[] }; interface InvokeTask extends SuperMessagePortTask { type: 'invoke', payload: WorkerTaskVoidTemplate & {withAck?: boolean, void?: boolean} } interface ResultTask extends SuperMessagePortTask { type: 'result', payload: { taskId: number, result?: any, error?: any } } interface AckTask extends SuperMessagePortTask { type: 'ack', payload: { cached: boolean, taskId: number result?: any, error?: any, } } interface PingTask extends SuperMessagePortTask { type: 'ping' } interface PongTask extends SuperMessagePortTask { type: 'pong' } interface BatchTask extends SuperMessagePortTask { type: 'batch', payload: Task[] } interface CloseTask extends SuperMessagePortTask { type: 'close' } // interface OpenTask extends SuperMessagePortTask { // type: 'open' // } interface LockTask extends SuperMessagePortTask { type: 'lock', payload: string } type Task = InvokeTask | ResultTask | AckTask | PingTask | PongTask | BatchTask | CloseTask/* | OpenTask */ | LockTask; type TaskMap = { [type in Task as type['type']]?: (task: Extract, source: MessageEventSource, event: MessageEvent) => void | Promise }; export type AckedResult = { cached: boolean, result: Promise }; // export type AckedResult = { // cached: true, // result: T // } | { // cached: false, // result: Promise // }; type ListenPort = WindowProxy | MessagePort | ServiceWorker | Worker | ServiceWorkerContainer; type SendPort = Pick/* WindowProxy | MessagePort | ServiceWorker | Worker */; export type MessageListenPort = ListenPort; export type MessageSendPort = SendPort; type ListenerCallback = (payload: any, source: MessageEventSource, event: MessageEvent) => any; type Listeners = Record; // const PING_INTERVAL = DEBUG && false ? 0x7FFFFFFF : 5000; // const PING_TIMEOUT = DEBUG && false ? 0x7FFFFFFF : 10000; export default class SuperMessagePort< Workers extends Listeners, Masters extends Listeners, IsMaster extends boolean, Receive extends Listeners = IsMaster extends true ? Masters : Workers, Send extends Listeners = IsMaster extends true ? Workers : Masters > extends EventListenerBase { protected listenPorts: Array; protected sendPorts: Array; protected pingResolves: Map void>; protected taskId: number; protected awaiting: { [id: number]: { resolve: any, reject: any, taskType: string, port?: SendPort } }; protected pending: Map; protected log: ReturnType; protected debug: boolean; protected releasingPending: boolean; protected processTaskMap: TaskMap; protected onPortDisconnect: (source: MessageEventSource) => void; // protected onPortConnect: (source: MessageEventSource) => void; constructor(logSuffix?: string) { super(false); this.listenPorts = []; this.sendPorts = []; this.pingResolves = new Map(); this.taskId = 0; this.awaiting = {}; this.pending = new Map(); this.log = logger('MP' + (logSuffix ? '-' + logSuffix : '')); this.debug = DEBUG; if('locks' in navigator) { const id = 'lock-' + Date.now() + (Math.random() * 0xFFFF | 0); navigator.locks.request(id, () => new Promise(() => {})); this.pushTask(this.createTask('lock', id)); } else if(typeof(window) !== 'undefined') { window.addEventListener('beforeunload', () => { const task = this.createTask('close', undefined); this.postMessage(undefined, task); }); } this.processTaskMap = { result: this.processResultTask, ack: this.processAckTask, invoke: this.processInvokeTask, ping: this.processPingTask, pong: this.processPongTask, close: this.processCloseTask, // open: this.processOpenTask, lock: this.processLockTask }; } public setOnPortDisconnect(callback: (source: MessageEventSource) => void) { this.onPortDisconnect = callback; } // public setOnPortConnect(callback: (source: MessageEventSource) => void) { // this.onPortConnect = callback; // } public attachPort(port: MessageEventSource) { this.attachListenPort(port); this.attachSendPort(port); } public attachListenPort(port: ListenPort) { this.listenPorts.push(port); port.addEventListener('message', this.onMessage as any); } public attachSendPort(port: SendPort) { this.log.warn('attaching send port'); (port as MessagePort).start?.(); this.sendPorts.push(port); // this.sendPing(port); // const task = this.createTask('open', undefined); // this.postMessage(port, task); this.releasePending(); } // ! Can't rely on ping because timers can be suspended // protected sendPing(port: SendPort, loop = IS_WORKER) { // let timeout: number; // const promise = new Promise((resolve, reject) => { // this.pingResolves.set(port, resolve); // this.pushTask(this.createTask('ping', undefined), port); // timeout = ctx.setTimeout(() => { // reject(); // }, PING_TIMEOUT); // }); // promise.then(() => { // // this.log('got pong'); // clearTimeout(timeout); // this.pingResolves.delete(port); // if(loop) { // this.sendPingWithTimeout(port); // } // }, () => { // this.pingResolves.delete(port); // this.detachPort(port); // }); // } // protected sendPingWithTimeout(port: SendPort, timeout = PING_INTERVAL) { // ctx.setTimeout(() => { // if(!this.sendPorts.includes(port)) { // return; // } // this.sendPing(port); // }, timeout); // } public detachPort(port: ListenPort) { this.log.warn('disconnecting port'); indexOfAndSplice(this.listenPorts, port); indexOfAndSplice(this.sendPorts, port as any); port.removeEventListener?.('message', this.onMessage as any); (port as MessagePort).close?.(); this.onPortDisconnect?.(port as any); const error = makeError('PORT_DISCONNECTED'); for(const id in this.awaiting) { const task = this.awaiting[id]; if(task.port === port) { task.reject(error); delete this.awaiting[id]; } } } protected postMessage(port: SendPort | SendPort[], task: Task) { const ports = Array.isArray(port) ? port : (port ? [port] : this.sendPorts); ports.forEach((port) => { port.postMessage(task, task.transfer as any); }); } protected onMessage = (event: MessageEvent) => { const task: Task = event.data; // this.log('got message', task); const source: MessageEventSource = event.source || event.currentTarget as any; // can have no source /* if(task.type === 'batch') { const newEvent: MessageEvent = {data: event.data, source: event.source, currentTarget: event.currentTarget} as any; task.payload.forEach((task) => { // @ts-ignore newEvent.data = task; this.onMessage(newEvent); }); } */ // @ts-ignore this.processTaskMap[task.type](task, source, event); }; protected /* async */ releasePending() { // return; if(/* !this.listenPorts.length || !this.sendPorts.length || */this.releasingPending) { return; } this.releasingPending = true; // const perf = performance.now(); // await pause(0); this.debug && this.log.debug('releasing tasks, length:', this.pending.size/* , performance.now() - perf */); this.pending.forEach((portTasks, port) => { // let batchTask: BatchTask; // const tasks: Task[] = []; // portTasks.forEach((task) => { // if(task.transfer) { // batchTask = undefined; // tasks.push(task); // } else { // if(!batchTask) { // batchTask = this.createTask('batch', []); // tasks.push(batchTask); // } // batchTask.payload.push(task); // } // }); const tasks = portTasks; const ports = port ? [port] : this.sendPorts; if(!ports.length) { return; } tasks.forEach((task) => { // if(task.type === 'batch') { // this.log(`batching ${task.payload.length} tasks`); // } try { // if(IS_SERVICE_WORKER && !port) { // notifyAll(task); // } else { this.postMessage(ports, task); // } } catch(err) { this.log.error('postMessage error:', err, task, ports); } }); this.pending.delete(port); }); this.debug && this.log.debug('released tasks'); this.releasingPending = false; } protected processResultTask = (task: ResultTask) => { const {taskId, result, error} = task.payload; const deferred = this.awaiting[taskId]; if(!deferred) { return; } this.debug && this.log.debug('done', deferred.taskType, result, error); 'error' in task.payload ? deferred.reject(error) : deferred.resolve(result); delete this.awaiting[taskId]; }; protected processAckTask = (task: AckTask) => { const payload = task.payload; const deferred = this.awaiting[payload.taskId]; if(!deferred) { return; } // * will finish the init promise with incoming result const previousResolve: (acked: AckedResult) => void = deferred.resolve; // const previousReject = deferred.reject; // if(payload.cached) { // if('result' in payload) { // previousResolve({ // cached: true, // result: payload.result // }); // } else { // previousReject(payload.error); // } // } else { // const ret: AckedResult = { // cached: false, // result: new Promise((resolve, reject) => { // deferred.resolve = resolve; // deferred.reject = reject; // }) // }; // previousResolve(ret); // } const ret: AckedResult = { cached: payload.cached, result: payload.cached ? ('result' in payload ? Promise.resolve(payload.result) : Promise.reject(payload.error)) : new Promise((resolve, reject) => { deferred.resolve = resolve; deferred.reject = reject; }) }; previousResolve(ret); if(payload.cached) { delete this.awaiting[payload.taskId]; } }; protected processPingTask = (task: PingTask, source: MessageEventSource, event: MessageEvent) => { this.pushTask(this.createTask('pong', undefined), event.source); }; protected processPongTask = (task: PongTask, source: MessageEventSource, event: MessageEvent) => { const pingResolve = this.pingResolves.get(source); if(pingResolve) { this.pingResolves.delete(source); pingResolve(); } }; protected processCloseTask = (task: CloseTask, source: MessageEventSource, event: MessageEvent) => { this.detachPort(source); }; // * it's just an 'open' callback, DO NOT attach port from here // protected processOpenTask = (task: OpenTask, source: MessageEventSource, event: MessageEvent) => { // this.onPortConnect?.(source); // }; protected processLockTask = (task: LockTask, source: MessageEventSource, event: MessageEvent) => { navigator.locks.request(task.payload, () => { this.processCloseTask(undefined, source, undefined); }); }; protected processInvokeTask = async(task: InvokeTask, source: MessageEventSource, event: MessageEvent) => { const id = task.id; const innerTask = task.payload; let resultTaskPayload: ResultTask['payload']; let resultTask: ResultTask, ackTask: AckTask; if(!innerTask.void) { resultTaskPayload = {taskId: id}; resultTask = this.createTask('result', resultTaskPayload); } if(innerTask.withAck) { ackTask = this.createTask('ack', { taskId: id, cached: true }); } let isPromise: boolean; try { const listeners = this.listeners[innerTask.type]; if(!listeners?.length) { throw new Error('no listener'); } const listener = listeners[0]; // @ts-ignore let result = this.invokeListenerCallback(innerTask.type, listener, innerTask.payload, source, event); if(innerTask.void) { return; } isPromise = result instanceof Promise; if(ackTask) { const cached = !isPromise; ackTask.payload.cached = cached; if(cached) ackTask.payload.result = result; this.pushTask(ackTask, source); if(cached) { return; } } if(isPromise) { result = await result; } resultTaskPayload.result = result; } catch(error) { this.log.error('worker task error:', error, task); if(innerTask.void) { return; } if(ackTask && ackTask.payload.cached) { ackTask.payload.error = error; this.pushTask(ackTask, source); return; } resultTaskPayload.error = error; } this.pushTask(resultTask, source); }; protected createTask[0]>(type: T, payload: K['payload'], transfer?: Transferable[]): K { return { type, payload, id: this.taskId++, transfer } as K; } protected createInvokeTask(type: string, payload: any, withAck?: boolean, _void?: boolean, transfer?: Transferable[]): InvokeTask { return this.createTask('invoke', { type, payload, withAck, void: _void }, transfer); } protected pushTask(task: Task, port?: SendPort) { let tasks = this.pending.get(port); if(!tasks) { this.pending.set(port, tasks = []); } tasks.push(task); this.releasePending(); } public invokeVoid(type: T, payload: Parameters[0], port?: SendPort, transfer?: Transferable[]) { const task = this.createInvokeTask(type as string, payload, undefined, true, transfer); this.pushTask(task, port); } public invoke(type: T, payload: Parameters[0], withAck?: false, port?: SendPort, transfer?: Transferable[]): Promise>>; public invoke(type: T, payload: Parameters[0], withAck?: true, port?: SendPort, transfer?: Transferable[]): Promise>>>; public invoke(type: T, payload: Parameters[0], withAck?: boolean, port?: SendPort, transfer?: Transferable[]) { this.debug && this.log.debug('start', type, payload); let task: InvokeTask; const promise = new Promise>>((resolve, reject) => { task = this.createInvokeTask(type as string, payload, withAck, undefined, transfer); this.awaiting[task.id] = {resolve, reject, taskType: type as string, port}; this.pushTask(task, port); }); if(IS_WORKER) { promise.finally(() => { clearInterval(interval); }); const interval = ctx.setInterval(() => { this.log.error('task still has no result', task, port); }, 5e3); } else if(false) { // let timedOut = false; const startTime = Date.now(); promise.finally(() => { const elapsedTime = Date.now() - startTime; if(elapsedTime >= TIMEOUT) { this.log.error(`task was processing ${Date.now() - startTime}ms`, task.payload.payload, port); }/* else { clearTimeout(timeout); } */ }); const TIMEOUT = 10; // const timeout = ctx.setTimeout(() => { // timedOut = true; // // this.log.error(`task is processing more than ${TIMEOUT} milliseconds`, task, port); // }, TIMEOUT); } return promise; } public invokeExceptSource(type: T, payload: Parameters[0], source?: SendPort) { const ports = this.sendPorts.slice(); indexOfAndSplice(ports, source); ports.forEach((target) => { this.invokeVoid(type, payload, target); }); } }