From 6f4de7e0f02cba97e798b1cf0499423d41c5998f Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Fri, 26 Feb 2021 19:30:47 +0400 Subject: [PATCH] Fix mtproto containers Fix resending --- src/config/debug.ts | 4 +- src/lib/mtproto/dcConfigurator.ts | 2 +- src/lib/mtproto/networker.ts | 186 ++++++++++---------- src/lib/mtproto/transports/http.ts | 2 +- src/lib/mtproto/transports/tcpObfuscated.ts | 52 +++--- 5 files changed, 127 insertions(+), 119 deletions(-) diff --git a/src/config/debug.ts b/src/config/debug.ts index 91c76abc..3d62c9b1 100644 --- a/src/config/debug.ts +++ b/src/config/debug.ts @@ -4,7 +4,7 @@ export const DEBUG = process.env.NODE_ENV !== 'production' || Modes.debug; export const MOUNT_CLASS_TO: any = DEBUG ? (typeof(window) !== 'undefined' ? window : self) : null; export default DEBUG; -export const superDebug = (object: any, key: string) => { +/* export const superDebug = (object: any, key: string) => { var d = object[key]; var beforeStr = '', afterStr = ''; for(var r of d) { @@ -32,4 +32,4 @@ export const superDebug = (object: any, key: string) => { dada(key + '_' + 'after', afterStr); } -MOUNT_CLASS_TO && (MOUNT_CLASS_TO.superDebug = superDebug); +MOUNT_CLASS_TO && (MOUNT_CLASS_TO.superDebug = superDebug); */ diff --git a/src/lib/mtproto/dcConfigurator.ts b/src/lib/mtproto/dcConfigurator.ts index 2ad70463..c658b746 100644 --- a/src/lib/mtproto/dcConfigurator.ts +++ b/src/lib/mtproto/dcConfigurator.ts @@ -106,7 +106,7 @@ export class DcConfigurator { const chosenServer = 'wss://' + subdomain + '.web.telegram.org/' + path; const logSuffix = connectionType === 'upload' ? '-U' : connectionType === 'download' ? '-D' : ''; - const retryTimeout = connectionType === 'client' ? 15000 : 10000; + const retryTimeout = connectionType === 'client' ? 10000 : 10000; const oooohLetMeLive: MTConnectionConstructable = (isSafari && isWebWorker) /* || true */ ? SocketProxied : Socket; diff --git a/src/lib/mtproto/networker.ts b/src/lib/mtproto/networker.ts index a814d74c..78615d41 100644 --- a/src/lib/mtproto/networker.ts +++ b/src/lib/mtproto/networker.ts @@ -12,7 +12,6 @@ import { longToBytes } from '../crypto/crypto_utils'; import MTTransport from './transports/transport'; import { convertToUint8Array, bufferConcat, bytesCmp, bytesToHex } from '../../helpers/bytes'; import { nextRandomInt } from '../../helpers/random'; -import { CancellablePromise } from '../../helpers/cancellablePromise'; import App from '../../config/app'; import DEBUG from '../../config/debug'; import Modes from '../../config/modes'; @@ -108,11 +107,13 @@ export default class MTPNetworker { public isOnline = false; private lastResponseTime = 0; - private disconnectDelay: number; - private pingPromise: CancellablePromise; + + private schedulePromise: Promise; + //private disconnectDelay: number; + //private pingPromise: CancellablePromise; //public onConnectionStatusChange: (online: boolean) => void; - private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = []; + //private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = []; constructor(public dcId: number, private authKey: number[], private authKeyId: Uint8Array, serverSalt: number[], private transport: MTTransport, options: InvokeApiOptions = {}) { @@ -641,9 +642,9 @@ export default class MTPNetworker { this.log.error('timeout', message); this.setConnectionStatus(false); - this.getEncryptedOutput(message).then(bytes => { + /* this.getEncryptedOutput(message).then(bytes => { this.log.error('timeout encrypted', bytes); - }); + }); */ }, CONNECTION_TIMEOUT); promise.finally(() => { @@ -683,7 +684,7 @@ export default class MTPNetworker { } */ } - public pushResend(messageId: string, delay = 0) { + public pushResend(messageId: string, delay = 100) { const value = delay ? Date.now() + delay : 0; const sentMessage = this.sentMessages[messageId]; if(sentMessage.container) { @@ -784,8 +785,8 @@ export default class MTPNetworker { }; } - let message: MTPNetworker['sentMessages'][keyof MTPNetworker['sentMessages']]; - const messages: typeof message[] = []; + let outMessage: MTPNetworker['sentMessages'][keyof MTPNetworker['sentMessages']]; + const messages: typeof outMessage[] = []; const currentTime = Date.now(); let messagesByteLen = 0; @@ -796,8 +797,9 @@ export default class MTPNetworker { for(const messageId in this.pendingMessages) { const value = this.pendingMessages[messageId]; - if(!value || value >= currentTime) { - if(message = this.sentMessages[messageId]) { + if(!value || value <= currentTime) { + const message = this.sentMessages[messageId]; + if(message) { /* if(message.fileUpload) { this.log('performScheduledRequest message:', message, message.body.length, (message.body as Uint8Array).byteLength, (message.body as Uint8Array).buffer.byteLength); } */ @@ -810,7 +812,7 @@ export default class MTPNetworker { if(!message.notContentRelated && messagesByteLen && messagesByteLen + messageByteLength > 655360) { // 640 Kb - this.log.warn('lengthOverflow', message); + this.log.warn('lengthOverflow', message, messages); lengthOverflow = true; continue; // maybe break here } @@ -822,6 +824,8 @@ export default class MTPNetworker { } else if(message.longPoll) { hasHttpWait = true; } + + outMessage = message; } else { // this.log(message, messageId) } @@ -855,71 +859,34 @@ export default class MTPNetworker { return; } - const noResponseMsgs: Array = []; + /// #if MTPROTO_HTTP_UPLOAD || MTPROTO_HTTP + const noResponseMsgs: Array = messages.filter(message => message.noResponse).map(message => message.msg_id); + /// #endif if(messages.length > 1) { - const container = new TLSerialization({ - mtproto: true, - startMaxLength: messagesByteLen + 64 - }); - - container.storeInt(0x73f1f8dc, 'CONTAINER[id]'); - container.storeInt(messages.length, 'CONTAINER[count]'); - - const innerMessages: string[] = []; - messages.forEach((message, i) => { - container.storeLong(message.msg_id, 'CONTAINER[' + i + '][msg_id]'); - innerMessages.push(message.msg_id); - container.storeInt(message.seq_no, 'CONTAINER[' + i + '][seq_no]'); - container.storeInt(message.body.length, 'CONTAINER[' + i + '][bytes]'); - container.storeRawBytes(message.body, 'CONTAINER[' + i + '][body]'); - if(message.noResponse) { - noResponseMsgs.push(message.msg_id); - } - }); + const container = this.generateContainerMessage(messagesByteLen, messages); + outMessage = container.messageWithBody; - const containerSentMessage: MTMessage = { - msg_id: timeManager.generateId(), - seq_no: this.generateSeqNo(true), - container: true, - inner: innerMessages - }; - - message = Object.assign({ - body: container.getBytes(true) - }, containerSentMessage); - - this.sentMessages[message.msg_id] = containerSentMessage; - - if(Modes.debug) { - this.log('Container', innerMessages, message.msg_id, message.seq_no); - } + this.sentMessages[outMessage.msg_id] = container.message; } else { - if(message.noResponse) { - noResponseMsgs.push(message.msg_id); - } - - this.sentMessages[message.msg_id] = message; + this.sentMessages[outMessage.msg_id] = outMessage; } this.pendingAcks = []; - const promise = this.sendEncryptedRequest(message); + const promise = this.sendEncryptedRequest(outMessage); /// #if MTPROTO_HTTP_UPLOAD if(!(this.transport instanceof HTTP)) { //if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs); this.cleanupSent(); // ! WARNING } else { - this.handleSentEncryptedRequestHTTP(promise, message, noResponseMsgs); + this.handleSentEncryptedRequestHTTP(promise, outMessage, noResponseMsgs); } /// #elif !MTPROTO_HTTP - //if(!(this.transport instanceof HTTP)) { - //if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs); - this.cleanupSent(); // ! WARNING - //} else { + this.cleanupSent(); // ! WARNING /// #else - this.handleSentEncryptedRequestHTTP(promise, message, noResponseMsgs); + this.handleSentEncryptedRequestHTTP(promise, outMessage, noResponseMsgs); //} /// #endif @@ -928,6 +895,41 @@ export default class MTPNetworker { } }; + private generateContainerMessage(messagesByteLen: number, messages: MTMessage[]) { + const container = new TLSerialization({ + mtproto: true, + startMaxLength: messagesByteLen + 64 + }); + + container.storeInt(0x73f1f8dc, 'CONTAINER[id]'); + container.storeInt(messages.length, 'CONTAINER[count]'); + + const innerMessages: string[] = []; + messages.forEach((message, i) => { + innerMessages.push(message.msg_id); + container.storeLong(message.msg_id, 'CONTAINER[' + i + '][msg_id]'); + container.storeInt(message.seq_no, 'CONTAINER[' + i + '][seq_no]'); + container.storeInt(message.body.length, 'CONTAINER[' + i + '][bytes]'); + container.storeRawBytes(message.body, 'CONTAINER[' + i + '][body]'); + }); + + const message: MTMessage = { + msg_id: timeManager.generateId(), + seq_no: this.generateSeqNo(true), + container: true, + inner: innerMessages + }; + + if(Modes.debug/* || true */) { + this.log.warn('Container', innerMessages, message.msg_id, message.seq_no); + } + + return { + message, + messageWithBody: Object.assign({body: container.getBytes(true)}, message), + }; + } + public async getEncryptedMessage(dataWithPadding: ArrayBuffer) { const msgKey = await this.getMsgKey(dataWithPadding, true); const keyIv = await this.getAesKeyIv(msgKey, true); @@ -1045,6 +1047,8 @@ export default class MTPNetworker { public sendEncryptedRequest(message: MTMessage) { return this.getEncryptedOutput(message).then(requestData => { + //this.log('sendEncryptedRequest: launching message into space:', message); + const promise: Promise = this.transport.send(requestData) as any; /// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD return promise; @@ -1206,27 +1210,20 @@ export default class MTPNetworker { } // ! таймаут очень сильно тормозит скорость работы сокета (даже нулевой) - public scheduleRequest(delay = 0) { - /// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD - /* clearTimeout(this.nextReqTimeout); - this.nextReqTimeout = self.setTimeout(this.performScheduledRequest.bind(this), delay || 0); - return; */ - return this.performScheduledRequest(); - /// #else - if(!(this.transport instanceof HTTP)) return this.performScheduledRequest(); - if(this.offline/* && this.transport instanceof HTTP */) { + public scheduleRequest(delay?: number) { + /// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD + if(!(this.transport instanceof HTTP)) { + this.performScheduledRequest(); + return; + } else if(this.offline) { this.checkConnection('forced schedule'); } + /// #endif - /* if(delay && !(this.transport instanceof HTTP)) { - delay = 0; - } */ - - const nextReq = Date.now() + delay; - - if(delay && this.nextReq && this.nextReq <= nextReq) { + const nextReq = Date.now() + (delay || 0); + if(this.nextReq && (delay === undefined || this.nextReq <= nextReq)) { //this.log('scheduleRequest: nextReq', this.nextReq, nextReq); - return false; + return; } //this.log('scheduleRequest: delay', delay); @@ -1235,33 +1232,43 @@ export default class MTPNetworker { return; } */ - const perf = performance.now(); - clearTimeout(this.nextReqTimeout); - this.nextReqTimeout = self.setTimeout(() => { + //const perf = performance.now(); + if(this.nextReqTimeout) { + clearTimeout(this.nextReqTimeout); + } + + const cb = () => { //this.log('scheduleRequest: timeout delay was:', performance.now() - perf); this.nextReqTimeout = 0; + this.nextReq = 0; - /// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD || true if(this.offline) { //this.log('Cancel scheduled'); - return false; + return; } - this.nextReq = 0; - /// #endif - this.performScheduledRequest(); - }, delay); - + }; + this.nextReq = nextReq; - /// #endif + + if(delay) { + this.nextReqTimeout = self.setTimeout(cb, delay); + } else { + cb(); + } } public ackMessage(msgId: string) { // this.log('ack message', msgID) this.pendingAcks.push(msgId); + + /// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD this.scheduleRequest(30000); + /// #else + this.scheduleRequest(); + /// #endif } public reqResendMessage(msgId: string) { @@ -1322,7 +1329,8 @@ export default class MTPNetworker { } /** - * только для сокета, возможно это будет неправильно работать, но в тесте сработало правильно + * * только для сокета + * TODO: consider about containers resend */ public resend() { for(const id in this.sentMessages) { diff --git a/src/lib/mtproto/transports/http.ts b/src/lib/mtproto/transports/http.ts index 64c89a4a..8200a788 100644 --- a/src/lib/mtproto/transports/http.ts +++ b/src/lib/mtproto/transports/http.ts @@ -5,7 +5,7 @@ export default class HTTP implements MTTransport { constructor(protected dcId: number, protected url: string) { } - send = (data: Uint8Array) => { + public send(data: Uint8Array) { return fetch(this.url, {method: 'POST', body: data}).then(response => { //console.log('http response', response/* , response.arrayBuffer() */); diff --git a/src/lib/mtproto/transports/tcpObfuscated.ts b/src/lib/mtproto/transports/tcpObfuscated.ts index dc40ab54..2c8bb660 100644 --- a/src/lib/mtproto/transports/tcpObfuscated.ts +++ b/src/lib/mtproto/transports/tcpObfuscated.ts @@ -40,26 +40,24 @@ export default class TcpObfuscated implements MTTransport { const initPayload = this.obfuscation.init(this.codec); + this.connection.send(initPayload); + if(this.networker) { + this.pending.length = 0; // ! clear queue and reformat messages to container, because if sending simultaneously 10+ messages, connection will die this.networker.setConnectionStatus(true); - - if(this.lastCloseTime) { - this.networker.cleanupSent(); - this.networker.resend(); - } - } - - for(const pending of this.pending) { - if(pending.encoded && pending.body) { - pending.encoded = this.encodeBody(pending.body); + this.networker.cleanupSent(); + this.networker.resend(); + } else { + for(const pending of this.pending) { + if(pending.encoded && pending.body) { + pending.encoded = this.encodeBody(pending.body); + } } } setTimeout(() => { this.releasePending(); }, 0); - - this.connection.send(initPayload); }; private onMessage = (buffer: ArrayBuffer) => { @@ -100,6 +98,11 @@ export default class TcpObfuscated implements MTTransport { private onClose = () => { this.connected = false; + + this.connection.removeListener('open', this.onOpen); + this.connection.removeListener('close', this.onClose); + this.connection.removeListener('message', this.onMessage); + this.connection = undefined; const time = Date.now(); const diff = time - this.lastCloseTime; @@ -107,6 +110,7 @@ export default class TcpObfuscated implements MTTransport { if(this.networker) { this.networker.setConnectionStatus(false); + this.pending.length = 0; } this.log('will try to reconnect after timeout:', needTimeout / 1000); @@ -114,30 +118,26 @@ export default class TcpObfuscated implements MTTransport { this.log('trying to reconnect...'); this.lastCloseTime = Date.now(); - for(const pending of this.pending) { - if(pending.bodySent) { - pending.bodySent = false; + if(!this.networker) { + for(const pending of this.pending) { + if(pending.bodySent) { + pending.bodySent = false; + } } } this.connect(); }, needTimeout); - - this.connection.removeListener('open', this.onOpen); - this.connection.removeListener('close', this.onClose); - this.connection.removeListener('message', this.onMessage); - this.connection = undefined; }; private connect() { this.connection = new this.Connection(this.dcId, this.url, this.logSuffix); - this.connection.addListener('open', this.onOpen); this.connection.addListener('close', this.onClose); this.connection.addListener('message', this.onMessage); } - private encodeBody = (body: Uint8Array) => { + private encodeBody(body: Uint8Array) { const toEncode = this.codec.encodePacket(body); //this.log('send before obf:', /* body.hex, nonce.hex, */ toEncode.hex); @@ -145,9 +145,9 @@ export default class TcpObfuscated implements MTTransport { //this.log('send after obf:', enc.hex); return encoded; - }; + } - public send = (body: Uint8Array) => { + public send(body: Uint8Array) { this.debug && this.log.debug('-> body length to pending:', body.length); const encoded: typeof body = this.connected ? this.encodeBody(body) : undefined; @@ -166,7 +166,7 @@ export default class TcpObfuscated implements MTTransport { return promise; } - }; + } private releasePending(/* tt = false */) { if(!this.connected) { @@ -206,7 +206,7 @@ export default class TcpObfuscated implements MTTransport { } */ if(!encoded) { - encoded = this.encodeBody(body); + encoded = pending.encoded = this.encodeBody(body); } //this.lol.push(body);