Browse Source

Network:

Fix reconnecting
Fix resending answers
Try ping_disconnect_delay again
master
Eduard Kuzmenko 3 years ago
parent
commit
fa4df7ee4b
  1. 4
      src/lib/mtproto/apiManager.ts
  2. 614
      src/lib/mtproto/networker.ts

4
src/lib/mtproto/apiManager.ts

@ -76,6 +76,8 @@ export type ApiError = Partial<{ @@ -76,6 +76,8 @@ export type ApiError = Partial<{
}
} */
const FILE_NETWORKERS_COUNT = 3;
export class ApiManager {
private cachedNetworkers: {
[transportType in TransportType]: {
@ -321,7 +323,7 @@ export class ApiManager { @@ -321,7 +323,7 @@ export class ApiManager {
const networkers = cache[dcId];
// @ts-ignore
const maxNetworkers = connectionType === 'client' || transportType === 'https' ? 1 : (connectionType === 'download' ? 3 : 3);
const maxNetworkers = connectionType === 'client' || transportType === 'https' ? 1 : FILE_NETWORKERS_COUNT;
if(networkers.length >= maxNetworkers) {
let i = networkers.length - 1, found = false;
for(; i >= 0; --i) {

614
src/lib/mtproto/networker.ts

@ -41,6 +41,8 @@ import isObject from '../../helpers/object/isObject'; @@ -41,6 +41,8 @@ import isObject from '../../helpers/object/isObject';
import forEachReverse from '../../helpers/array/forEachReverse';
import sortLongsArray from '../../helpers/long/sortLongsArray';
import randomize from '../../helpers/array/randomize';
import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise';
import { pause } from '../../helpers/schedulers/pause';
//console.error('networker included!', new Error().stack);
@ -50,6 +52,7 @@ export type MTMessageOptions = InvokeApiOptions & Partial<{ @@ -50,6 +52,7 @@ export type MTMessageOptions = InvokeApiOptions & Partial<{
notContentRelated: true, // ACK
noSchedule: true,
// withResult: true,
messageId: MTLong,
}>;
@ -62,10 +65,7 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & { @@ -62,10 +65,7 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & {
acked?: boolean,
deferred?: {
resolve: any,
reject: any
},
deferred?: CancellablePromise<void>,
container?: boolean,
inner?: MTLong[],
@ -81,8 +81,35 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & { @@ -81,8 +81,35 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & {
noResponse?: true, // only with http (http_wait for longPoll)
};
const CONNECTION_TIMEOUT = 5000;
const DRAIN_TIMEOUT = 10000;
const delays: {
[k in 'client' | 'file']: {
disconnectDelayMin: number,
disconnectDelayMax: number,
pingInterval: number,
pingMaxTime: number,
connectionTimeout: number
}
} = {
client: {
disconnectDelayMin: 7,
disconnectDelayMax: 20,
pingInterval: 2000,
pingMaxTime: 5,
connectionTimeout: 5000
},
file: {
disconnectDelayMin: 10,
disconnectDelayMax: 24,
pingInterval: 3000,
pingMaxTime: 7,
connectionTimeout: 7500
}
};
const RESEND_OPTIONS: MTMessageOptions = {
noSchedule: true,
notContentRelated: true
};
let invokeAfterMsgConstructor: number;
export default class MTPNetworker {
@ -100,7 +127,8 @@ export default class MTPNetworker { @@ -100,7 +127,8 @@ export default class MTPNetworker {
private pendingMessages: {[msgId: MTLong]: number} = {};
private pendingAcks: Array<MTLong> = [];
private pendingResends: Array<MTLong> = [];
private pendingResendReq: MTLong[] = [];
// private pendingResendAnsReq: MTLong[] = [];
public connectionInited: boolean;
private nextReqTimeout: number;
@ -123,15 +151,16 @@ export default class MTPNetworker { @@ -123,15 +151,16 @@ export default class MTPNetworker {
private serverSalt: Uint8Array;
private lastResendReq: {
req_msg_id: MTLong,
resend_msg_ids: Array<MTLong>
} | null = null;
reqMsgId: MTLong,
msgIds: MTPNetworker['pendingResendReq']
};
// private lastResendAnsReq: MTPNetworker['lastResendReq'];
private name: string;
private log: ReturnType<typeof logger>;
public isOnline = false;
public status: ConnectionStatus = ConnectionStatus.Closed;
private status: ConnectionStatus = ConnectionStatus.Closed;
private lastResponseTime = 0;
private debug = DEBUG /* && false */ || Modes.debug;
@ -143,12 +172,19 @@ export default class MTPNetworker { @@ -143,12 +172,19 @@ export default class MTPNetworker {
public transport: MTTransport;
//private disconnectDelay: number;
//private pingPromise: CancellablePromise<any>;
/// #if MTPROTO_HAS_WS
private pingDelayDisconnectDeferred: CancellablePromise<string>;
private pingPromise: Promise<void>;
// private pingInterval: number;
private lastPingTime: number;
private lastPingDelayDisconnectId: string;
/// #endif
//public onConnectionStatusChange: (online: boolean) => void;
//private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = [];
private delays: typeof delays[keyof typeof delays];
constructor(
public dcId: number,
private authKey: Uint8Array,
@ -162,6 +198,7 @@ export default class MTPNetworker { @@ -162,6 +198,7 @@ export default class MTPNetworker {
this.isFileUpload = !!options.fileUpload;
this.isFileDownload = !!options.fileDownload;
this.isFileNetworker = this.isFileUpload || this.isFileDownload;
this.delays = this.isFileNetworker ? delays.file : delays.client;
const suffix = this.isFileUpload ? '-U' : this.isFileDownload ? '-D' : '';
this.name = 'NET-' + dcId + suffix;
@ -182,13 +219,6 @@ export default class MTPNetworker { @@ -182,13 +219,6 @@ export default class MTPNetworker {
// /* rootScope.offline = true
// rootScope.offlineConnecting = true */
// }
// * handle outcoming dead socket, server will close the connection
// if((this.transport as TcpObfuscated).networker) {
// this.disconnectDelay = /* (this.transport as TcpObfuscated).retryTimeout */75;
// //setInterval(this.sendPingDelayDisconnect, (this.disconnectDelay - 5) * 1000);
// this.sendPingDelayDisconnect();
// }
}
private updateSession() {
@ -389,6 +419,19 @@ export default class MTPNetworker { @@ -389,6 +419,19 @@ export default class MTPNetworker {
/// #endif
}
this.log('change transport', transport, oldTransport);
/// #if MTPROTO_HAS_WS
this.clearPingDelayDisconnect();
// if(this.pingInterval !== undefined) {
// clearInterval(this.pingInterval);
// this.pingInterval = undefined;
// }
// this.clearPing();
/// #endif
this.transport = transport;
if(!transport) {
return;
@ -408,9 +451,19 @@ export default class MTPNetworker { @@ -408,9 +451,19 @@ export default class MTPNetworker {
/// #endif
/// #endif
if(transport.connected && (transport as TcpObfuscated).connection) {
this.setConnectionStatus(ConnectionStatus.Connected);
/// #if MTPROTO_HAS_WS
// * handle outcoming dead socket, server will close the connection
if((transport as TcpObfuscated).connection) {
// this.sendPingDelayDisconnect();
if(transport.connected) {
this.setConnectionStatus(ConnectionStatus.Connected);
}
// this.pingInterval = ctx.setInterval(this.sendPing, PING_INTERVAL);
// this.sendPing();
}
/// #endif
this.resend();
}
@ -435,112 +488,125 @@ export default class MTPNetworker { @@ -435,112 +488,125 @@ export default class MTPNetworker {
}
}
// private sendPingDelayDisconnect = () => {
// if(this.pingPromise || true) return;
// if(!this.isOnline) {
// if((this.transport as TcpObfuscated).connected) {
// (this.transport as TcpObfuscated).handleClose();
// }
// return;
// }
// this.log('sendPingDelayDisconnect', this.sentPingTimes);
// /* if(this.tt) clearTimeout(this.tt);
// this.tt = self.setTimeout(() => {
// (this.transport as any).ws.close(1000);
// this.tt = 0;
// }, this.disconnectDelay * 1000); */
// /* this.wrapMtpCall('ping_delay_disconnect', {
// ping_id: randomLong(),
// disconnect_delay: this.disconnectDelay
// }, {
// noResponse: true,
// notContentRelated: true
// }); */
// const deferred = this.pingPromise = deferredPromise<void>();
// const timeoutTime = this.disconnectDelay * 1000;
// /* if(!this.sentPingTimes || true) {
// ++this.sentPingTimes; */
// const startTime = Date.now();
// this.wrapMtpCall('ping', {
// ping_id: randomLong()
// }, {}).then(pong => {
// const elapsedTime = Date.now() - startTime;
// this.log('sendPingDelayDisconnect: response', pong, elapsedTime > timeoutTime);
// if(elapsedTime > timeoutTime) {
// deferred.reject();
// } else {
// setTimeout(deferred.resolve, timeoutTime - elapsedTime);
// }
// }, deferred.reject).finally(() => {
// clearTimeout(rejectTimeout);
// //--this.sentPingTimes;
// });
// //}
// const rejectTimeout = self.setTimeout(deferred.reject, timeoutTime);
// deferred.catch(() => {
// (this.transport as Socket).handleClose();
// });
// deferred.finally(() => {
// this.pingPromise = null;
// this.sendPingDelayDisconnect();
// });
// };
// private sendPingDelayDisconnect = () => {
// if(this.pingPromise || true) return;
// /* if(!this.isOnline) {
// if((this.transport as TcpObfuscated).connected) {
// (this.transport as TcpObfuscated).connection.close();
// }
// return;
// } */
// const deferred = this.pingPromise = deferredPromise<void>();
// const timeoutTime = this.disconnectDelay * 1000;
// const startTime = Date.now();
// this.wrapMtpCall('ping_delay_disconnect', {
// ping_id: randomLong(),
// disconnect_delay: this.disconnectDelay
// }, {}).then(pong => {
// const elapsedTime = Date.now() - startTime;
// this.log('sendPingDelayDisconnect: response', pong, elapsedTime > timeoutTime);
// if(elapsedTime > timeoutTime) {
// deferred.reject();
// } else {
// setTimeout(deferred.resolve, timeoutTime - elapsedTime);
// }
// }, deferred.reject).finally(() => {
// clearTimeout(rejectTimeout);
// //--this.sentPingTimes;
// });
// const rejectTimeout = self.setTimeout(deferred.reject, timeoutTime);
// deferred.catch(() => {
// this.log.error('sendPingDelayDisconnect: catch, closing connection if exists');
// (this.transport as TcpObfuscated).connection.close();
// });
// deferred.finally(() => {
// this.pingPromise = null;
// this.sendPingDelayDisconnect();
// });
// };
/* private clearPing() {
if(this.pingPromise) {
this.pingPromise = undefined;
}
this.lastPingTime = undefined;
}
private sendPing = () => {
// return;
// if(!(this.transport as TcpObfuscated).connected) {
// this.clearPing();
// return;
// }
if(this.pingPromise) {
return;
}
const startTime = Date.now();
this.log('sendPing: ping', startTime);
const promise = this.pingPromise = this.wrapMtpCall('ping', {
ping_id: randomLong()
}, {
notContentRelated: true
}).then(() => {
const elapsedTime = Date.now() - startTime;
this.lastPingTime = elapsedTime / 1000;
this.log('sendPing: pong', elapsedTime);
setTimeout(() => {
if(this.pingPromise !== promise) {
return;
}
this.pingPromise = undefined;
this.sendPing();
}, Math.max(0, PING_INTERVAL - elapsedTime));
});
}; */
private clearPingDelayDisconnect() {
const deferred = this.pingDelayDisconnectDeferred;
this.pingDelayDisconnectDeferred = undefined;
this.lastPingDelayDisconnectId = undefined;
if(deferred) {
deferred.reject();
}
}
private sendPingDelayDisconnect = () => {
// return;
if(this.pingDelayDisconnectDeferred || !this.transport || !this.transport.connected) return;
/* if(!this.isOnline) {
if((this.transport as TcpObfuscated).connected) {
(this.transport as TcpObfuscated).connection.close();
}
return;
} */
const deferred = this.pingDelayDisconnectDeferred = deferredPromise();
const delays = this.delays;
const pingMaxTime = this.delays.pingMaxTime;
const lastPingTime = Math.min(this.lastPingTime ?? 0, pingMaxTime);
const disconnectDelay = Math.round(delays.disconnectDelayMin + lastPingTime / pingMaxTime * (delays.disconnectDelayMax - delays.disconnectDelayMin));
const timeoutTime = disconnectDelay * 1000;
const startTime = Date.now();
const pingId = this.lastPingDelayDisconnectId = randomLong();
const options: MTMessageOptions = {notContentRelated: true};
this.wrapMtpCall('ping_delay_disconnect', {
ping_id: pingId,
disconnect_delay: disconnectDelay
}, options);
this.log(`sendPingDelayDisconnect: ping, timeout=${timeoutTime}, lastPingTime=${this.lastPingTime}, msgId=${options.messageId}`);
const rejectTimeout = self.setTimeout(deferred.reject, timeoutTime);
const onResolved = (reason: string) => {
clearTimeout(rejectTimeout);
const elapsedTime = Date.now() - startTime;
this.lastPingTime = elapsedTime / 1000;
this.log(`sendPingDelayDisconnect: pong, reason='${reason}', time=${lastPingTime}, msgId=${options.messageId}`);
if(elapsedTime > timeoutTime) {
throw undefined;
} else {
return pause(Math.max(0, this.delays.pingInterval - elapsedTime/* timeoutTime - elapsedTime - PING_INTERVAL */));
}
};
const onTimeout = () => {
clearTimeout(rejectTimeout);
const transport = this.transport as TcpObfuscated;
if(this.pingDelayDisconnectDeferred !== deferred || !transport?.connection) {
return;
}
this.log.error('sendPingDelayDisconnect: catch, closing connection', this.lastPingTime, options.messageId);
transport.connection.close();
};
const onFinally = () => {
if(this.pingDelayDisconnectDeferred !== deferred) {
return;
}
this.pingDelayDisconnectDeferred = undefined;
this.sendPingDelayDisconnect();
};
deferred
.then(onResolved)
.catch(onTimeout)
.finally(onFinally);
};
/// #if MTPROTO_HAS_HTTP
private checkLongPoll = () => {
@ -581,6 +647,7 @@ export default class MTPNetworker { @@ -581,6 +647,7 @@ export default class MTPNetworker {
max_wait: maxWait
}, {
noResponse: true,
// notContentRelated: true,
longPoll: true
}).then(() => {
this.longPollPending = undefined;
@ -701,10 +768,12 @@ export default class MTPNetworker { @@ -701,10 +768,12 @@ export default class MTPNetworker {
return false;
}).then((shouldResolve) => {
// clearTimeout(timeout);
const sentMessages = this.sentMessages;
noResponseMsgs.forEach((msgId) => {
if(this.sentMessages[msgId]) {
const deferred = this.sentMessages[msgId].deferred;
delete this.sentMessages[msgId];
const sentMessage = sentMessages[msgId];
if(sentMessage) {
const deferred = sentMessage.deferred;
delete sentMessages[msgId];
delete this.pendingMessages[msgId];
shouldResolve ? deferred.resolve() : deferred.reject();
}
@ -720,30 +789,33 @@ export default class MTPNetworker { @@ -720,30 +789,33 @@ export default class MTPNetworker {
body: Uint8Array | number[],
isAPI?: boolean
}, options: MTMessageOptions) {
const promise = new Promise((resolve, reject) => {
this.sentMessages[message.msg_id] = Object.assign(message, options, options.notContentRelated
? undefined
: {
deferred: {resolve, reject}
}
);
let promise: CancellablePromise<void>;
if(!options.notContentRelated || options.noResponse) {
promise = deferredPromise();
}
this.sentMessages[message.msg_id] = Object.assign(
message,
options,
promise ? {deferred: promise} : undefined
);
//this.log.error('Networker pushMessage:', this.sentMessages[message.msg_id]);
//this.log.error('Networker pushMessage:', this.sentMessages[message.msg_id]);
this.pendingMessages[message.msg_id] = 0;
if(!options.noSchedule) {
this.scheduleRequest();
}
this.pendingMessages[message.msg_id] = 0;
if(!options.noSchedule) {
this.scheduleRequest();
}
if(isObject(options)) {
options.messageId = message.msg_id;
}
});
if(isObject(options)) {
options.messageId = message.msg_id;
}
if(!options.notContentRelated && !options.noResponse) {
if(promise) {
const canIncrement = !options.notContentRelated;
const timeout = setTimeout(() => {
if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < CONNECTION_TIMEOUT) {
if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < this.delays.connectionTimeout) {
return;
}
@ -755,20 +827,24 @@ export default class MTPNetworker { @@ -755,20 +827,24 @@ export default class MTPNetworker {
/* this.getEncryptedOutput(message).then(bytes => {
this.log.error('timeout encrypted', bytes);
}); */
}, CONNECTION_TIMEOUT);
}, this.delays.connectionTimeout);
promise.catch(noop).finally(() => {
clearTimeout(timeout);
this.setConnectionStatus(ConnectionStatus.Connected);
--this.activeRequests;
this.setDrainTimeout();
if(canIncrement) {
--this.activeRequests;
this.setDrainTimeout();
}
});
++this.activeRequests;
if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout);
this.onDrainTimeout = undefined;
if(canIncrement) {
++this.activeRequests;
if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout);
this.onDrainTimeout = undefined;
}
}
}
@ -809,11 +885,10 @@ export default class MTPNetworker { @@ -809,11 +885,10 @@ export default class MTPNetworker {
this.scheduleRequest();
}
// if((this.transport as TcpObfuscated).networker) {
// this.sendPingDelayDisconnect();
// }
/* this.sentPingTimes = 0;
this.sendPingDelayDisconnect(); */
if((this.transport as TcpObfuscated).connection) {
this.clearPingDelayDisconnect();
this.sendPingDelayDisconnect();
}
}
/* if(this.onConnectionStatusChange) {
this.onConnectionStatusChange(this.isOnline);
@ -890,8 +965,6 @@ export default class MTPNetworker { @@ -890,8 +965,6 @@ export default class MTPNetworker {
}
private performScheduledRequest() {
// this.log('scheduled', this.dcId, this.iii)
if(this.isStopped()) {
return false;
}
@ -909,27 +982,42 @@ export default class MTPNetworker { @@ -909,27 +982,42 @@ export default class MTPNetworker {
});
}
if(this.pendingResends.length) {
const resendMsgIds = this.pendingResends.slice();
const resendOpts: MTMessageOptions = {
noSchedule: true,
notContentRelated: true,
messageId: '' // will set in wrapMtpMessage->pushMessage
};
//this.log('resendReq messages', resendMsgIds);
const pendingResendReqLength = this.pendingResendReq.length;
if(pendingResendReqLength) {
const options: MTMessageOptions = {...RESEND_OPTIONS};
const msgIds = this.pendingResendReq.splice(0, pendingResendReqLength);
this.wrapMtpMessage({
_: 'msg_resend_req',
msg_ids: resendMsgIds
}, resendOpts);
msg_ids: msgIds
}, options);
this.log('resend: resending requests', options.messageId, msgIds);
/* this.lastResendReq = {
reqMsgId: options.messageId,
msgIds: msgIds
}; */
this.lastResendReq = {
req_msg_id: resendOpts.messageId,
resend_msg_ids: resendMsgIds
};
// this.pendingResendReq.length = 0;
}
// if(this.pendingResendAnsReq.length) {
// const options: MTMessageOptions = {...RESEND_OPTIONS};
// const msgIds = this.pendingResendAnsReq.slice();
// this.wrapMtpMessage({
// _: 'msg_resend_ans_req',
// msg_ids: msgIds
// }, options);
// this.log('resend: requesting answers', options.messageId, msgIds);
// this.lastResendAnsReq = {
// reqMsgId: options.messageId,
// msgIds: msgIds
// };
// // this.pendingResendAnsReq.length = 0;
// }
let outMessage: MTPNetworker['sentMessages'][keyof MTPNetworker['sentMessages']];
let outMessage: MTMessage;
const messages: typeof outMessage[] = [];
//const currentTime = Date.now();
@ -1197,45 +1285,45 @@ export default class MTPNetworker { @@ -1197,45 +1285,45 @@ export default class MTPNetworker {
});
}
private sendEncryptedRequest(message: MTMessage) {
return this.getEncryptedOutput(message).then(requestData => {
this.debug && this.log.debug('sendEncryptedRequest: launching message into space:', message, [message.msg_id].concat(message.inner || []), requestData.length);
const promise: Promise<Uint8Array> = this.transport.send(requestData) as any;
// this.debug && this.log.debug('sendEncryptedRequest: launched message into space:', message, promise);
/// #if !MTPROTO_HAS_HTTP
return promise;
/// #else
/// #if MTPROTO_HAS_WS
if(!(this.transport instanceof HTTP)) return promise;
/// #endif
const baseError = {
code: 406,
type: 'NETWORK_BAD_RESPONSE',
transport: this.transport
};
private async sendEncryptedRequest(message: MTMessage) {
const requestData = await this.getEncryptedOutput(message);
this.debug && this.log.debug('sending:', message, [message.msg_id].concat(message.inner || []), requestData.length);
const promise: Promise<Uint8Array> = this.transport.send(requestData) as any;
// this.debug && this.log.debug('sendEncryptedRequest: launched message into space:', message, promise);
/// #if !MTPROTO_HAS_HTTP
return promise;
/// #else
/// #if MTPROTO_HAS_WS
if(!(this.transport instanceof HTTP)) return promise;
/// #endif
const baseError = {
code: 406,
type: 'NETWORK_BAD_RESPONSE',
transport: this.transport
};
return promise.then((result) => {
if(!result?.byteLength) {
throw baseError;
}
return promise.then((result) => {
if(!result?.byteLength) {
throw baseError;
}
// this.debug && this.log.debug('sendEncryptedRequest: got response for:', message, [message.msg_id].concat(message.inner || []));
return result;
}, (error) => {
if(!error.message && !error.type) {
error = Object.assign(baseError, {
type: 'NETWORK_BAD_REQUEST',
originalError: error
});
}
// this.debug && this.log.debug('sendEncryptedRequest: got response for:', message, [message.msg_id].concat(message.inner || []));
return result;
}, (error) => {
if(!error.message && !error.type) {
error = Object.assign(baseError, {
type: 'NETWORK_BAD_REQUEST',
originalError: error
});
}
throw error;
});
/// #endif
throw error;
});
/// #endif
}
public parseResponse(responseBuffer: Uint8Array) {
@ -1451,35 +1539,37 @@ export default class MTPNetworker { @@ -1451,35 +1539,37 @@ export default class MTPNetworker {
this.scheduleRequest(delay);
}
private reqResendMessage(msgId: MTLong) {
private reqResend(msgId: MTLong/* , isAnswer?: boolean */) {
if(this.debug) {
this.log.debug('Req resend', msgId);
this.log.debug('Req resend', msgId/* , isAnswer */);
}
this.pendingResends.push(msgId);
// (isAnswer ? this.pendingResendAnsReq : this.pendingResendReq).push(msgId);
this.pendingResendReq.push(msgId);
this.scheduleRequest(100);
}
public cleanupSent() {
let notEmpty = false;
// this.log('clean start', this.dcId/*, this.sentMessages*/)
Object.keys(this.sentMessages).forEach((msgId) => {
const message = this.sentMessages[msgId];
const sentMessages = this.sentMessages;
// this.log('clean start', this.dcId/*, sentMessages*/)
Object.keys(sentMessages).forEach((msgId) => {
const message = sentMessages[msgId];
// this.log('clean iter', msgID, message)
if(message.notContentRelated && this.pendingMessages[msgId] === undefined) {
// this.log('clean notContentRelated', msgID)
delete this.sentMessages[msgId];
delete sentMessages[msgId];
} else if(message.container) {
for(const innerMsgId of message.inner) {
if(this.sentMessages[innerMsgId] !== undefined) {
// this.log('clean failed, found', msgID, message.inner[i], this.sentMessages[message.inner[i]].seq_no)
if(sentMessages[innerMsgId] !== undefined) {
// this.log('clean failed, found', msgID, message.inner[i], sentMessages[message.inner[i]].seq_no)
notEmpty = true;
return;
}
}
// this.log('clean container', msgID)
delete this.sentMessages[msgId];
delete sentMessages[msgId];
} else {
notEmpty = true;
}
@ -1513,12 +1603,18 @@ export default class MTPNetworker { @@ -1513,12 +1603,18 @@ export default class MTPNetworker {
* TODO: consider about containers resend
*/
public resend() {
for(const id in this.sentMessages) {
const msg = this.sentMessages[id];
const sentMessages = this.sentMessages;
for(const id in sentMessages) {
const msg = sentMessages[id];
if(msg.body || msg.container) {
this.pushResend(id);
}
}
if((this.transport as TcpObfuscated).connection) {
this.clearPingDelayDisconnect();
this.sendPingDelayDisconnect();
}
}
/* public requestMessageStatus() {
@ -1555,9 +1651,13 @@ export default class MTPNetworker { @@ -1555,9 +1651,13 @@ export default class MTPNetworker {
return;
}
/* if(this.debug) {
this.log('process message', message, messageId, sessionId);
} */
if(this.debug) {
this.log.debug('process message', message, messageId);
}
if(this.pingDelayDisconnectDeferred) {
this.pingDelayDisconnectDeferred.resolve('any message');
}
switch(message._) {
case 'msg_container': {
@ -1671,28 +1771,41 @@ export default class MTPNetworker { @@ -1671,28 +1771,41 @@ export default class MTPNetworker {
break;
}
case 'msg_detailed_info':
if(!this.sentMessages[message.msg_id]) {
case 'msg_detailed_info': {
const sentMessage = this.sentMessages[message.msg_id];
if(!sentMessage) {
this.ackMessage(message.answer_msg_id);
break;
}/* else if(sentMessage.acked) {
this.reqResend(message.answer_msg_id, true);
}
case 'msg_new_detailed_info':
if(this.pendingAcks.indexOf(message.answer_msg_id)) {
break; */
}
case 'msg_new_detailed_info': {
if(this.pendingAcks.indexOf(message.answer_msg_id) !== -1) {
break;
}
this.reqResendMessage(message.answer_msg_id);
this.reqResend(message.answer_msg_id);
break;
}
case 'msgs_state_info': {
this.ackMessage(message.answer_msg_id);
if(this.lastResendReq &&
this.lastResendReq.req_msg_id === message.req_msg_id &&
this.pendingResends.length
) {
for(const badMsgId of this.lastResendReq.resend_msg_ids) {
const pos = this.pendingResends.indexOf(badMsgId);
if(pos !== -1) {
this.pendingResends.splice(pos, 1);
const arr = [
[this.lastResendReq, this.pendingResendReq] as const
// [this.lastResendAnsReq, this.pendingResendAnsReq] as const
];
for(const [lastResend, pendingResend] of arr) {
if(lastResend?.reqMsgId === message.req_msg_id && pendingResend.length) {
for(const badMsgId of lastResend.msgIds) {
const pos = pendingResend.indexOf(badMsgId);
if(pos !== -1) {
pendingResend.splice(pos, 1);
}
}
}
}
@ -1706,6 +1819,10 @@ export default class MTPNetworker { @@ -1706,6 +1819,10 @@ export default class MTPNetworker {
const sentMessageId = message.req_msg_id;
const sentMessage = this.sentMessages[sentMessageId];
// if(this.debug) {
// this.log.debug('Rpc response', message.result, sentMessage);
// }
this.processMessageAck(sentMessageId);
if(sentMessage) {
const deferred = sentMessage.deferred;
@ -1717,16 +1834,11 @@ export default class MTPNetworker { @@ -1717,16 +1834,11 @@ export default class MTPNetworker {
}
} else {
if(deferred) {
/* if(DEBUG) {
this.log.debug('Rpc response', message.result, sentMessage);
} */
deferred.resolve(message.result);
}
if(sentMessage.isAPI && !this.connectionInited) {
this.connectionInited = true;
////this.log('Rpc set connectionInited to:', this.connectionInited);
}
}
@ -1741,12 +1853,16 @@ export default class MTPNetworker { @@ -1741,12 +1853,16 @@ export default class MTPNetworker {
}
case 'pong': { // * https://core.telegram.org/mtproto/service_messages#ping-messages-pingpong - These messages don't require acknowledgments
const sentMessageId = message.msg_id;
/* const sentMessageId = message.msg_id;
const sentMessage = this.sentMessages[sentMessageId];
if(sentMessage) {
sentMessage.deferred.resolve(message);
delete this.sentMessages[sentMessageId];
} */
const pingId = message.ping_id;
if(this.lastPingDelayDisconnectId === pingId) {
this.pingDelayDisconnectDeferred.resolve('pong');
}
break;

Loading…
Cancel
Save