diff --git a/src/lib/mtproto/authorizer.ts b/src/lib/mtproto/authorizer.ts index 7c5a7b85..472b7ecf 100644 --- a/src/lib/mtproto/authorizer.ts +++ b/src/lib/mtproto/authorizer.ts @@ -105,7 +105,8 @@ export class Authorizer { this.log('mtpSendPlainRequest: creating requestPromise'); } - return transport.send(resultArray).then(result => { + const promise = transport.send(resultArray) as any as Promise; + return promise.then(result => { if(DEBUG) { this.log('mtpSendPlainRequest: in good sector', result); } diff --git a/src/lib/mtproto/dcConfigurator.ts b/src/lib/mtproto/dcConfigurator.ts index f266b7d1..df54d79e 100644 --- a/src/lib/mtproto/dcConfigurator.ts +++ b/src/lib/mtproto/dcConfigurator.ts @@ -1,21 +1,16 @@ -import MTTransport from './transports/transport'; +import MTTransport, { MTConnection, MTConnectionConstructable } from './transports/transport'; import Modes from '../../config/modes'; -/// #if MTPROTO_HTTP_UPLOAD -// @ts-ignore -import TcpObfuscated from './transports/tcpObfuscated'; -// @ts-ignore +/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD import HTTP from './transports/http'; -/// #elif !MTPROTO_HTTP -// @ts-ignore +/// #endif + +/// #if !MTPROTO_HTTP +import Socket from './transports/websocket'; import TcpObfuscated from './transports/tcpObfuscated'; +import EventListenerBase from '../../helpers/eventListenerBase'; import { isSafari } from '../../helpers/userAgent'; -import type MTPNetworker from './networker'; import { notifyAll, isWebWorker } from '../../helpers/context'; -import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise'; -/// #else -// @ts-ignore -import HTTP from './transports/http'; /// #endif export type TransportType = 'websocket' | 'https' | 'http'; @@ -31,6 +26,48 @@ type Servers = { let socketId = 0; const TEST_SUFFIX = Modes.test ? '_test' : ''; +class SocketProxied extends EventListenerBase<{ + open: () => void, + message: (buffer: ArrayBuffer) => any, + close: () => void, +}> implements MTConnection { + private id: number; + + constructor(protected dcId: number, protected url: string, logSuffix: string) { + super(); + this.id = ++socketId; + socketsProxied.set(this.id, this); + + notifyAll({ + type: 'socketProxy', + payload: { + type: 'setup', + payload: { + dcId, + url, + logSuffix + }, + id: this.id + } + }); + } + + send = (payload: Uint8Array) => { + const task: any = { + type: 'socketProxy', + payload: { + type: 'send', + payload, + id: this.id + } + }; + + notifyAll(task); + }; +} + +export const socketsProxied: Map = new Map(); + export class DcConfigurator { private sslSubdomains = ['pluto', 'venus', 'aurora', 'vesta', 'flora']; @@ -58,58 +95,10 @@ export class DcConfigurator { const logSuffix = connectionType === 'upload' ? '-U' : connectionType === 'download' ? '-D' : ''; const retryTimeout = connectionType === 'client' ? 30000 : 10000; - if(isSafari && isWebWorker && false) { - class P implements MTTransport { - private id: number; - private taskId = 0; - public networker: MTPNetworker; - public promises: Map> = new Map(); - constructor(dcId: number, url: string) { - this.id = ++socketId; + const oooohLetMeLive: MTConnectionConstructable = (isSafari && isWebWorker) || true ? SocketProxied : Socket; - notifyAll({ - type: 'socketProxy', - payload: { - type: 'setup', - payload: { - dcId, - url, - logSuffix, - retryTimeout - }, - id: this.id - } - }); - } - - send = (payload: Uint8Array) => { - const task: any = { - type: 'socketProxy', - payload: { - type: 'send', - payload, - id: this.id - } - }; - - if(this.networker) { - notifyAll(task); - return null; - } - - task.payload.taskId = ++this.taskId; - const deferred = deferredPromise(); - this.promises.set(task.id, deferred); - notifyAll(task); - return deferred; - }; - } - - return new P(dcId, chosenServer); - } else { - return new TcpObfuscated(dcId, chosenServer, logSuffix, retryTimeout); - } + return new TcpObfuscated(oooohLetMeLive, dcId, chosenServer, logSuffix, retryTimeout); }; /// #endif diff --git a/src/lib/mtproto/mtproto.worker.ts b/src/lib/mtproto/mtproto.worker.ts index 0fce0cf8..4c41c33c 100644 --- a/src/lib/mtproto/mtproto.worker.ts +++ b/src/lib/mtproto/mtproto.worker.ts @@ -8,6 +8,7 @@ import apiFileManager from './apiFileManager'; //import { logger, LogLevels } from '../logger'; import type { ServiceWorkerTask, ServiceWorkerTaskResponse } from './mtproto.service'; import { ctx } from '../../helpers/userAgent'; +import { socketsProxied } from './dcConfigurator'; //const log = logger('DW', LogLevels.error); @@ -109,6 +110,19 @@ const onMessage = async(e: any) => { } else if(task.type === 'webpSupport') { webpSupported = task.payload; return; + } else if(task.type === 'socketProxy') { + const socketTask = task.payload; + const id = socketTask.id; + + const socketProxied = socketsProxied.get(id); + if(socketTask.type === 'message') { + socketProxied.setListenerResult('message', socketTask.payload); + } else if(socketTask.type === 'open') { + socketProxied.setListenerResult('open'); + } else if(socketTask.type === 'close') { + socketProxied.setListenerResult('close'); + socketsProxied.delete(id); + } } if(!task.task) { diff --git a/src/lib/mtproto/mtprotoworker.ts b/src/lib/mtproto/mtprotoworker.ts index c3888690..0a67a19a 100644 --- a/src/lib/mtproto/mtprotoworker.ts +++ b/src/lib/mtproto/mtprotoworker.ts @@ -1,5 +1,4 @@ import MTProtoWorker from 'worker-loader!./mtproto.worker'; -import SocketWorker from 'worker-loader!./transports/websocket'; //import './mtproto.worker'; import { isObject } from '../../helpers/object'; import type { MethodDeclMap } from '../../layer'; @@ -16,6 +15,7 @@ import type { MTMessage } from './networker'; import referenceDatabase from './referenceDatabase'; import appDocsManager from '../appManagers/appDocsManager'; import DEBUG, { MOUNT_CLASS_TO } from '../../config/debug'; +import Socket from './transports/websocket'; type Task = { taskId: number, @@ -59,7 +59,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods { private debug = DEBUG; - private socketsWorkers: Map = new Map(); + private sockets: Map = new Map(); constructor() { super(); @@ -193,20 +193,53 @@ export class ApiManagerProxy extends CryptoWorkerMethods { } else if(task.type === 'socketProxy') { const socketTask = task.payload; const id = socketTask.id; - console.log('socketProxy', socketTask, id); + //console.log('socketProxy', socketTask, id); + if(socketTask.type === 'send') { - const socketWorker = this.socketsWorkers.get(id); - socketWorker.postMessage(socketTask); + const socket = this.sockets.get(id); + socket.send(socketTask.payload); } else if(socketTask.type === 'setup') { - const socketWorker = new SocketWorker(); - socketWorker.postMessage(socketTask); - socketWorker.addEventListener('message', (e) => { - const task = e.data; + const socket = new Socket(socketTask.payload.dcId, socketTask.payload.url, socketTask.payload.logSuffix); + + const onOpen = () => { + //console.log('socketProxy onOpen'); + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'open', + id + } + }); + }; + const onClose = () => { + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'close', + id + } + }); + socket.removeListener('open', onOpen); + socket.removeListener('close', onClose); + socket.removeListener('message', onMessage); + this.sockets.delete(id); + }; + const onMessage = (buffer: ArrayBuffer) => { + this.postMessage({ + type: 'socketProxy', + payload: { + type: 'message', + id, + payload: buffer + } + }); + }; - }); - - this.socketsWorkers.set(id, socketWorker); + socket.addListener('open', onOpen); + socket.addListener('close', onClose); + socket.addListener('message', onMessage); + this.sockets.set(id, socket); } } else if(task.hasOwnProperty('result') || task.hasOwnProperty('error')) { this.finalizeTask(task.taskId, task.result, task.error); diff --git a/src/lib/mtproto/networker.ts b/src/lib/mtproto/networker.ts index 0023c140..032ba48c 100644 --- a/src/lib/mtproto/networker.ts +++ b/src/lib/mtproto/networker.ts @@ -1013,7 +1013,7 @@ export default class MTPNetworker { public sendEncryptedRequest(message: MTMessage) { return this.getEncryptedOutput(message).then(requestData => { - const promise = this.transport.send(requestData); + const promise: Promise = this.transport.send(requestData) as any; /// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD return promise; /// #else diff --git a/src/lib/mtproto/transports/tcpObfuscated.ts b/src/lib/mtproto/transports/tcpObfuscated.ts index dd415cb1..259e6b78 100644 --- a/src/lib/mtproto/transports/tcpObfuscated.ts +++ b/src/lib/mtproto/transports/tcpObfuscated.ts @@ -2,8 +2,7 @@ import Modes from "../../../config/modes"; import { logger, LogLevels } from "../../logger"; import MTPNetworker from "../networker"; import Obfuscation from "./obfuscation"; -import MTTransport from "./transport"; -import Socket from "./websocket"; +import MTTransport, { MTConnection, MTConnectionConstructable } from "./transport"; import intermediatePacketCodec from './intermediate'; export default class TcpObfuscated implements MTTransport { @@ -27,11 +26,11 @@ export default class TcpObfuscated implements MTTransport { private lastCloseTime: number; - private socket: Socket; + private connection: MTConnection; //private debugPayloads: MTPNetworker['debugRequests'] = []; - constructor(private dcId: number, private url: string, private logSuffix: string, public retryTimeout: number) { + constructor(private Connection: MTConnectionConstructable, private dcId: number, private url: string, private logSuffix: string, public retryTimeout: number) { let logLevel = LogLevels.error | LogLevels.log; if(this.debug) logLevel |= LogLevels.debug; this.log = logger(`WS-${dcId}` + logSuffix, logLevel); @@ -40,92 +39,100 @@ export default class TcpObfuscated implements MTTransport { this.connect(); } + private onOpen = () => { + this.connected = true; + + const initPayload = this.obfuscation.init(this.codec); + + if(this.networker) { + this.networker.setConnectionStatus(true); + + if(this.lastCloseTime) { + this.networker.cleanupSent(); + this.networker.resend(); + } + } + + setTimeout(() => { + this.releasePending(); + }, 0); + + this.connection.send(initPayload); + }; + + private onMessage = (buffer: ArrayBuffer) => { + let data = this.obfuscation.decode(new Uint8Array(buffer)); + data = this.codec.readPacket(data); + + if(this.networker) { // authenticated! + //this.pending = this.pending.filter(p => p.body); // clear pending + + this.debug && this.log.debug('redirecting to networker', data.length); + this.networker.parseResponse(data).then(response => { + this.debug && this.log.debug('redirecting to networker response:', response); + + try { + this.networker.processMessage(response.response, response.messageId, response.sessionId); + } catch(err) { + this.log.error('handleMessage networker processMessage error', err); + } + + //this.releasePending(); + }).catch(err => { + this.log.error('handleMessage networker parseResponse error', err); + }); + + //this.dd(); + return; + } + + //console.log('got hex:', data.hex); + const pending = this.pending.shift(); + if(!pending) { + this.debug && this.log.debug('no pending for res:', data.hex); + return; + } + + pending.resolve(data); + }; + + private onClose = () => { + this.connected = false; + + const time = Date.now(); + const diff = time - this.lastCloseTime; + const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0; + + if(this.networker) { + this.networker.setConnectionStatus(false); + } + + this.log('will try to reconnect after timeout:', needTimeout / 1000); + setTimeout(() => { + this.log('trying to reconnect...'); + this.lastCloseTime = Date.now(); + + for(const pending of this.pending) { + if(pending.bodySent) { + pending.bodySent = false; + } + } + + this.connect(); + }, needTimeout); + + this.connection.removeListener('open', this.onOpen); + this.connection.removeListener('close', this.onClose); + this.connection.removeListener('message', this.onMessage); + this.connection = undefined; + }; + private connect() { - this.socket = new Socket(this.dcId, this.url, this.logSuffix); + this.connection = new this.Connection(this.dcId, this.url, this.logSuffix); - this.socket.addListener('open', () => { - this.connected = true; - - const initPayload = this.obfuscation.init(this.codec); - - if(this.networker) { - this.networker.setConnectionStatus(true); - - if(this.lastCloseTime) { - this.networker.cleanupSent(); - this.networker.resend(); - } - } - - setTimeout(() => { - this.releasePending(); - }, 0); - - this.socket.send(initPayload); - }); - - this.socket.addListener('message', (buffer) => { - let data = this.obfuscation.decode(new Uint8Array(buffer)); - data = this.codec.readPacket(data); - - if(this.networker) { // authenticated! - //this.pending = this.pending.filter(p => p.body); // clear pending - - this.debug && this.log.debug('redirecting to networker', data.length); - this.networker.parseResponse(data).then(response => { - this.debug && this.log.debug('redirecting to networker response:', response); - - try { - this.networker.processMessage(response.response, response.messageId, response.sessionId); - } catch(err) { - this.log.error('handleMessage networker processMessage error', err); - } - - //this.releasePending(); - }).catch(err => { - this.log.error('handleMessage networker parseResponse error', err); - }); - - //this.dd(); - return; - } - - //console.log('got hex:', data.hex); - const pending = this.pending.shift(); - if(!pending) { - this.debug && this.log.debug('no pending for res:', data.hex); - return; - } - - pending.resolve(data); - }); - - this.socket.addListener('close', () => { - this.connected = false; - this.socket = undefined; - - const time = Date.now(); - const diff = time - this.lastCloseTime; - const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0; - - if(this.networker) { - this.networker.setConnectionStatus(false); - } - - this.log('will try to reconnect after timeout:', needTimeout / 1000); - setTimeout(() => { - this.log('trying to reconnect...'); - this.lastCloseTime = Date.now(); - - for(const pending of this.pending) { - if(pending.bodySent) { - pending.bodySent = false; - } - } - - this.connect(); - }, needTimeout); - }); + this.connection.addListener('open', this.onOpen); + this.connection.addListener('close', this.onClose); + this.connection.addListener('message', this.onMessage); } private encodeBody = (body: Uint8Array) => { @@ -202,7 +209,7 @@ export default class TcpObfuscated implements MTTransport { //this.lol.push(body); //setTimeout(() => { - this.socket.send(encoded); + this.connection.send(encoded); //}, 100); //this.dd(); diff --git a/src/lib/mtproto/transports/transport.ts b/src/lib/mtproto/transports/transport.ts index f9cf4b68..02892299 100644 --- a/src/lib/mtproto/transports/transport.ts +++ b/src/lib/mtproto/transports/transport.ts @@ -1,3 +1,17 @@ -export default abstract class MTTransport { - abstract send: (data: Uint8Array) => Promise; +import type EventListenerBase from "../../../helpers/eventListenerBase"; + +export default interface MTTransport { + send: (data: Uint8Array) => void; +} + +export interface MTConnection extends EventListenerBase<{ + open: () => void, + message: (buffer: ArrayBuffer) => any, + close: () => void, +}> { + send: (data: Uint8Array) => void; +} + +export interface MTConnectionConstructable { + new(dcId: number, url: string, logSuffix: string): MTConnection; } diff --git a/src/lib/mtproto/transports/websocket.ts b/src/lib/mtproto/transports/websocket.ts index f4f31404..bab2dc1b 100644 --- a/src/lib/mtproto/transports/websocket.ts +++ b/src/lib/mtproto/transports/websocket.ts @@ -1,13 +1,14 @@ import { logger, LogLevels } from '../../logger'; import Modes from '../../../config/modes'; import EventListenerBase from '../../../helpers/eventListenerBase'; +import { MTConnection } from './transport'; export default class Socket extends EventListenerBase<{ open: () => void, message: (buffer: ArrayBuffer) => any, close: () => void, -}> { - public ws: WebSocket; +}> implements MTConnection { + private ws: WebSocket; private log: ReturnType; private debug = Modes.debug && false; @@ -19,6 +20,8 @@ export default class Socket extends EventListenerBase<{ this.log = logger(`WS-${dcId}` + logSuffix, logLevel); this.log('constructor'); this.connect(); + + return this; } private removeListeners() {