Browse Source

Release client networkers

Fix transport reusing
Release transport
master
Eduard Kuzmenko 3 years ago
parent
commit
c382e3e684
  1. 53
      src/lib/mtproto/apiManager.ts
  2. 19
      src/lib/mtproto/dcConfigurator.ts
  3. 21
      src/lib/mtproto/networker.ts
  4. 3
      src/lib/mtproto/transports/tcpObfuscated.ts
  5. 24
      src/pages/pageSignIn.ts

53
src/lib/mtproto/apiManager.ts

@ -16,7 +16,7 @@ import { isObject } from './bin_utils';
import networkerFactory from './networkerFactory'; import networkerFactory from './networkerFactory';
//import { telegramMeWebService } from './mtproto'; //import { telegramMeWebService } from './mtproto';
import authorizer from './authorizer'; import authorizer from './authorizer';
import dcConfigurator, { ConnectionType, TransportType } from './dcConfigurator'; import dcConfigurator, { ConnectionType, DcConfigurator, TransportType } from './dcConfigurator';
import { logger } from '../logger'; import { logger } from '../logger';
import type { DcId, InvokeApiOptions, TrueDcId } from '../../types'; import type { DcId, InvokeApiOptions, TrueDcId } from '../../types';
import type { MethodDeclMap } from '../../layer'; import type { MethodDeclMap } from '../../layer';
@ -141,6 +141,13 @@ export class ApiManager {
} }
public setBaseDcId(dcId: DcId) { public setBaseDcId(dcId: DcId) {
const wasDcId = this.baseDcId;
if(wasDcId) { // if migrated set ondrain
this.getNetworker(wasDcId).then(networker => {
this.setOnDrainIfNeeded(networker);
});
}
this.baseDcId = dcId; this.baseDcId = dcId;
sessionStorage.set({ sessionStorage.set({
@ -242,7 +249,7 @@ export class ApiManager {
return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key))) return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key)))
.then(async([authKeyHex, serverSaltHex]) => { .then(async([authKeyHex, serverSaltHex]) => {
const transport = dcConfigurator.chooseServer(dcId, connectionType, transportType, false); const transport = dcConfigurator.chooseServer(dcId, connectionType, transportType, connectionType === 'client');
let networker: MTPNetworker; let networker: MTPNetworker;
if(authKeyHex && authKeyHex.length === 512) { if(authKeyHex && authKeyHex.length === 512) {
if(!serverSaltHex || serverSaltHex.length !== 16) { if(!serverSaltHex || serverSaltHex.length !== 16) {
@ -273,28 +280,46 @@ export class ApiManager {
} }
} }
if(transportType === 'websocket' && networker.isFileNetworker) {
networker.onDrain = () => {
this.log('networker drain', networker.dcId);
networker.onDrain = undefined;
const idx = networkers.indexOf(networker);
networkers.splice(idx, 1);
networkerFactory.removeNetworker(networker);
networker.destroy();
};
}
/* networker.onConnectionStatusChange = (online) => { /* networker.onConnectionStatusChange = (online) => {
console.log('status:', online); console.log('status:', online);
}; */ }; */
delete this.gettingNetworkers[getKey]; delete this.gettingNetworkers[getKey];
networkers.unshift(networker); networkers.unshift(networker);
this.setOnDrainIfNeeded(networker);
return networker; return networker;
}); });
} }
public setOnDrainIfNeeded(networker: MTPNetworker) {
if(networker.onDrain) {
return;
}
const checkPromise: Promise<boolean> = networker.isFileNetworker ?
Promise.resolve(true) :
this.getBaseDcId().then(baseDcId => networker.dcId !== baseDcId);
checkPromise.then(canRelease => {
if(networker.onDrain) {
return;
}
if(canRelease) {
networker.onDrain = () => {
this.log('networker drain', networker.dcId);
networker.onDrain = undefined;
networker.destroy();
networkerFactory.removeNetworker(networker);
DcConfigurator.removeTransport(this.cachedNetworkers, networker);
DcConfigurator.removeTransport(dcConfigurator.chosenServers, networker.transport);
};
networker.setDrainTimeout();
}
});
}
// mtpInvokeApi // mtpInvokeApi
public invokeApi<T extends keyof MethodDeclMap>(method: T, params: MethodDeclMap[T]['req'] = {}, options: InvokeApiOptions = {}): CancellablePromise<MethodDeclMap[T]["res"]> { public invokeApi<T extends keyof MethodDeclMap>(method: T, params: MethodDeclMap[T]['req'] = {}, options: InvokeApiOptions = {}): CancellablePromise<MethodDeclMap[T]["res"]> {
///////this.log('Invoke api', method, params, options); ///////this.log('Invoke api', method, params, options);

19
src/lib/mtproto/dcConfigurator.ts

@ -53,7 +53,7 @@ export class DcConfigurator {
{id: 5, host: '149.154.171.5', port: 80} {id: 5, host: '149.154.171.5', port: 80}
]; ];
private chosenServers: Servers = {} as any; public chosenServers: Servers = {} as any;
/// #if !MTPROTO_HTTP /// #if !MTPROTO_HTTP
private transportSocket = (dcId: number, connectionType: ConnectionType) => { private transportSocket = (dcId: number, connectionType: ConnectionType) => {
@ -134,6 +134,23 @@ export class DcConfigurator {
return transports[0]; return transports[0];
} }
public static removeTransport<T>(obj: any, transport: T) {
for(const transportType in obj) {
// @ts-ignore
for(const connectionType in obj[transportType]) {
// @ts-ignore
for(const dcId in obj[transportType][connectionType]) {
// @ts-ignore
const transports: T[] = obj[transportType][connectionType][dcId];
const idx = transports.indexOf(transport);
if(idx !== -1) {
transports.splice(idx, 1);
}
}
}
}
}
} }
export default new DcConfigurator(); export default new DcConfigurator();

21
src/lib/mtproto/networker.ts

@ -139,7 +139,7 @@ export default class MTPNetworker {
//private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = []; //private debugRequests: Array<{before: Uint8Array, after: Uint8Array}> = [];
constructor(public dcId: number, private authKey: number[], private authKeyId: Uint8Array, constructor(public dcId: number, private authKey: number[], private authKeyId: Uint8Array,
serverSalt: number[], private transport: MTTransport, options: InvokeApiOptions = {}) { serverSalt: number[], public transport: MTTransport, options: InvokeApiOptions = {}) {
this.authKeyUint8 = convertToUint8Array(this.authKey); this.authKeyUint8 = convertToUint8Array(this.authKey);
this.serverSalt = convertToUint8Array(serverSalt); this.serverSalt = convertToUint8Array(serverSalt);
@ -715,23 +715,30 @@ export default class MTPNetworker {
clearTimeout(timeout); clearTimeout(timeout);
this.setConnectionStatus(ConnectionStatus.Connected); this.setConnectionStatus(ConnectionStatus.Connected);
if(!--this.activeRequests && this.onDrain) { --this.activeRequests;
this.onDrainTimeout = self.setTimeout(() => { this.setDrainTimeout();
this.log('drain');
this.onDrain();
}, DRAIN_TIMEOUT);
}
}); });
++this.activeRequests; ++this.activeRequests;
if(this.onDrainTimeout !== undefined) { if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout); clearTimeout(this.onDrainTimeout);
this.onDrainTimeout = undefined;
} }
} }
return promise; return promise;
} }
public setDrainTimeout() {
if(!this.activeRequests && this.onDrain && this.onDrainTimeout === undefined) {
this.onDrainTimeout = self.setTimeout(() => {
this.onDrainTimeout = undefined;
this.log('drain');
this.onDrain();
}, DRAIN_TIMEOUT);
}
}
public setConnectionStatus(status: ConnectionStatus, retryAt?: number) { public setConnectionStatus(status: ConnectionStatus, retryAt?: number) {
const isOnline = status === ConnectionStatus.Connected; const isOnline = status === ConnectionStatus.Connected;
const willChange = this.status !== status; const willChange = this.status !== status;

3
src/lib/mtproto/transports/tcpObfuscated.ts

@ -168,9 +168,10 @@ export default class TcpObfuscated implements MTTransport {
pending.bodySent = false; pending.bodySent = false;
} }
} }
} else {
this.networker.setConnectionStatus(ConnectionStatus.Connecting);
} }
this.networker.setConnectionStatus(ConnectionStatus.Connecting);
this.connect(); this.connect();
} }

24
src/pages/pageSignIn.ts

@ -34,6 +34,7 @@ import { attachClickEvent } from "../helpers/dom/clickEvent";
import replaceContent from "../helpers/dom/replaceContent"; import replaceContent from "../helpers/dom/replaceContent";
import toggleDisability from "../helpers/dom/toggleDisability"; import toggleDisability from "../helpers/dom/toggleDisability";
import sessionStorage from "../lib/sessionStorage"; import sessionStorage from "../lib/sessionStorage";
import { TrueDcId } from "../types";
type Country = _Country & { type Country = _Country & {
li?: HTMLLIElement[] li?: HTMLLIElement[]
@ -432,24 +433,35 @@ let onFirstMount = () => {
let tryAgain = () => { let tryAgain = () => {
apiManager.invokeApi('help.getNearestDc').then((nearestDcResult) => { apiManager.invokeApi('help.getNearestDc').then((nearestDcResult) => {
const dcs = [1, 2, 3, 4, 5]; const dcs = new Set([1, 2, 3, 4, 5]);
const done: number[] = [nearestDcResult.this_dc]; const done: number[] = [nearestDcResult.this_dc];
let promise: Promise<any>; let promise: Promise<any>;
if(nearestDcResult.nearest_dc !== nearestDcResult.this_dc) { if(nearestDcResult.nearest_dc !== nearestDcResult.this_dc) {
promise = apiManager.getNetworker(nearestDcResult.nearest_dc).then(() => { promise = apiManager.getNetworker(nearestDcResult.nearest_dc).then(() => {
done.push(nearestDcResult.nearest_dc) done.push(nearestDcResult.nearest_dc);
}); });
} }
(promise || Promise.resolve()).then(() => { (promise || Promise.resolve()).then(() => {
const g = () => { done.forEach(dcId => {
const dcId = dcs.shift(); dcs.delete(dcId);
});
const _dcs = [...dcs];
const g = async(): Promise<void> => {
const dcId = _dcs.shift();
if(!dcId) return; if(!dcId) return;
const dbKey: `dc${TrueDcId}_auth_key` = `dc${dcId}_auth_key` as any;
const key = await sessionStorage.get(dbKey);
if(key) {
return g();
}
setTimeout(() => { // * если одновременно запросить все нетворкеры, не будет проходить запрос на код setTimeout(() => { // * если одновременно запросить все нетворкеры, не будет проходить запрос на код
apiManager.getNetworker(dcId, {fileDownload: true}).finally(g); apiManager.getNetworker(dcId/* , {fileDownload: true} */).finally(g);
}, done.includes(dcId) ? 0 : 3000); }, /* done.includes(dcId) ? 0 : */3000);
}; };
g(); g();

Loading…
Cancel
Save