Fix stuck requests because of drain race condition
Fix releasing transport on drain
This commit is contained in:
Eduard Kuzmenko 2022-04-04 03:16:22 +03:00
parent becdadfe44
commit fceec8e8e8
5 changed files with 83 additions and 77 deletions

View File

@ -332,7 +332,10 @@ export default class ChatBubbles {
const timeSpan = div.querySelector('.time');
const newDiv = wrapDocument({message});
div.replaceWith(newDiv);
newDiv.querySelector('.document-size').append(timeSpan);
if(timeSpan) {
newDiv.querySelector('.document-size').append(timeSpan);
}
}
if(container) {

View File

@ -13,7 +13,7 @@ export default function getViewportSlice({overflowElement, selector, extraSize}:
selector: string,
extraSize?: number
}) {
const perf = performance.now();
// const perf = performance.now();
const overflowRect = overflowElement.getBoundingClientRect();
const elements = Array.from(overflowElement.querySelectorAll<HTMLElement>(selector));
@ -67,7 +67,7 @@ export default function getViewportSlice({overflowElement, selector, extraSize}:
}
}
console.log('getViewportSlice time:', performance.now() - perf);
// console.log('getViewportSlice time:', performance.now() - perf);
return {invisibleTop, visible, invisibleBottom};
}

View File

@ -40,6 +40,7 @@ import rootScope from '../rootScope';
/// #if MTPROTO_AUTO
import transportController from './transports/controller';
import MTTransport from './transports/transport';
import { pause } from '../../helpers/schedulers/pause';
/// #endif
/* var networker = apiManager.cachedNetworkers.websocket.upload[2];
@ -407,7 +408,7 @@ export class ApiManager {
});
}
private changeNetworkerTransport(networker: MTPNetworker, transport: MTTransport) {
private changeNetworkerTransport(networker: MTPNetworker, transport?: MTTransport) {
const oldTransport = networker.transport;
if(oldTransport) {
DcConfigurator.removeTransport(dcConfigurator.chosenServers, oldTransport);
@ -434,6 +435,7 @@ export class ApiManager {
this.log('networker drain', networker.dcId);
networker.onDrain = undefined;
this.changeNetworkerTransport(networker);
networker.destroy();
networkerFactory.removeNetworker(networker);
DcConfigurator.removeTransport(this.cachedNetworkers, networker);
@ -475,14 +477,12 @@ export class ApiManager {
});
}
const rejectPromise = (error: ApiError) => {
const rejectPromise = async(error: ApiError) => {
if(!error) {
error = {type: 'ERROR_EMPTY'};
} else if(!isObject(error)) {
error = {message: error};
}
deferred.reject(error);
if((error.code === 401 && error.type === 'SESSION_REVOKED') ||
(error.code === 406 && error.type === 'AUTH_KEY_DUPLICATED')) {
@ -490,7 +490,7 @@ export class ApiManager {
}
if(options.ignoreErrors) {
return;
throw error;
}
if(error.code === 406) {
@ -512,13 +512,15 @@ export class ApiManager {
}
}, 100);
}
throw error;
};
let dcId: DcId;
let cachedNetworker: MTPNetworker;
let stack = (new Error()).stack || 'empty stack';
const performRequest = (networker: MTPNetworker) => {
const performRequest = (): Promise<any> => {
if(afterMessageId) {
const after = this.afterMessageTempIds[afterMessageId];
if(after) {
@ -526,7 +528,7 @@ export class ApiManager {
}
}
const promise = (cachedNetworker = networker).wrapApiCall(method, params, options);
const promise = cachedNetworker.wrapApiCall(method, params, options);
if(prepareTempMessageId) {
this.afterMessageTempIds[prepareTempMessageId] = {
@ -535,7 +537,7 @@ export class ApiManager {
};
}
return promise.then(deferred.resolve, (error: ApiError) => {
return promise.catch((error: ApiError) => {
//if(!options.ignoreErrors) {
if(error.type !== 'FILE_REFERENCE_EXPIRED'/* && error.type !== 'MSG_WAIT_FAILED' */) {
this.log.error('Error', error.code, error.type, this.baseDcId, dcId, method, params);
@ -548,7 +550,7 @@ export class ApiManager {
//this.telegramMeNotify(false);
}
rejectPromise(error);
throw error;
} else if(error.code === 401 && this.baseDcId && dcId !== this.baseDcId) {
if(this.cachedExportPromise[dcId] === undefined) {
const promise = new Promise((exportResolve, exportReject) => {
@ -563,10 +565,7 @@ export class ApiManager {
this.cachedExportPromise[dcId] = promise;
}
this.cachedExportPromise[dcId].then(() => {
//(cachedNetworker = networker).wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise);
this.invokeApi(method, params, options).then(deferred.resolve, rejectPromise);
}, rejectPromise);
return this.cachedExportPromise[dcId].then(() => performRequest());
} else if(error.code === 303) {
const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_)(\d+)/)[2] as DcId;
if(newDcId !== dcId) {
@ -576,70 +575,70 @@ export class ApiManager {
this.setBaseDcId(newDcId);
}
this.getNetworker(newDcId, options).then((networker) => {
networker.wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise);
}, rejectPromise);
return this.invokeApi(method, params, options);
}
} else if(error.code === 400 && error.type.indexOf('FILE_MIGRATE') === 0) {
const newDcId = +error.type.match(/^(FILE_MIGRATE_)(\d+)/)[2] as DcId;
if(newDcId !== dcId) {
this.getNetworker(newDcId, options).then((networker) => {
networker.wrapApiCall(method, params, options).then(deferred.resolve, rejectPromise);
}, rejectPromise);
options.dcId = newDcId;
return this.invokeApi(method, params, options);
} else {
rejectPromise(error);
throw error;
}
} else if(error.code === 400 && error.type === 'CONNECTION_NOT_INITED') {
networkerFactory.unsetConnectionInited();
performRequest(cachedNetworker);
return performRequest();
} else if(!options.rawError && error.code === 420) {
const waitTime = +error.type.match(/^FLOOD_WAIT_(\d+)/)[1] || 1;
if(waitTime > (options.floodMaxTimeout !== undefined ? options.floodMaxTimeout : 60) && !options.prepareTempMessageId) {
return rejectPromise(error);
throw error;
}
setTimeout(() => {
performRequest(cachedNetworker);
}, waitTime/* (waitTime + 5) */ * 1000); // 03.02.2020
return pause(waitTime/* (waitTime + 5) */ * 1000).then(() => performRequest());
} else if(!options.rawError && ['MSG_WAIT_FAILED', 'MSG_WAIT_TIMEOUT'].includes(error.type)) {
const after = this.afterMessageTempIds[afterMessageId];
afterMessageId = undefined;
delete options.afterMessageId;
if(after) after.promise.then(() => performRequest(cachedNetworker));
else performRequest(cachedNetworker);
if(after) return after.promise.then(() => performRequest());
else return performRequest();
} else if(!options.rawError && error.code === 500) {
const now = Date.now();
if(options.stopTime) {
if(now >= options.stopTime) {
return rejectPromise(error);
throw error;
}
}
options.waitTime = options.waitTime ? Math.min(60, options.waitTime * 1.5) : 1;
setTimeout(() => {
performRequest(cachedNetworker);
}, options.waitTime * 1000);
return pause(options.waitTime * 1000).then(() => performRequest());
} else if(error.type === 'UNKNOWN') {
setTimeout(() => {
performRequest(cachedNetworker);
}, 1000);
return pause(1000).then(() => performRequest());
} else {
rejectPromise(error);
throw error;
}
});
}
let p: Promise<MTPNetworker>;
if(dcId = (options.dcId || this.baseDcId)) {
this.getNetworker(dcId, options).then(performRequest, rejectPromise);
p = this.getNetworker(dcId, options);
} else {
this.getBaseDcId().then(baseDcId => {
this.getNetworker(dcId = baseDcId, options).then(performRequest, rejectPromise);
});
p = this.getBaseDcId().then((baseDcId) => this.getNetworker(dcId = baseDcId, options));
}
p.then((networker) => {
cachedNetworker = networker;
const promise = performRequest();
cachedNetworker.attachPromise(deferred, options as MTMessage);
return promise;
})
.then(deferred.resolve)
.catch(rejectPromise)
.catch(deferred.reject);
return deferred;
}
}

View File

@ -469,7 +469,7 @@ export default class MTPNetworker {
}
public destroy() {
this.changeTransport();
this.log('destroy');
}
public forceReconnectTimeout() {
@ -812,43 +812,43 @@ export default class MTPNetworker {
options.messageId = message.msg_id;
}
if(promise) {
const canIncrement = !options.notContentRelated;
const timeout = setTimeout(() => {
if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < this.delays.connectionTimeout) {
return;
}
return promise;
}
this.log.error('timeout', message);
if(this.isOnline) {
this.setConnectionStatus(ConnectionStatus.TimedOut);
}
public attachPromise(promise: Promise<any>, message: MTMessage) {
const canIncrement = true;
const timeout = setTimeout(() => {
if(this.lastResponseTime && (Date.now() - this.lastResponseTime) < this.delays.connectionTimeout) {
return;
}
/* this.getEncryptedOutput(message).then(bytes => {
this.log.error('timeout encrypted', bytes);
}); */
}, this.delays.connectionTimeout);
promise.catch(noop).finally(() => {
clearTimeout(timeout);
this.setConnectionStatus(ConnectionStatus.Connected);
this.log.error('timeout', message);
if(this.isOnline) {
this.setConnectionStatus(ConnectionStatus.TimedOut);
}
/* this.getEncryptedOutput(message).then(bytes => {
this.log.error('timeout encrypted', bytes);
}); */
}, this.delays.connectionTimeout);
promise.catch(noop).finally(() => {
clearTimeout(timeout);
this.setConnectionStatus(ConnectionStatus.Connected);
if(canIncrement) {
--this.activeRequests;
this.setDrainTimeout();
}
});
if(canIncrement) {
++this.activeRequests;
if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout);
this.onDrainTimeout = undefined;
}
--this.activeRequests;
this.setDrainTimeout();
}
});
if(canIncrement) {
++this.activeRequests;
if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout);
this.onDrainTimeout = undefined;
}
}
return promise;
}
public setDrainTimeout() {
@ -885,7 +885,7 @@ export default class MTPNetworker {
this.scheduleRequest();
}
if((this.transport as TcpObfuscated).connection) {
if((this.transport as TcpObfuscated)?.connection) {
this.clearPingDelayDisconnect();
this.sendPingDelayDisconnect();
}
@ -1288,8 +1288,12 @@ export default class MTPNetworker {
private async sendEncryptedRequest(message: MTMessage) {
const requestData = await this.getEncryptedOutput(message);
if(!this.transport) {
this.log.error('trying to send something when offline', this.transport, this);
}
this.debug && this.log.debug('sending:', message, [message.msg_id].concat(message.inner || []), requestData.length);
const promise: Promise<Uint8Array> = this.transport.send(requestData) as any;
const promise: Promise<Uint8Array> = this.transport ? this.transport.send(requestData) as any : Promise.reject({});
// this.debug && this.log.debug('sendEncryptedRequest: launched message into space:', message, promise);
/// #if !MTPROTO_HAS_HTTP

View File

@ -1451,7 +1451,7 @@ hr {
.tgico-reply_filled:before,
.tgico-forward_filled:before {
font-size: 20px !important;
padding: 0 2px;
padding: 2px;
}
// ! TEMPORARY