diff --git a/src/lib/mtproto/dcConfigurator.ts b/src/lib/mtproto/dcConfigurator.ts index 4d8e89dd..c6f260f4 100644 --- a/src/lib/mtproto/dcConfigurator.ts +++ b/src/lib/mtproto/dcConfigurator.ts @@ -9,7 +9,7 @@ * https://github.com/zhukov/webogram/blob/master/LICENSE */ -import MTTransport, { MTConnection, MTConnectionConstructable } from './transports/transport'; +import MTTransport, { MTConnectionConstructable } from './transports/transport'; import Modes from '../../config/modes'; /// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD @@ -19,9 +19,9 @@ import HTTP from './transports/http'; /// #if !MTPROTO_HTTP import Socket from './transports/websocket'; import TcpObfuscated from './transports/tcpObfuscated'; -import EventListenerBase from '../../helpers/eventListenerBase'; import { isSafari } from '../../helpers/userAgent'; -import { notifyAll, isWebWorker } from '../../helpers/context'; +import { isWebWorker } from '../../helpers/context'; +import SocketProxied from './transports/socketProxied'; /// #endif export type TransportType = 'websocket' | 'https' | 'http'; @@ -34,65 +34,8 @@ type Servers = { } }; -let socketId = 0; const TEST_SUFFIX = Modes.test ? '_test' : ''; -/// #if !MTPROTO_SW -class SocketProxied extends EventListenerBase<{ - open: () => void, - message: (buffer: ArrayBuffer) => any, - close: () => void, -}> implements MTConnection { - private id: number; - - constructor(protected dcId: number, protected url: string, logSuffix: string) { - super(); - this.id = ++socketId; - socketsProxied.set(this.id, this); - - notifyAll({ - type: 'socketProxy', - payload: { - type: 'setup', - payload: { - dcId, - url, - logSuffix - }, - id: this.id - } - }); - } - - public send(payload: Uint8Array) { - const task: any = { - type: 'socketProxy', - payload: { - type: 'send', - payload, - id: this.id - } - }; - - notifyAll(task); - } - - public close() { - const task: any = { - type: 'socketProxy', - payload: { - type: 'close', - id: this.id - } - }; - - notifyAll(task); - } -} -/// #endif - -export const socketsProxied: Map = new Map(); - export class DcConfigurator { private sslSubdomains = ['pluto', 'venus', 'aurora', 'vesta', 'flora']; diff --git a/src/lib/mtproto/mtproto.worker.ts b/src/lib/mtproto/mtproto.worker.ts index 9a6bd2c5..d658961d 100644 --- a/src/lib/mtproto/mtproto.worker.ts +++ b/src/lib/mtproto/mtproto.worker.ts @@ -13,13 +13,13 @@ import networkerFactory from "./networkerFactory"; import apiFileManager from './apiFileManager'; import type { RequestFilePartTask, RequestFilePartTaskResponse } from '../serviceWorker/index.service'; import { ctx } from '../../helpers/userAgent'; -import { socketsProxied } from './dcConfigurator'; import { notifyAll } from '../../helpers/context'; // import AppStorage from '../storage'; import CacheStorageController from '../cacheStorage'; import sessionStorage from '../sessionStorage'; import { LocalStorageProxyTask } from '../localStorage'; import { WebpConvertTask } from '../webp/webpWorkerController'; +import { socketsProxied } from './transports/socketProxied'; let webpSupported = false; export const isWebpSupported = () => { diff --git a/src/lib/mtproto/mtprotoworker.ts b/src/lib/mtproto/mtprotoworker.ts index 1e39db92..f6d26ea6 100644 --- a/src/lib/mtproto/mtprotoworker.ts +++ b/src/lib/mtproto/mtprotoworker.ts @@ -26,6 +26,7 @@ import sessionStorage from '../sessionStorage'; import webPushApiManager from './webPushApiManager'; import AppStorage from '../storage'; import appRuntimeManager from '../appManagers/appRuntimeManager'; +import { SocketProxyTask } from './transports/socketProxied'; type Task = { taskId: number, @@ -115,7 +116,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods { webpWorkerController.postMessage(task); }); - this.addTaskListener('socketProxy', (task) => { + this.addTaskListener('socketProxy', (task: SocketProxyTask) => { const socketTask = task.payload; const id = socketTask.id; //console.log('socketProxy', socketTask, id); @@ -123,7 +124,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods { if(socketTask.type === 'send') { const socket = this.sockets.get(id); socket.send(socketTask.payload); - } else if(socketTask.type === 'close') { + } else if(socketTask.type === 'close') { // will remove from map in onClose const socket = this.sockets.get(id); socket.close(); } else if(socketTask.type === 'setup') { diff --git a/src/lib/mtproto/networker.ts b/src/lib/mtproto/networker.ts index 594f454e..152818d4 100644 --- a/src/lib/mtproto/networker.ts +++ b/src/lib/mtproto/networker.ts @@ -17,7 +17,7 @@ import Schema from './schema'; import timeManager from './timeManager'; import networkerFactory from './networkerFactory'; import { logger, LogTypes } from '../logger'; -import { InvokeApiOptions } from '../../types'; +import { assumeType, InvokeApiOptions } from '../../types'; import { longToBytes } from '../crypto/crypto_utils'; import MTTransport from './transports/transport'; import { convertToUint8Array, bufferConcat, bytesCmp, bytesToHex } from '../../helpers/bytes'; @@ -31,7 +31,7 @@ import HTTP from './transports/http'; /// #endif import type TcpObfuscated from './transports/tcpObfuscated'; -import { bigInt2str, cmp, rightShift_, str2bigInt } from '../../vendor/leemon'; +import { bigInt2str, rightShift_, str2bigInt } from '../../vendor/leemon'; import { forEachReverse } from '../../helpers/array'; //console.error('networker included!', new Error().stack); @@ -369,6 +369,11 @@ export default class MTPNetworker { return this.pushMessage(message, options); } + public destroy() { + assumeType(this.transport); + this.transport.destroy(); + } + // private sendPingDelayDisconnect = () => { // if(this.pingPromise || true) return; @@ -693,7 +698,7 @@ export default class MTPNetworker { promise.finally(() => { clearTimeout(timeout); this.setConnectionStatus(true); - + if(!--this.activeRequests && this.onDrain) { this.onDrainTimeout = self.setTimeout(() => { this.log('drain'); diff --git a/src/lib/mtproto/transports/socketProxied.ts b/src/lib/mtproto/transports/socketProxied.ts new file mode 100644 index 00000000..699d7b3d --- /dev/null +++ b/src/lib/mtproto/transports/socketProxied.ts @@ -0,0 +1,95 @@ +/* + * https://github.com/morethanwords/tweb + * Copyright (C) 2019-2021 Eduard Kuzmenko + * https://github.com/morethanwords/tweb/blob/master/LICENSE + */ + +import { notifyAll } from "../../../helpers/context"; +import EventListenerBase from "../../../helpers/eventListenerBase"; +import { WorkerTaskVoidTemplate } from "../../../types"; +import { MTConnection } from "./transport"; + +let socketId = 0; +export interface SocketProxyTask extends WorkerTaskVoidTemplate { + type: 'socketProxy', + payload: SocketProxySetupTask | SocketProxySendTask | SocketProxyCloseTask +}; + +export interface SocketProxySetupTask extends WorkerTaskVoidTemplate { + type: 'setup', + payload: { + dcId: number, + url: string, + logSuffix: string + }, + id: number +}; + +export interface SocketProxySendTask extends WorkerTaskVoidTemplate { + type: 'send', + payload: Uint8Array, + id: number +}; + +export interface SocketProxyCloseTask extends WorkerTaskVoidTemplate { + type: 'close', + id: number +}; + +/// #if !MTPROTO_SW +export default class SocketProxied extends EventListenerBase<{ + open: () => void, + message: (buffer: ArrayBuffer) => any, + close: () => void, +}> implements MTConnection { + private id: number; + + constructor(protected dcId: number, protected url: string, logSuffix: string) { + super(); + this.id = ++socketId; + socketsProxied.set(this.id, this); + + const task: SocketProxyTask = { + type: 'socketProxy', + payload: { + type: 'setup', + payload: { + dcId, + url, + logSuffix + }, + id: this.id + } + }; + + notifyAll(task); + } + + public send(payload: Uint8Array) { + const task: SocketProxyTask = { + type: 'socketProxy', + payload: { + type: 'send', + payload, + id: this.id + } + }; + + notifyAll(task); + } + + public close() { + const task: SocketProxyTask = { + type: 'socketProxy', + payload: { + type: 'close', + id: this.id + } + }; + + notifyAll(task); + } +} +/// #endif + +export const socketsProxied: Map = new Map(); diff --git a/src/lib/mtproto/transports/tcpObfuscated.ts b/src/lib/mtproto/transports/tcpObfuscated.ts index 996f3297..bfdb2244 100644 --- a/src/lib/mtproto/transports/tcpObfuscated.ts +++ b/src/lib/mtproto/transports/tcpObfuscated.ts @@ -30,6 +30,9 @@ export default class TcpObfuscated implements MTTransport { private lastCloseTime: number; private connection: MTConnection; + private autoReconnect = true; + private reconnectTimeout: number; + //private debugPayloads: MTPNetworker['debugRequests'] = []; constructor(private Connection: MTConnectionConstructable, @@ -115,31 +118,80 @@ export default class TcpObfuscated implements MTTransport { this.connection.removeEventListener('message', this.onMessage); this.connection = undefined; - const time = Date.now(); - const diff = time - this.lastCloseTime; - const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0; + let needTimeout: number; + if(this.autoReconnect) { + const time = Date.now(); + const diff = time - this.lastCloseTime; + needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0; + } if(this.networker) { this.networker.setConnectionStatus(false, needTimeout); this.pending.length = 0; } + + if(this.autoReconnect) { + this.log('will try to reconnect after timeout:', needTimeout / 1000); + this.reconnectTimeout = self.setTimeout(this.reconnect, needTimeout); + } else { + this.log('reconnect isn\'t needed'); + } + }; + + /** + * invoke only when closed + */ + public reconnect = () => { + if(this.reconnectTimeout !== undefined) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = undefined; + } + + if(this.connected) { + return; + } + + this.log('trying to reconnect...'); + this.lastCloseTime = Date.now(); - this.log('will try to reconnect after timeout:', needTimeout / 1000); - setTimeout(() => { - this.log('trying to reconnect...'); - this.lastCloseTime = Date.now(); - - if(!this.networker) { - for(const pending of this.pending) { - if(pending.bodySent) { - pending.bodySent = false; - } + if(!this.networker) { + for(const pending of this.pending) { + if(pending.bodySent) { + pending.bodySent = false; } } - - this.connect(); - }, needTimeout); - }; + } + + this.connect(); + } + + public destroy() { + this.setAutoReconnect(false); + this.close(); + } + + public close() { + if(this.connection) { + this.connection.close(); + } + } + + /** + * Will connect if enable and disconnected \ + * Will reset reconnection timeout if disable + */ + public setAutoReconnect(enable: boolean) { + this.autoReconnect = enable; + + if(!enable) { + if(this.reconnectTimeout !== undefined) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = undefined; + } + } else if(!this.connection && this.reconnectTimeout === undefined) { + this.reconnect(); + } + } private connect() { this.connection = new this.Connection(this.dcId, this.url, this.logSuffix);