[temp] networker destroy

This commit is contained in:
Eduard Kuzmenko 2021-06-21 18:19:33 +03:00
parent d7d045818c
commit d031ff7db5
6 changed files with 179 additions and 83 deletions

View File

@ -9,7 +9,7 @@
* https://github.com/zhukov/webogram/blob/master/LICENSE
*/
import MTTransport, { MTConnection, MTConnectionConstructable } from './transports/transport';
import MTTransport, { MTConnectionConstructable } from './transports/transport';
import Modes from '../../config/modes';
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
@ -19,9 +19,9 @@ import HTTP from './transports/http';
/// #if !MTPROTO_HTTP
import Socket from './transports/websocket';
import TcpObfuscated from './transports/tcpObfuscated';
import EventListenerBase from '../../helpers/eventListenerBase';
import { isSafari } from '../../helpers/userAgent';
import { notifyAll, isWebWorker } from '../../helpers/context';
import { isWebWorker } from '../../helpers/context';
import SocketProxied from './transports/socketProxied';
/// #endif
export type TransportType = 'websocket' | 'https' | 'http';
@ -34,65 +34,8 @@ type Servers = {
}
};
let socketId = 0;
const TEST_SUFFIX = Modes.test ? '_test' : '';
/// #if !MTPROTO_SW
class SocketProxied extends EventListenerBase<{
open: () => void,
message: (buffer: ArrayBuffer) => any,
close: () => void,
}> implements MTConnection {
private id: number;
constructor(protected dcId: number, protected url: string, logSuffix: string) {
super();
this.id = ++socketId;
socketsProxied.set(this.id, this);
notifyAll({
type: 'socketProxy',
payload: {
type: 'setup',
payload: {
dcId,
url,
logSuffix
},
id: this.id
}
});
}
public send(payload: Uint8Array) {
const task: any = {
type: 'socketProxy',
payload: {
type: 'send',
payload,
id: this.id
}
};
notifyAll(task);
}
public close() {
const task: any = {
type: 'socketProxy',
payload: {
type: 'close',
id: this.id
}
};
notifyAll(task);
}
}
/// #endif
export const socketsProxied: Map<number, SocketProxied> = new Map();
export class DcConfigurator {
private sslSubdomains = ['pluto', 'venus', 'aurora', 'vesta', 'flora'];

View File

@ -13,13 +13,13 @@ import networkerFactory from "./networkerFactory";
import apiFileManager from './apiFileManager';
import type { RequestFilePartTask, RequestFilePartTaskResponse } from '../serviceWorker/index.service';
import { ctx } from '../../helpers/userAgent';
import { socketsProxied } from './dcConfigurator';
import { notifyAll } from '../../helpers/context';
// import AppStorage from '../storage';
import CacheStorageController from '../cacheStorage';
import sessionStorage from '../sessionStorage';
import { LocalStorageProxyTask } from '../localStorage';
import { WebpConvertTask } from '../webp/webpWorkerController';
import { socketsProxied } from './transports/socketProxied';
let webpSupported = false;
export const isWebpSupported = () => {

View File

@ -26,6 +26,7 @@ import sessionStorage from '../sessionStorage';
import webPushApiManager from './webPushApiManager';
import AppStorage from '../storage';
import appRuntimeManager from '../appManagers/appRuntimeManager';
import { SocketProxyTask } from './transports/socketProxied';
type Task = {
taskId: number,
@ -115,7 +116,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
webpWorkerController.postMessage(task);
});
this.addTaskListener('socketProxy', (task) => {
this.addTaskListener('socketProxy', (task: SocketProxyTask) => {
const socketTask = task.payload;
const id = socketTask.id;
//console.log('socketProxy', socketTask, id);
@ -123,7 +124,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
if(socketTask.type === 'send') {
const socket = this.sockets.get(id);
socket.send(socketTask.payload);
} else if(socketTask.type === 'close') {
} else if(socketTask.type === 'close') { // will remove from map in onClose
const socket = this.sockets.get(id);
socket.close();
} else if(socketTask.type === 'setup') {

View File

@ -17,7 +17,7 @@ import Schema from './schema';
import timeManager from './timeManager';
import networkerFactory from './networkerFactory';
import { logger, LogTypes } from '../logger';
import { InvokeApiOptions } from '../../types';
import { assumeType, InvokeApiOptions } from '../../types';
import { longToBytes } from '../crypto/crypto_utils';
import MTTransport from './transports/transport';
import { convertToUint8Array, bufferConcat, bytesCmp, bytesToHex } from '../../helpers/bytes';
@ -31,7 +31,7 @@ import HTTP from './transports/http';
/// #endif
import type TcpObfuscated from './transports/tcpObfuscated';
import { bigInt2str, cmp, rightShift_, str2bigInt } from '../../vendor/leemon';
import { bigInt2str, rightShift_, str2bigInt } from '../../vendor/leemon';
import { forEachReverse } from '../../helpers/array';
//console.error('networker included!', new Error().stack);
@ -369,6 +369,11 @@ export default class MTPNetworker {
return this.pushMessage(message, options);
}
public destroy() {
assumeType<TcpObfuscated>(this.transport);
this.transport.destroy();
}
// private sendPingDelayDisconnect = () => {
// if(this.pingPromise || true) return;
@ -693,7 +698,7 @@ export default class MTPNetworker {
promise.finally(() => {
clearTimeout(timeout);
this.setConnectionStatus(true);
if(!--this.activeRequests && this.onDrain) {
this.onDrainTimeout = self.setTimeout(() => {
this.log('drain');

View File

@ -0,0 +1,95 @@
/*
* https://github.com/morethanwords/tweb
* Copyright (C) 2019-2021 Eduard Kuzmenko
* https://github.com/morethanwords/tweb/blob/master/LICENSE
*/
import { notifyAll } from "../../../helpers/context";
import EventListenerBase from "../../../helpers/eventListenerBase";
import { WorkerTaskVoidTemplate } from "../../../types";
import { MTConnection } from "./transport";
let socketId = 0;
export interface SocketProxyTask extends WorkerTaskVoidTemplate {
type: 'socketProxy',
payload: SocketProxySetupTask | SocketProxySendTask | SocketProxyCloseTask
};
export interface SocketProxySetupTask extends WorkerTaskVoidTemplate {
type: 'setup',
payload: {
dcId: number,
url: string,
logSuffix: string
},
id: number
};
export interface SocketProxySendTask extends WorkerTaskVoidTemplate {
type: 'send',
payload: Uint8Array,
id: number
};
export interface SocketProxyCloseTask extends WorkerTaskVoidTemplate {
type: 'close',
id: number
};
/// #if !MTPROTO_SW
export default class SocketProxied extends EventListenerBase<{
open: () => void,
message: (buffer: ArrayBuffer) => any,
close: () => void,
}> implements MTConnection {
private id: number;
constructor(protected dcId: number, protected url: string, logSuffix: string) {
super();
this.id = ++socketId;
socketsProxied.set(this.id, this);
const task: SocketProxyTask = {
type: 'socketProxy',
payload: {
type: 'setup',
payload: {
dcId,
url,
logSuffix
},
id: this.id
}
};
notifyAll(task);
}
public send(payload: Uint8Array) {
const task: SocketProxyTask = {
type: 'socketProxy',
payload: {
type: 'send',
payload,
id: this.id
}
};
notifyAll(task);
}
public close() {
const task: SocketProxyTask = {
type: 'socketProxy',
payload: {
type: 'close',
id: this.id
}
};
notifyAll(task);
}
}
/// #endif
export const socketsProxied: Map<number, SocketProxied> = new Map();

View File

@ -30,6 +30,9 @@ export default class TcpObfuscated implements MTTransport {
private lastCloseTime: number;
private connection: MTConnection;
private autoReconnect = true;
private reconnectTimeout: number;
//private debugPayloads: MTPNetworker['debugRequests'] = [];
constructor(private Connection: MTConnectionConstructable,
@ -115,31 +118,80 @@ export default class TcpObfuscated implements MTTransport {
this.connection.removeEventListener('message', this.onMessage);
this.connection = undefined;
const time = Date.now();
const diff = time - this.lastCloseTime;
const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0;
let needTimeout: number;
if(this.autoReconnect) {
const time = Date.now();
const diff = time - this.lastCloseTime;
needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0;
}
if(this.networker) {
this.networker.setConnectionStatus(false, needTimeout);
this.pending.length = 0;
}
if(this.autoReconnect) {
this.log('will try to reconnect after timeout:', needTimeout / 1000);
this.reconnectTimeout = self.setTimeout(this.reconnect, needTimeout);
} else {
this.log('reconnect isn\'t needed');
}
};
/**
* invoke only when closed
*/
public reconnect = () => {
if(this.reconnectTimeout !== undefined) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
}
if(this.connected) {
return;
}
this.log('trying to reconnect...');
this.lastCloseTime = Date.now();
this.log('will try to reconnect after timeout:', needTimeout / 1000);
setTimeout(() => {
this.log('trying to reconnect...');
this.lastCloseTime = Date.now();
if(!this.networker) {
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
}
if(!this.networker) {
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
}
}
this.connect();
}, needTimeout);
};
}
this.connect();
}
public destroy() {
this.setAutoReconnect(false);
this.close();
}
public close() {
if(this.connection) {
this.connection.close();
}
}
/**
* Will connect if enable and disconnected \
* Will reset reconnection timeout if disable
*/
public setAutoReconnect(enable: boolean) {
this.autoReconnect = enable;
if(!enable) {
if(this.reconnectTimeout !== undefined) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = undefined;
}
} else if(!this.connection && this.reconnectTimeout === undefined) {
this.reconnect();
}
}
private connect() {
this.connection = new this.Connection(this.dcId, this.url, this.logSuffix);