From d7d045818cb8b4699aa1ffa128747b1a511186c5 Mon Sep 17 00:00:00 2001 From: Eduard Kuzmenko Date: Sun, 20 Jun 2021 20:07:20 +0300 Subject: [PATCH] [temp] networker drain --- src/config/app.ts | 4 +++- src/lib/mtproto/apiFileManager.ts | 5 +++-- src/lib/mtproto/apiManager.ts | 32 ++++++++++++++++++++----------- src/lib/mtproto/networker.ts | 18 +++++++++++++++++ src/lib/sessionStorage.ts | 3 ++- src/pages/pageSignQR.ts | 5 +++-- src/types.d.ts | 5 ++++- 7 files changed, 54 insertions(+), 18 deletions(-) diff --git a/src/config/app.ts b/src/config/app.ts index a7a379c4..6fef9fe3 100644 --- a/src/config/app.ts +++ b/src/config/app.ts @@ -9,6 +9,8 @@ * https://github.com/zhukov/webogram/blob/master/LICENSE */ +import type { DcId } from "../types"; + const App = { id: 1025907, hash: '452b0359b988148995f22ff0f4229750', @@ -17,7 +19,7 @@ const App = { langPack: 'macos', langPackCode: 'en', domains: [] as string[], - baseDcId: 2, + baseDcId: 2 as DcId, isMainDomain: location.hostname === 'web.telegram.org', suffix: 'K' }; diff --git a/src/lib/mtproto/apiFileManager.ts b/src/lib/mtproto/apiFileManager.ts index cba9a8cd..477213f8 100644 --- a/src/lib/mtproto/apiFileManager.ts +++ b/src/lib/mtproto/apiFileManager.ts @@ -16,6 +16,7 @@ import { notifyAll, notifySomeone } from "../../helpers/context"; import { getFileNameByLocation } from "../../helpers/fileName"; import { nextRandomInt } from "../../helpers/random"; import { InputFile, InputFileLocation, UploadFile } from "../../layer"; +import { DcId } from "../../types"; import CacheStorageController from "../cacheStorage"; import cryptoWorker from "../crypto/cryptoworker"; import FileManager from "../filemanager"; @@ -30,7 +31,7 @@ type Delayed = { }; export type DownloadOptions = { - dcId: number, + dcId: DcId, location: InputFileLocation, size?: number, fileName?: string, @@ -155,7 +156,7 @@ export class ApiFileManager { return canceled; } - public requestFilePart(dcId: number, location: InputFileLocation, offset: number, limit: number, id = 0, queueId = 0, checkCancel?: () => void) { + public requestFilePart(dcId: DcId, location: InputFileLocation, offset: number, limit: number, id = 0, queueId = 0, checkCancel?: () => void) { return this.downloadRequest(dcId, id, async() => { checkCancel && checkCancel(); diff --git a/src/lib/mtproto/apiManager.ts b/src/lib/mtproto/apiManager.ts index 33c3f540..0514b2d4 100644 --- a/src/lib/mtproto/apiManager.ts +++ b/src/lib/mtproto/apiManager.ts @@ -18,7 +18,7 @@ import networkerFactory from './networkerFactory'; import authorizer from './authorizer'; import dcConfigurator, { ConnectionType, TransportType } from './dcConfigurator'; import { logger } from '../logger'; -import type { InvokeApiOptions } from '../../types'; +import type { DcId, InvokeApiOptions, TrueDcId } from '../../types'; import type { MethodDeclMap } from '../../layer'; import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise'; import { bytesFromHex, bytesToHex } from '../../helpers/bytes'; @@ -78,7 +78,7 @@ export class ApiManager { private cachedExportPromise: {[x: number]: Promise} = {}; private gettingNetworkers: {[dcIdAndType: string]: Promise} = {}; - private baseDcId = 0; + private baseDcId: DcId = 0 as DcId; //public telegramMeNotified = false; @@ -140,7 +140,7 @@ export class ApiManager { /// #endif } - public setBaseDcId(dcId: number) { + public setBaseDcId(dcId: DcId) { this.baseDcId = dcId; sessionStorage.set({ @@ -163,14 +163,14 @@ export class ApiManager { const logoutPromises: Promise[] = []; for(let i = 0; i < storageResult.length; i++) { if(storageResult[i]) { - logoutPromises.push(this.invokeApi('auth.logOut', {}, {dcId: i + 1, ignoreErrors: true})); + logoutPromises.push(this.invokeApi('auth.logOut', {}, {dcId: (i + 1) as DcId, ignoreErrors: true})); } } const clear = () => { //console.error('apiManager: logOut clear'); - this.baseDcId = 0; + this.baseDcId = undefined; //this.telegramMeNotify(false); IDBStorage.closeDatabases(); self.postMessage({type: 'clear'}); @@ -189,7 +189,7 @@ export class ApiManager { } // mtpGetNetworker - public getNetworker(dcId: number, options: InvokeApiOptions = {}): Promise { + public getNetworker(dcId: DcId, options: InvokeApiOptions = {}): Promise { const connectionType: ConnectionType = options.fileDownload ? 'download' : (options.fileUpload ? 'upload' : 'client'); //const connectionType: ConnectionType = 'client'; @@ -237,10 +237,10 @@ export class ApiManager { return this.gettingNetworkers[getKey]; } - const ak = 'dc' + dcId + '_auth_key'; - const ss = 'dc' + dcId + '_server_salt'; + const ak: `dc${TrueDcId}_auth_key` = `dc${dcId}_auth_key` as any; + const ss: `dc${TrueDcId}_server_salt` = `dc${dcId}_server_salt` as any; - return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key as any))) + return this.gettingNetworkers[getKey] = Promise.all([ak, ss].map(key => sessionStorage.get(key))) .then(async([authKeyHex, serverSaltHex]) => { const transport = dcConfigurator.chooseServer(dcId, connectionType, transportType, false); let networker: MTPNetworker; @@ -273,6 +273,16 @@ export class ApiManager { } } + if(networker.isFileNetworker) { + networker.onDrain = () => { + this.log('networker drain', networker); + + networker.onDrain = undefined; + const idx = networkers.indexOf(networker); + networkers.splice(idx, 1); + }; + } + /* networker.onConnectionStatusChange = (online) => { console.log('status:', online); }; */ @@ -354,7 +364,7 @@ export class ApiManager { } }; - let dcId: number; + let dcId: DcId; let cachedNetworker: MTPNetworker; let stack = (new Error()).stack || 'empty stack'; @@ -400,7 +410,7 @@ export class ApiManager { this.invokeApi(method, params, options).then(deferred.resolve, rejectPromise); }, rejectPromise); } else if(error.code === 303) { - const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_|FILE_MIGRATE_)(\d+)/)[2]; + const newDcId = +error.type.match(/^(PHONE_MIGRATE_|NETWORK_MIGRATE_|USER_MIGRATE_|FILE_MIGRATE_)(\d+)/)[2] as DcId; if(newDcId !== dcId) { if(options.dcId) { options.dcId = newDcId; diff --git a/src/lib/mtproto/networker.ts b/src/lib/mtproto/networker.ts index b1b55a2f..594f454e 100644 --- a/src/lib/mtproto/networker.ts +++ b/src/lib/mtproto/networker.ts @@ -74,6 +74,7 @@ export type MTMessage = InvokeApiOptions & MTMessageOptions & { }; const CONNECTION_TIMEOUT = 5000; +const DRAIN_TIMEOUT = 10000; let invokeAfterMsgConstructor: number; export default class MTPNetworker { @@ -124,6 +125,11 @@ export default class MTPNetworker { private debug = DEBUG /* && false */ || Modes.debug; + public activeRequests = 0; + + public onDrain: () => void; + private onDrainTimeout: number; + //private disconnectDelay: number; //private pingPromise: CancellablePromise; //public onConnectionStatusChange: (online: boolean) => void; @@ -687,7 +693,19 @@ export default class MTPNetworker { promise.finally(() => { clearTimeout(timeout); this.setConnectionStatus(true); + + if(!--this.activeRequests && this.onDrain) { + this.onDrainTimeout = self.setTimeout(() => { + this.log('drain'); + this.onDrain(); + }, DRAIN_TIMEOUT); + } }); + + ++this.activeRequests; + if(this.onDrainTimeout !== undefined) { + clearTimeout(this.onDrainTimeout); + } } return promise; diff --git a/src/lib/sessionStorage.ts b/src/lib/sessionStorage.ts index b91de8f3..99798310 100644 --- a/src/lib/sessionStorage.ts +++ b/src/lib/sessionStorage.ts @@ -6,11 +6,12 @@ import type { AppInstance } from './mtproto/singleInstance'; import type { UserAuth } from './mtproto/mtproto_config'; +import type { DcId } from '../types'; import { MOUNT_CLASS_TO } from '../config/debug'; import LocalStorageController from './localStorage'; const sessionStorage = new LocalStorageController<{ - dc: number, + dc: DcId, user_auth: UserAuth, dc1_auth_key: string, dc2_auth_key: string, diff --git a/src/pages/pageSignQR.ts b/src/pages/pageSignQR.ts index 25729890..3838946a 100644 --- a/src/pages/pageSignQR.ts +++ b/src/pages/pageSignQR.ts @@ -4,6 +4,7 @@ * https://github.com/morethanwords/tweb/blob/master/LICENSE */ +import type { DcId } from '../types'; import apiManager from '../lib/mtproto/mtprotoworker'; import Page from './page'; import serverTimeManager from '../lib/mtproto/serverTimeManager'; @@ -65,7 +66,7 @@ let onFirstMount = async() => { cachedPromise = null; }, true); - let options: {dcId?: number, ignoreErrors: true} = {ignoreErrors: true}; + let options: {dcId?: DcId, ignoreErrors: true} = {ignoreErrors: true}; let prevToken: Uint8Array | number[]; const iterate = async(isLoop: boolean) => { @@ -78,7 +79,7 @@ let onFirstMount = async() => { if(loginToken._ === 'auth.loginTokenMigrateTo') { if(!options.dcId) { - options.dcId = loginToken.dc_id; + options.dcId = loginToken.dc_id as DcId; apiManager.setBaseDcId(loginToken.dc_id); //continue; } diff --git a/src/types.d.ts b/src/types.d.ts index 97056aad..783e11c2 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -1,8 +1,11 @@ import { AuthSentCode } from "./layer"; import type { ApiError } from "./lib/mtproto/apiManager"; +export type DcId = number; +export type TrueDcId = 1 | 2 | 3 | 4 | 5; + export type InvokeApiOptions = Partial<{ - dcId: number, + dcId: DcId, floodMaxTimeout: number, noErrorBox: true, fileUpload: true,