From fceec8e8e8a1273e972d3edb11cee91a109bf70e Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Mon, 4 Apr 2022 03:16:22 +0300 Subject: [PATCH] Network: Fix stuck requests because of drain race condition Fix releasing transport on drain --- src/components/chat/bubbles.ts | 5 +- src/helpers/dom/getViewportSlice.ts | 4 +- src/lib/mtproto/apiManager.ts | 77 ++++++++++++++--------------- src/lib/mtproto/networker.ts | 72 ++++++++++++++------------- src/scss/style.scss | 2 +- 5 files changed, 83 insertions(+), 77 deletions(-) diff --git a/src/components/chat/bubbles.ts b/src/components/chat/bubbles.ts index d7a00d74..8bc1d6b1 100644 --- a/src/components/chat/bubbles.ts +++ b/src/components/chat/bubbles.ts @@ -332,7 +332,10 @@ export default class ChatBubbles { const timeSpan = div.querySelector('.time'); const newDiv = wrapDocument({message}); div.replaceWith(newDiv); - newDiv.querySelector('.document-size').append(timeSpan); + + if(timeSpan) { + newDiv.querySelector('.document-size').append(timeSpan); + } } if(container) { diff --git a/src/helpers/dom/getViewportSlice.ts b/src/helpers/dom/getViewportSlice.ts index ef24fc67..a370f721 100644 --- a/src/helpers/dom/getViewportSlice.ts +++ b/src/helpers/dom/getViewportSlice.ts @@ -13,7 +13,7 @@ export default function getViewportSlice({overflowElement, selector, extraSize}: selector: string, extraSize?: number }) { - const perf = performance.now(); + // const perf = performance.now(); const overflowRect = overflowElement.getBoundingClientRect(); const elements = Array.from(overflowElement.querySelectorAll(selector)); @@ -67,7 +67,7 @@ export default function getViewportSlice({overflowElement, selector, extraSize}: } } - console.log('getViewportSlice time:', performance.now() - perf); + // console.log('getViewportSlice time:', performance.now() - perf); return {invisibleTop, visible, invisibleBottom}; } diff --git a/src/lib/mtproto/apiManager.ts b/src/lib/mtproto/apiManager.ts index bd8bbaac..eab8ca0b 100644 --- a/src/lib/mtproto/apiManager.ts +++ b/src/lib/mtproto/apiManager.ts @@ -40,6 +40,7 @@ import rootScope from '../rootScope'; /// #if MTPROTO_AUTO import transportController from './transports/controller'; import MTTransport from './transports/transport'; +import { pause } from '../../helpers/schedulers/pause'; /// #endif /* var networker = apiManager.cachedNetworkers.websocket.upload[2]; @@ -407,7 +408,7 @@ export class ApiManager { }); } - private changeNetworkerTransport(networker: MTPNetworker, transport: MTTransport) { + private changeNetworkerTransport(networker: MTPNetworker, transport?: MTTransport) { const oldTransport = networker.transport; if(oldTransport) { DcConfigurator.removeTransport(dcConfigurator.chosenServers, oldTransport); @@ -434,6 +435,7 @@ export class ApiManager { this.log('networker drain', networker.dcId); networker.onDrain = undefined; + this.changeNetworkerTransport(networker); networker.destroy(); networkerFactory.removeNetworker(networker); DcConfigurator.removeTransport(this.cachedNetworkers, networker); @@ -475,14 +477,12 @@ export class ApiManager { }); } - const rejectPromise = (error: ApiError) => { + const rejectPromise = async(error: ApiError) => { if(!error) { error = {type: 'ERROR_EMPTY'}; } else if(!isObject(error)) { error = {message: error}; } - - deferred.reject(error); if((error.code === 401 && error.type === 'SESSION_REVOKED') || (error.code === 406 && error.type === 'AUTH_KEY_DUPLICATED')) { @@ -490,7 +490,7 @@ export class ApiManager { } if(options.ignoreErrors) { - return; + throw error; } if(error.code === 406) { @@ -512,13 +512,15 @@ export class ApiManager { } }, 100); } + + throw error; }; let dcId: DcId; let cachedNetworker: MTPNetworker; let stack = (new Error()).stack || 'empty stack'; - const performRequest = (networker: MTPNetworker) => { + const performRequest = (): Promise => { if(afterMessageId) { const after = this.afterMessageTempIds[afterMessageId]; if(after) { @@ -526,7 +528,7 @@ export class ApiManager { } } - const promise = (cachedNetworker = networker).wrapApiCall(method, params, options); + const promise = cachedNetworker.wrapApiCall(method, params, options); if(prepareTempMessageId) { this.afterMessageTempIds[prepareTempMessageId] = { @@ -535,7 +537,7 @@ export class ApiManager { }; } - return promise.then(deferred.resolve, (error: ApiError) => { + return promise.catch((error: ApiError) => { //if(!options.ignoreErrors) { if(error.type !== 'FILE_REFERENCE_EXPIRED'/* && error.type !== 'MSG_WAIT_FAILED' */) { this.log.error('Error', error.code, error.type, this.baseDcId, dcId, method, params); @@ -548,7 +550,7 @@ export class ApiManager { //this.telegramMeNotify(false); } - rejectPromise(error); + throw error; } else if(error.code === 401 && this.baseDcId && dcId !== this.baseDcId) { if(this.cachedExportPromise[dcId] === undefined) { const promise = new Promise((exportResolve, exportReject) => { @@ -563,10 +565,7 @@ export class ApiManager { this.cachedExportPromise[dcId] = promise; } - this.cachedExportPromise[dcId].then(() => { - //(cachedNetworker = networker).wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise); - this.invokeApi(method, params, options).then(deferred.resolve, rejectPromise); - }, rejectPromise); + return this.cachedExportPromise[dcId].then(() => performRequest()); } else if(error.code === 303) { const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_)(\d+)/)[2] as DcId; if(newDcId !== dcId) { @@ -576,70 +575,70 @@ export class ApiManager { this.setBaseDcId(newDcId); } - this.getNetworker(newDcId, options).then((networker) => { - networker.wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise); - }, rejectPromise); + return this.invokeApi(method, params, options); } } else if(error.code === 400 && error.type.indexOf('FILE_MIGRATE') === 0) { const newDcId = +error.type.match(/^(FILE_MIGRATE_)(\d+)/)[2] as DcId; if(newDcId !== dcId) { - this.getNetworker(newDcId, options).then((networker) => { - networker.wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise); - }, rejectPromise); + options.dcId = newDcId; + return this.invokeApi(method, params, options); } else { - rejectPromise(error); + throw error; } } else if(error.code === 400 && error.type === 'CONNECTION_NOT_INITED') { networkerFactory.unsetConnectionInited(); - performRequest(cachedNetworker); + return performRequest(); } else if(!options.rawError && error.code === 420) { const waitTime = +error.type.match(/^FLOOD_WAIT_(\d+)/)[1] || 1; if(waitTime > (options.floodMaxTimeout !== undefined ? options.floodMaxTimeout : 60) && !options.prepareTempMessageId) { - return rejectPromise(error); + throw error; } - setTimeout(() => { - performRequest(cachedNetworker); - }, waitTime/* (waitTime + 5) */ * 1000); // 03.02.2020 + return pause(waitTime/* (waitTime + 5) */ * 1000).then(() => performRequest()); } else if(!options.rawError && ['MSG_WAIT_FAILED', 'MSG_WAIT_TIMEOUT'].includes(error.type)) { const after = this.afterMessageTempIds[afterMessageId]; afterMessageId = undefined; delete options.afterMessageId; - if(after) after.promise.then(() => performRequest(cachedNetworker)); - else performRequest(cachedNetworker); + if(after) return after.promise.then(() => performRequest()); + else return performRequest(); } else if(!options.rawError && error.code === 500) { const now = Date.now(); if(options.stopTime) { if(now >= options.stopTime) { - return rejectPromise(error); + throw error; } } options.waitTime = options.waitTime ? Math.min(60, options.waitTime * 1.5) : 1; - setTimeout(() => { - performRequest(cachedNetworker); - }, options.waitTime * 1000); + return pause(options.waitTime * 1000).then(() => performRequest()); } else if(error.type === 'UNKNOWN') { - setTimeout(() => { - performRequest(cachedNetworker); - }, 1000); + return pause(1000).then(() => performRequest()); } else { - rejectPromise(error); + throw error; } }); } + let p: Promise; if(dcId = (options.dcId || this.baseDcId)) { - this.getNetworker(dcId, options).then(performRequest, rejectPromise); + p = this.getNetworker(dcId, options); } else { - this.getBaseDcId().then(baseDcId => { - this.getNetworker(dcId = baseDcId, options).then(performRequest, rejectPromise); - }); + p = this.getBaseDcId().then((baseDcId) => this.getNetworker(dcId = baseDcId, options)); } + p.then((networker) => { + cachedNetworker = networker; + const promise = performRequest(); + cachedNetworker.attachPromise(deferred, options as MTMessage); + return promise; + }) + .then(deferred.resolve) + .catch(rejectPromise) + .catch(deferred.reject); + return deferred; } } diff --git a/src/lib/mtproto/networker.ts b/src/lib/mtproto/networker.ts index 99b5aa1a..9a41da01 100644 --- a/src/lib/mtproto/networker.ts +++ b/src/lib/mtproto/networker.ts @@ -469,7 +469,7 @@ export default class MTPNetworker { } public destroy() { - this.changeTransport(); + this.log('destroy'); } public forceReconnectTimeout() { @@ -812,43 +812,43 @@ export default class MTPNetworker { options.messageId = message.msg_id; } - if(promise) { - const canIncrement = !options.notContentRelated; - const timeout = setTimeout(() => { - if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < this.delays.connectionTimeout) { - return; - } + return promise; + } - this.log.error('timeout', message); - if(this.isOnline) { - this.setConnectionStatus(ConnectionStatus.TimedOut); - } + public attachPromise(promise: Promise, message: MTMessage) { + const canIncrement = true; + const timeout = setTimeout(() => { + if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < this.delays.connectionTimeout) { + return; + } - /* this.getEncryptedOutput(message).then(bytes => { - this.log.error('timeout encrypted', bytes); - }); */ - }, this.delays.connectionTimeout); - - promise.catch(noop).finally(() => { - clearTimeout(timeout); - this.setConnectionStatus(ConnectionStatus.Connected); + this.log.error('timeout', message); + if(this.isOnline) { + this.setConnectionStatus(ConnectionStatus.TimedOut); + } + + /* this.getEncryptedOutput(message).then(bytes => { + this.log.error('timeout encrypted', bytes); + }); */ + }, this.delays.connectionTimeout); + + promise.catch(noop).finally(() => { + clearTimeout(timeout); + this.setConnectionStatus(ConnectionStatus.Connected); - if(canIncrement) { - --this.activeRequests; - this.setDrainTimeout(); - } - }); - if(canIncrement) { - ++this.activeRequests; - if(this.onDrainTimeout !== undefined) { - clearTimeout(this.onDrainTimeout); - this.onDrainTimeout = undefined; - } + --this.activeRequests; + this.setDrainTimeout(); + } + }); + + if(canIncrement) { + ++this.activeRequests; + if(this.onDrainTimeout !== undefined) { + clearTimeout(this.onDrainTimeout); + this.onDrainTimeout = undefined; } } - - return promise; } public setDrainTimeout() { @@ -885,7 +885,7 @@ export default class MTPNetworker { this.scheduleRequest(); } - if((this.transport as TcpObfuscated).connection) { + if((this.transport as TcpObfuscated)?.connection) { this.clearPingDelayDisconnect(); this.sendPingDelayDisconnect(); } @@ -1288,8 +1288,12 @@ export default class MTPNetworker { private async sendEncryptedRequest(message: MTMessage) { const requestData = await this.getEncryptedOutput(message); + if(!this.transport) { + this.log.error('trying to send something when offline', this.transport, this); + } + this.debug && this.log.debug('sending:', message, [message.msg_id].concat(message.inner || []), requestData.length); - const promise: Promise = this.transport.send(requestData) as any; + const promise: Promise = this.transport ? this.transport.send(requestData) as any : Promise.reject({}); // this.debug && this.log.debug('sendEncryptedRequest: launched message into space:', message, promise); /// #if !MTPROTO_HAS_HTTP diff --git a/src/scss/style.scss b/src/scss/style.scss index d3b8fcc8..c67060f0 100644 --- a/src/scss/style.scss +++ b/src/scss/style.scss @@ -1451,7 +1451,7 @@ hr { .tgico-reply_filled:before, .tgico-forward_filled:before { font-size: 20px !important; - padding: 0 2px; + padding: 2px; } // ! TEMPORARY