[temp] networker drain

This commit is contained in:
Eduard Kuzmenko 2021-06-20 20:07:20 +03:00
parent 2adb7e3528
commit d7d045818c
7 changed files with 54 additions and 18 deletions

View File

@ -9,6 +9,8 @@
* https://github.com/zhukov/webogram/blob/master/LICENSE
*/
import type { DcId } from "../types";
const App = {
id: 1025907,
hash: '452b0359b988148995f22ff0f4229750',
@ -17,7 +19,7 @@ const App = {
langPack: 'macos',
langPackCode: 'en',
domains: [] as string[],
baseDcId: 2,
baseDcId: 2 as DcId,
isMainDomain: location.hostname === 'web.telegram.org',
suffix: 'K'
};

View File

@ -16,6 +16,7 @@ import { notifyAll, notifySomeone } from "../../helpers/context";
import { getFileNameByLocation } from "../../helpers/fileName";
import { nextRandomInt } from "../../helpers/random";
import { InputFile, InputFileLocation, UploadFile } from "../../layer";
import { DcId } from "../../types";
import CacheStorageController from "../cacheStorage";
import cryptoWorker from "../crypto/cryptoworker";
import FileManager from "../filemanager";
@ -30,7 +31,7 @@ type Delayed = {
};
export type DownloadOptions = {
dcId: number,
dcId: DcId,
location: InputFileLocation,
size?: number,
fileName?: string,
@ -155,7 +156,7 @@ export class ApiFileManager {
return canceled;
}
public requestFilePart(dcId: number, location: InputFileLocation, offset: number, limit: number, id = 0, queueId = 0, checkCancel?: () => void) {
public requestFilePart(dcId: DcId, location: InputFileLocation, offset: number, limit: number, id = 0, queueId = 0, checkCancel?: () => void) {
return this.downloadRequest(dcId, id, async() => {
checkCancel && checkCancel();

View File

@ -18,7 +18,7 @@ import networkerFactory from './networkerFactory';
import authorizer from './authorizer';
import dcConfigurator, { ConnectionType, TransportType } from './dcConfigurator';
import { logger } from '../logger';
import type { InvokeApiOptions } from '../../types';
import type { DcId, InvokeApiOptions, TrueDcId } from '../../types';
import type { MethodDeclMap } from '../../layer';
import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise';
import { bytesFromHex, bytesToHex } from '../../helpers/bytes';
@ -78,7 +78,7 @@ export class ApiManager {
private cachedExportPromise: {[x: number]: Promise<unknown>} = {};
private gettingNetworkers: {[dcIdAndType: string]: Promise<MTPNetworker>} = {};
private baseDcId = 0;
private baseDcId: DcId = 0 as DcId;
//public telegramMeNotified = false;
@ -140,7 +140,7 @@ export class ApiManager {
/// #endif
}
public setBaseDcId(dcId: number) {
public setBaseDcId(dcId: DcId) {
this.baseDcId = dcId;
sessionStorage.set({
@ -163,14 +163,14 @@ export class ApiManager {
const logoutPromises: Promise<any>[] = [];
for(let i = 0; i < storageResult.length; i++) {
if(storageResult[i]) {
logoutPromises.push(this.invokeApi('auth.logOut', {}, {dcId: i + 1, ignoreErrors: true}));
logoutPromises.push(this.invokeApi('auth.logOut', {}, {dcId: (i + 1) as DcId, ignoreErrors: true}));
}
}
const clear = () => {
//console.error('apiManager: logOut clear');
this.baseDcId = 0;
this.baseDcId = undefined;
//this.telegramMeNotify(false);
IDBStorage.closeDatabases();
self.postMessage({type: 'clear'});
@ -189,7 +189,7 @@ export class ApiManager {
}
// mtpGetNetworker
public getNetworker(dcId: number, options: InvokeApiOptions = {}): Promise<MTPNetworker> {
public getNetworker(dcId: DcId, options: InvokeApiOptions = {}): Promise<MTPNetworker> {
const connectionType: ConnectionType = options.fileDownload ? 'download' : (options.fileUpload ? 'upload' : 'client');
//const connectionType: ConnectionType = 'client';
@ -237,10 +237,10 @@ export class ApiManager {
return this.gettingNetworkers[getKey];
}
const ak = 'dc' + dcId + '_auth_key';
const ss = 'dc' + dcId + '_server_salt';
const ak: `dc${TrueDcId}_auth_key` = `dc${dcId}_auth_key` as any;
const ss: `dc${TrueDcId}_server_salt` = `dc${dcId}_server_salt` as any;
return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key as any)))
return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key)))
.then(async([authKeyHex, serverSaltHex]) => {
const transport = dcConfigurator.chooseServer(dcId, connectionType, transportType, false);
let networker: MTPNetworker;
@ -273,6 +273,16 @@ export class ApiManager {
}
}
if(networker.isFileNetworker) {
networker.onDrain = () => {
this.log('networker drain', networker);
networker.onDrain = undefined;
const idx = networkers.indexOf(networker);
networkers.splice(idx, 1);
};
}
/* networker.onConnectionStatusChange = (online) => {
console.log('status:', online);
}; */
@ -354,7 +364,7 @@ export class ApiManager {
}
};
let dcId: number;
let dcId: DcId;
let cachedNetworker: MTPNetworker;
let stack = (new Error()).stack || 'empty stack';
@ -400,7 +410,7 @@ export class ApiManager {
this.invokeApi(method, params, options).then(deferred.resolve, rejectPromise);
}, rejectPromise);
} else if(error.code === 303) {
const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_|FILE_MIGRATE_)(\d+)/)[2];
const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_|FILE_MIGRATE_)(\d+)/)[2] as DcId;
if(newDcId !== dcId) {
if(options.dcId) {
options.dcId = newDcId;

View File

@ -74,6 +74,7 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & {
};
const CONNECTION_TIMEOUT = 5000;
const DRAIN_TIMEOUT = 10000;
let invokeAfterMsgConstructor: number;
export default class MTPNetworker {
@ -124,6 +125,11 @@ export default class MTPNetworker {
private debug = DEBUG /* && false */ || Modes.debug;
public activeRequests = 0;
public onDrain: () => void;
private onDrainTimeout: number;
//private disconnectDelay: number;
//private pingPromise: CancellablePromise<any>;
//public onConnectionStatusChange: (online: boolean) => void;
@ -687,7 +693,19 @@ export default class MTPNetworker {
promise.finally(() => {
clearTimeout(timeout);
this.setConnectionStatus(true);
if(!--this.activeRequests && this.onDrain) {
this.onDrainTimeout = self.setTimeout(() => {
this.log('drain');
this.onDrain();
}, DRAIN_TIMEOUT);
}
});
++this.activeRequests;
if(this.onDrainTimeout !== undefined) {
clearTimeout(this.onDrainTimeout);
}
}
return promise;

View File

@ -6,11 +6,12 @@
import type { AppInstance } from './mtproto/singleInstance';
import type { UserAuth } from './mtproto/mtproto_config';
import type { DcId } from '../types';
import { MOUNT_CLASS_TO } from '../config/debug';
import LocalStorageController from './localStorage';
const sessionStorage = new LocalStorageController<{
dc: number,
dc: DcId,
user_auth: UserAuth,
dc1_auth_key: string,
dc2_auth_key: string,

View File

@ -4,6 +4,7 @@
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import type { DcId } from '../types';
import apiManager from '../lib/mtproto/mtprotoworker';
import Page from './page';
import serverTimeManager from '../lib/mtproto/serverTimeManager';
@ -65,7 +66,7 @@ let onFirstMount = async() => {
cachedPromise = null;
}, true);
let options: {dcId?: number, ignoreErrors: true} = {ignoreErrors: true};
let options: {dcId?: DcId, ignoreErrors: true} = {ignoreErrors: true};
let prevToken: Uint8Array | number[];
const iterate = async(isLoop: boolean) => {
@ -78,7 +79,7 @@ let onFirstMount = async() => {
if(loginToken._ === 'auth.loginTokenMigrateTo') {
if(!options.dcId) {
options.dcId = loginToken.dc_id;
options.dcId = loginToken.dc_id as DcId;
apiManager.setBaseDcId(loginToken.dc_id);
//continue;
}

5
src/types.d.ts vendored
View File

@ -1,8 +1,11 @@
import { AuthSentCode } from "./layer";
import type { ApiError } from "./lib/mtproto/apiManager";
export type DcId = number;
export type TrueDcId = 1 | 2 | 3 | 4 | 5;
export type InvokeApiOptions = Partial<{
dcId: number,
dcId: DcId,
floodMaxTimeout: number,
noErrorBox: true,
fileUpload: true,