Fix mtproto containers

Fix resending
This commit is contained in:
Eduard Kuzmenko 2021-02-26 19:30:47 +04:00
parent cc518970e5
commit 6f4de7e0f0
5 changed files with 127 additions and 119 deletions

View File

@ -4,7 +4,7 @@ export const DEBUG = process.env.NODE_ENV !== 'production' || Modes.debug;
export const MOUNT_CLASS_TO: any = DEBUG ? (typeof(window) !== 'undefined' ? window : self) : null;
export default DEBUG;
export const superDebug = (object: any, key: string) => {
/* export const superDebug = (object: any, key: string) => {
var d = object[key];
var beforeStr = '', afterStr = '';
for(var r of d) {
@ -32,4 +32,4 @@ export const superDebug = (object: any, key: string) => {
dada(key + '_' + 'after', afterStr);
}
MOUNT_CLASS_TO && (MOUNT_CLASS_TO.superDebug = superDebug);
MOUNT_CLASS_TO && (MOUNT_CLASS_TO.superDebug = superDebug); */

View File

@ -106,7 +106,7 @@ export class DcConfigurator {
const chosenServer = 'wss://' + subdomain + '.web.telegram.org/' + path;
const logSuffix = connectionType === 'upload' ? '-U' : connectionType === 'download' ? '-D' : '';
const retryTimeout = connectionType === 'client' ? 15000 : 10000;
const retryTimeout = connectionType === 'client' ? 10000 : 10000;
const oooohLetMeLive: MTConnectionConstructable = (isSafari && isWebWorker) /* || true */ ? SocketProxied : Socket;

View File

@ -12,7 +12,6 @@ import { longToBytes } from '../crypto/crypto_utils';
import MTTransport from './transports/transport';
import { convertToUint8Array, bufferConcat, bytesCmp, bytesToHex } from '../../helpers/bytes';
import { nextRandomInt } from '../../helpers/random';
import { CancellablePromise } from '../../helpers/cancellablePromise';
import App from '../../config/app';
import DEBUG from '../../config/debug';
import Modes from '../../config/modes';
@ -108,11 +107,13 @@ export default class MTPNetworker {
public isOnline = false;
private lastResponseTime = 0;
private disconnectDelay: number;
private pingPromise: CancellablePromise<any>;
private schedulePromise: Promise<any>;
//private disconnectDelay: number;
//private pingPromise: CancellablePromise<any>;
//public onConnectionStatusChange: (online: boolean) => void;
private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = [];
//private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = [];
constructor(public dcId: number, private authKey: number[], private authKeyId: Uint8Array,
serverSalt: number[], private transport: MTTransport, options: InvokeApiOptions = {}) {
@ -641,9 +642,9 @@ export default class MTPNetworker {
this.log.error('timeout', message);
this.setConnectionStatus(false);
this.getEncryptedOutput(message).then(bytes => {
/* this.getEncryptedOutput(message).then(bytes => {
this.log.error('timeout encrypted', bytes);
});
}); */
}, CONNECTION_TIMEOUT);
promise.finally(() => {
@ -683,7 +684,7 @@ export default class MTPNetworker {
} */
}
public pushResend(messageId: string, delay = 0) {
public pushResend(messageId: string, delay = 100) {
const value = delay ? Date.now() + delay : 0;
const sentMessage = this.sentMessages[messageId];
if(sentMessage.container) {
@ -784,8 +785,8 @@ export default class MTPNetworker {
};
}
let message: MTPNetworker['sentMessages'][keyof MTPNetworker['sentMessages']];
const messages: typeof message[] = [];
let outMessage: MTPNetworker['sentMessages'][keyof MTPNetworker['sentMessages']];
const messages: typeof outMessage[] = [];
const currentTime = Date.now();
let messagesByteLen = 0;
@ -796,8 +797,9 @@ export default class MTPNetworker {
for(const messageId in this.pendingMessages) {
const value = this.pendingMessages[messageId];
if(!value || value >= currentTime) {
if(message = this.sentMessages[messageId]) {
if(!value || value <= currentTime) {
const message = this.sentMessages[messageId];
if(message) {
/* if(message.fileUpload) {
this.log('performScheduledRequest message:', message, message.body.length, (message.body as Uint8Array).byteLength, (message.body as Uint8Array).buffer.byteLength);
} */
@ -810,7 +812,7 @@ export default class MTPNetworker {
if(!message.notContentRelated &&
messagesByteLen &&
messagesByteLen + messageByteLength > 655360) { // 640 Kb
this.log.warn('lengthOverflow', message);
this.log.warn('lengthOverflow', message, messages);
lengthOverflow = true;
continue; // maybe break here
}
@ -822,6 +824,8 @@ export default class MTPNetworker {
} else if(message.longPoll) {
hasHttpWait = true;
}
outMessage = message;
} else {
// this.log(message, messageId)
}
@ -855,71 +859,34 @@ export default class MTPNetworker {
return;
}
const noResponseMsgs: Array<string> = [];
/// #if MTPROTO_HTTP_UPLOAD || MTPROTO_HTTP
const noResponseMsgs: Array<string> = messages.filter(message => message.noResponse).map(message => message.msg_id);
/// #endif
if(messages.length > 1) {
const container = new TLSerialization({
mtproto: true,
startMaxLength: messagesByteLen + 64
});
container.storeInt(0x73f1f8dc, 'CONTAINER[id]');
container.storeInt(messages.length, 'CONTAINER[count]');
const innerMessages: string[] = [];
messages.forEach((message, i) => {
container.storeLong(message.msg_id, 'CONTAINER[' + i + '][msg_id]');
innerMessages.push(message.msg_id);
container.storeInt(message.seq_no, 'CONTAINER[' + i + '][seq_no]');
container.storeInt(message.body.length, 'CONTAINER[' + i + '][bytes]');
container.storeRawBytes(message.body, 'CONTAINER[' + i + '][body]');
if(message.noResponse) {
noResponseMsgs.push(message.msg_id);
}
});
const container = this.generateContainerMessage(messagesByteLen, messages);
outMessage = container.messageWithBody;
const containerSentMessage: MTMessage = {
msg_id: timeManager.generateId(),
seq_no: this.generateSeqNo(true),
container: true,
inner: innerMessages
};
message = Object.assign({
body: container.getBytes(true)
}, containerSentMessage);
this.sentMessages[message.msg_id] = containerSentMessage;
if(Modes.debug) {
this.log('Container', innerMessages, message.msg_id, message.seq_no);
}
this.sentMessages[outMessage.msg_id] = container.message;
} else {
if(message.noResponse) {
noResponseMsgs.push(message.msg_id);
}
this.sentMessages[message.msg_id] = message;
this.sentMessages[outMessage.msg_id] = outMessage;
}
this.pendingAcks = [];
const promise = this.sendEncryptedRequest(message);
const promise = this.sendEncryptedRequest(outMessage);
/// #if MTPROTO_HTTP_UPLOAD
if(!(this.transport instanceof HTTP)) {
//if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
this.cleanupSent(); // ! WARNING
} else {
this.handleSentEncryptedRequestHTTP(promise, message, noResponseMsgs);
this.handleSentEncryptedRequestHTTP(promise, outMessage, noResponseMsgs);
}
/// #elif !MTPROTO_HTTP
//if(!(this.transport instanceof HTTP)) {
//if(noResponseMsgs.length) this.log.error('noResponseMsgs length!', noResponseMsgs);
this.cleanupSent(); // ! WARNING
//} else {
this.cleanupSent(); // ! WARNING
/// #else
this.handleSentEncryptedRequestHTTP(promise, message, noResponseMsgs);
this.handleSentEncryptedRequestHTTP(promise, outMessage, noResponseMsgs);
//}
/// #endif
@ -928,6 +895,41 @@ export default class MTPNetworker {
}
};
private generateContainerMessage(messagesByteLen: number, messages: MTMessage[]) {
const container = new TLSerialization({
mtproto: true,
startMaxLength: messagesByteLen + 64
});
container.storeInt(0x73f1f8dc, 'CONTAINER[id]');
container.storeInt(messages.length, 'CONTAINER[count]');
const innerMessages: string[] = [];
messages.forEach((message, i) => {
innerMessages.push(message.msg_id);
container.storeLong(message.msg_id, 'CONTAINER[' + i + '][msg_id]');
container.storeInt(message.seq_no, 'CONTAINER[' + i + '][seq_no]');
container.storeInt(message.body.length, 'CONTAINER[' + i + '][bytes]');
container.storeRawBytes(message.body, 'CONTAINER[' + i + '][body]');
});
const message: MTMessage = {
msg_id: timeManager.generateId(),
seq_no: this.generateSeqNo(true),
container: true,
inner: innerMessages
};
if(Modes.debug/* || true */) {
this.log.warn('Container', innerMessages, message.msg_id, message.seq_no);
}
return {
message,
messageWithBody: Object.assign({body: container.getBytes(true)}, message),
};
}
public async getEncryptedMessage(dataWithPadding: ArrayBuffer) {
const msgKey = await this.getMsgKey(dataWithPadding, true);
const keyIv = await this.getAesKeyIv(msgKey, true);
@ -1045,6 +1047,8 @@ export default class MTPNetworker {
public sendEncryptedRequest(message: MTMessage) {
return this.getEncryptedOutput(message).then(requestData => {
//this.log('sendEncryptedRequest: launching message into space:', message);
const promise: Promise<Uint8Array> = this.transport.send(requestData) as any;
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
return promise;
@ -1206,27 +1210,20 @@ export default class MTPNetworker {
}
// ! таймаут очень сильно тормозит скорость работы сокета (даже нулевой)
public scheduleRequest(delay = 0) {
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
/* clearTimeout(this.nextReqTimeout);
this.nextReqTimeout = self.setTimeout(this.performScheduledRequest.bind(this), delay || 0);
return; */
return this.performScheduledRequest();
/// #else
if(!(this.transport instanceof HTTP)) return this.performScheduledRequest();
if(this.offline/* && this.transport instanceof HTTP */) {
public scheduleRequest(delay?: number) {
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
if(!(this.transport instanceof HTTP)) {
this.performScheduledRequest();
return;
} else if(this.offline) {
this.checkConnection('forced schedule');
}
/// #endif
/* if(delay && !(this.transport instanceof HTTP)) {
delay = 0;
} */
const nextReq = Date.now() + delay;
if(delay && this.nextReq && this.nextReq <= nextReq) {
const nextReq = Date.now() + (delay || 0);
if(this.nextReq && (delay === undefined || this.nextReq <= nextReq)) {
//this.log('scheduleRequest: nextReq', this.nextReq, nextReq);
return false;
return;
}
//this.log('scheduleRequest: delay', delay);
@ -1235,33 +1232,43 @@ export default class MTPNetworker {
return;
} */
const perf = performance.now();
clearTimeout(this.nextReqTimeout);
this.nextReqTimeout = self.setTimeout(() => {
//const perf = performance.now();
if(this.nextReqTimeout) {
clearTimeout(this.nextReqTimeout);
}
const cb = () => {
//this.log('scheduleRequest: timeout delay was:', performance.now() - perf);
this.nextReqTimeout = 0;
this.nextReq = 0;
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD || true
if(this.offline) {
//this.log('Cancel scheduled');
return false;
return;
}
this.nextReq = 0;
/// #endif
this.performScheduledRequest();
}, delay);
};
this.nextReq = nextReq;
/// #endif
if(delay) {
this.nextReqTimeout = self.setTimeout(cb, delay);
} else {
cb();
}
}
public ackMessage(msgId: string) {
// this.log('ack message', msgID)
this.pendingAcks.push(msgId);
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
this.scheduleRequest(30000);
/// #else
this.scheduleRequest();
/// #endif
}
public reqResendMessage(msgId: string) {
@ -1322,7 +1329,8 @@ export default class MTPNetworker {
}
/**
* только для сокета, возможно это будет неправильно работать, но в тесте сработало правильно
* * только для сокета
* TODO: consider about containers resend
*/
public resend() {
for(const id in this.sentMessages) {

View File

@ -5,7 +5,7 @@ export default class HTTP implements MTTransport {
constructor(protected dcId: number, protected url: string) {
}
send = (data: Uint8Array) => {
public send(data: Uint8Array) {
return fetch(this.url, {method: 'POST', body: data}).then(response => {
//console.log('http response', response/* , response.arrayBuffer() */);

View File

@ -40,26 +40,24 @@ export default class TcpObfuscated implements MTTransport {
const initPayload = this.obfuscation.init(this.codec);
this.connection.send(initPayload);
if(this.networker) {
this.pending.length = 0; // ! clear queue and reformat messages to container, because if sending simultaneously 10+ messages, connection will die
this.networker.setConnectionStatus(true);
if(this.lastCloseTime) {
this.networker.cleanupSent();
this.networker.resend();
}
}
for(const pending of this.pending) {
if(pending.encoded && pending.body) {
pending.encoded = this.encodeBody(pending.body);
this.networker.cleanupSent();
this.networker.resend();
} else {
for(const pending of this.pending) {
if(pending.encoded && pending.body) {
pending.encoded = this.encodeBody(pending.body);
}
}
}
setTimeout(() => {
this.releasePending();
}, 0);
this.connection.send(initPayload);
};
private onMessage = (buffer: ArrayBuffer) => {
@ -100,6 +98,11 @@ export default class TcpObfuscated implements MTTransport {
private onClose = () => {
this.connected = false;
this.connection.removeListener('open', this.onOpen);
this.connection.removeListener('close', this.onClose);
this.connection.removeListener('message', this.onMessage);
this.connection = undefined;
const time = Date.now();
const diff = time - this.lastCloseTime;
@ -107,6 +110,7 @@ export default class TcpObfuscated implements MTTransport {
if(this.networker) {
this.networker.setConnectionStatus(false);
this.pending.length = 0;
}
this.log('will try to reconnect after timeout:', needTimeout / 1000);
@ -114,30 +118,26 @@ export default class TcpObfuscated implements MTTransport {
this.log('trying to reconnect...');
this.lastCloseTime = Date.now();
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
if(!this.networker) {
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
}
}
}
this.connect();
}, needTimeout);
this.connection.removeListener('open', this.onOpen);
this.connection.removeListener('close', this.onClose);
this.connection.removeListener('message', this.onMessage);
this.connection = undefined;
};
private connect() {
this.connection = new this.Connection(this.dcId, this.url, this.logSuffix);
this.connection.addListener('open', this.onOpen);
this.connection.addListener('close', this.onClose);
this.connection.addListener('message', this.onMessage);
}
private encodeBody = (body: Uint8Array) => {
private encodeBody(body: Uint8Array) {
const toEncode = this.codec.encodePacket(body);
//this.log('send before obf:', /* body.hex, nonce.hex, */ toEncode.hex);
@ -145,9 +145,9 @@ export default class TcpObfuscated implements MTTransport {
//this.log('send after obf:', enc.hex);
return encoded;
};
}
public send = (body: Uint8Array) => {
public send(body: Uint8Array) {
this.debug && this.log.debug('-> body length to pending:', body.length);
const encoded: typeof body = this.connected ? this.encodeBody(body) : undefined;
@ -166,7 +166,7 @@ export default class TcpObfuscated implements MTTransport {
return promise;
}
};
}
private releasePending(/* tt = false */) {
if(!this.connected) {
@ -206,7 +206,7 @@ export default class TcpObfuscated implements MTTransport {
} */
if(!encoded) {
encoded = this.encodeBody(body);
encoded = pending.encoded = this.encodeBody(body);
}
//this.lol.push(body);