Browse Source

Socket proxy class for Safari

master
Eduard Kuzmenko 3 years ago
parent
commit
debac6f0e5
  1. 3
      src/lib/mtproto/authorizer.ts
  2. 113
      src/lib/mtproto/dcConfigurator.ts
  3. 14
      src/lib/mtproto/mtproto.worker.ts
  4. 59
      src/lib/mtproto/mtprotoworker.ts
  5. 2
      src/lib/mtproto/networker.ts
  6. 153
      src/lib/mtproto/transports/tcpObfuscated.ts
  7. 18
      src/lib/mtproto/transports/transport.ts
  8. 7
      src/lib/mtproto/transports/websocket.ts

3
src/lib/mtproto/authorizer.ts

@ -105,7 +105,8 @@ export class Authorizer { @@ -105,7 +105,8 @@ export class Authorizer {
this.log('mtpSendPlainRequest: creating requestPromise');
}
return transport.send(resultArray).then(result => {
const promise = transport.send(resultArray) as any as Promise<Uint8Array>;
return promise.then(result => {
if(DEBUG) {
this.log('mtpSendPlainRequest: in good sector', result);
}

113
src/lib/mtproto/dcConfigurator.ts

@ -1,21 +1,16 @@ @@ -1,21 +1,16 @@
import MTTransport from './transports/transport';
import MTTransport, { MTConnection, MTConnectionConstructable } from './transports/transport';
import Modes from '../../config/modes';
/// #if MTPROTO_HTTP_UPLOAD
// @ts-ignore
import TcpObfuscated from './transports/tcpObfuscated';
// @ts-ignore
/// #if MTPROTO_HTTP || MTPROTO_HTTP_UPLOAD
import HTTP from './transports/http';
/// #elif !MTPROTO_HTTP
// @ts-ignore
/// #endif
/// #if !MTPROTO_HTTP
import Socket from './transports/websocket';
import TcpObfuscated from './transports/tcpObfuscated';
import EventListenerBase from '../../helpers/eventListenerBase';
import { isSafari } from '../../helpers/userAgent';
import type MTPNetworker from './networker';
import { notifyAll, isWebWorker } from '../../helpers/context';
import { CancellablePromise, deferredPromise } from '../../helpers/cancellablePromise';
/// #else
// @ts-ignore
import HTTP from './transports/http';
/// #endif
export type TransportType = 'websocket' | 'https' | 'http';
@ -31,6 +26,48 @@ type Servers = { @@ -31,6 +26,48 @@ type Servers = {
let socketId = 0;
const TEST_SUFFIX = Modes.test ? '_test' : '';
class SocketProxied extends EventListenerBase<{
open: () => void,
message: (buffer: ArrayBuffer) => any,
close: () => void,
}> implements MTConnection {
private id: number;
constructor(protected dcId: number, protected url: string, logSuffix: string) {
super();
this.id = ++socketId;
socketsProxied.set(this.id, this);
notifyAll({
type: 'socketProxy',
payload: {
type: 'setup',
payload: {
dcId,
url,
logSuffix
},
id: this.id
}
});
}
send = (payload: Uint8Array) => {
const task: any = {
type: 'socketProxy',
payload: {
type: 'send',
payload,
id: this.id
}
};
notifyAll(task);
};
}
export const socketsProxied: Map<number, SocketProxied> = new Map();
export class DcConfigurator {
private sslSubdomains = ['pluto', 'venus', 'aurora', 'vesta', 'flora'];
@ -58,58 +95,10 @@ export class DcConfigurator { @@ -58,58 +95,10 @@ export class DcConfigurator {
const logSuffix = connectionType === 'upload' ? '-U' : connectionType === 'download' ? '-D' : '';
const retryTimeout = connectionType === 'client' ? 30000 : 10000;
if(isSafari && isWebWorker && false) {
class P implements MTTransport {
private id: number;
private taskId = 0;
public networker: MTPNetworker;
public promises: Map<number, CancellablePromise<Uint8Array>> = new Map();
constructor(dcId: number, url: string) {
this.id = ++socketId;
notifyAll({
type: 'socketProxy',
payload: {
type: 'setup',
payload: {
dcId,
url,
logSuffix,
retryTimeout
},
id: this.id
}
});
}
send = (payload: Uint8Array) => {
const task: any = {
type: 'socketProxy',
payload: {
type: 'send',
payload,
id: this.id
}
};
if(this.networker) {
notifyAll(task);
return null;
}
task.payload.taskId = ++this.taskId;
const deferred = deferredPromise<Uint8Array>();
this.promises.set(task.id, deferred);
notifyAll(task);
return deferred;
};
}
const oooohLetMeLive: MTConnectionConstructable = (isSafari && isWebWorker) || true ? SocketProxied : Socket;
return new P(dcId, chosenServer);
} else {
return new TcpObfuscated(dcId, chosenServer, logSuffix, retryTimeout);
}
return new TcpObfuscated(oooohLetMeLive, dcId, chosenServer, logSuffix, retryTimeout);
};
/// #endif

14
src/lib/mtproto/mtproto.worker.ts

@ -8,6 +8,7 @@ import apiFileManager from './apiFileManager'; @@ -8,6 +8,7 @@ import apiFileManager from './apiFileManager';
//import { logger, LogLevels } from '../logger';
import type { ServiceWorkerTask, ServiceWorkerTaskResponse } from './mtproto.service';
import { ctx } from '../../helpers/userAgent';
import { socketsProxied } from './dcConfigurator';
//const log = logger('DW', LogLevels.error);
@ -109,6 +110,19 @@ const onMessage = async(e: any) => { @@ -109,6 +110,19 @@ const onMessage = async(e: any) => {
} else if(task.type === 'webpSupport') {
webpSupported = task.payload;
return;
} else if(task.type === 'socketProxy') {
const socketTask = task.payload;
const id = socketTask.id;
const socketProxied = socketsProxied.get(id);
if(socketTask.type === 'message') {
socketProxied.setListenerResult('message', socketTask.payload);
} else if(socketTask.type === 'open') {
socketProxied.setListenerResult('open');
} else if(socketTask.type === 'close') {
socketProxied.setListenerResult('close');
socketsProxied.delete(id);
}
}
if(!task.task) {

59
src/lib/mtproto/mtprotoworker.ts

@ -1,5 +1,4 @@ @@ -1,5 +1,4 @@
import MTProtoWorker from 'worker-loader!./mtproto.worker';
import SocketWorker from 'worker-loader!./transports/websocket';
//import './mtproto.worker';
import { isObject } from '../../helpers/object';
import type { MethodDeclMap } from '../../layer';
@ -16,6 +15,7 @@ import type { MTMessage } from './networker'; @@ -16,6 +15,7 @@ import type { MTMessage } from './networker';
import referenceDatabase from './referenceDatabase';
import appDocsManager from '../appManagers/appDocsManager';
import DEBUG, { MOUNT_CLASS_TO } from '../../config/debug';
import Socket from './transports/websocket';
type Task = {
taskId: number,
@ -59,7 +59,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods { @@ -59,7 +59,7 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
private debug = DEBUG;
private socketsWorkers: Map<number, SocketWorker> = new Map();
private sockets: Map<number, Socket> = new Map();
constructor() {
super();
@ -193,20 +193,53 @@ export class ApiManagerProxy extends CryptoWorkerMethods { @@ -193,20 +193,53 @@ export class ApiManagerProxy extends CryptoWorkerMethods {
} else if(task.type === 'socketProxy') {
const socketTask = task.payload;
const id = socketTask.id;
console.log('socketProxy', socketTask, id);
//console.log('socketProxy', socketTask, id);
if(socketTask.type === 'send') {
const socketWorker = this.socketsWorkers.get(id);
socketWorker.postMessage(socketTask);
const socket = this.sockets.get(id);
socket.send(socketTask.payload);
} else if(socketTask.type === 'setup') {
const socketWorker = new SocketWorker();
socketWorker.postMessage(socketTask);
socketWorker.addEventListener('message', (e) => {
const task = e.data;
});
const socket = new Socket(socketTask.payload.dcId, socketTask.payload.url, socketTask.payload.logSuffix);
const onOpen = () => {
//console.log('socketProxy onOpen');
this.postMessage({
type: 'socketProxy',
payload: {
type: 'open',
id
}
});
};
const onClose = () => {
this.postMessage({
type: 'socketProxy',
payload: {
type: 'close',
id
}
});
socket.removeListener('open', onOpen);
socket.removeListener('close', onClose);
socket.removeListener('message', onMessage);
this.sockets.delete(id);
};
const onMessage = (buffer: ArrayBuffer) => {
this.postMessage({
type: 'socketProxy',
payload: {
type: 'message',
id,
payload: buffer
}
});
};
this.socketsWorkers.set(id, socketWorker);
socket.addListener('open', onOpen);
socket.addListener('close', onClose);
socket.addListener('message', onMessage);
this.sockets.set(id, socket);
}
} else if(task.hasOwnProperty('result') || task.hasOwnProperty('error')) {
this.finalizeTask(task.taskId, task.result, task.error);

2
src/lib/mtproto/networker.ts

@ -1013,7 +1013,7 @@ export default class MTPNetworker { @@ -1013,7 +1013,7 @@ export default class MTPNetworker {
public sendEncryptedRequest(message: MTMessage) {
return this.getEncryptedOutput(message).then(requestData => {
const promise = this.transport.send(requestData);
const promise: Promise<Uint8Array> = this.transport.send(requestData) as any;
/// #if !MTPROTO_HTTP && !MTPROTO_HTTP_UPLOAD
return promise;
/// #else

153
src/lib/mtproto/transports/tcpObfuscated.ts

@ -2,8 +2,7 @@ import Modes from "../../../config/modes"; @@ -2,8 +2,7 @@ import Modes from "../../../config/modes";
import { logger, LogLevels } from "../../logger";
import MTPNetworker from "../networker";
import Obfuscation from "./obfuscation";
import MTTransport from "./transport";
import Socket from "./websocket";
import MTTransport, { MTConnection, MTConnectionConstructable } from "./transport";
import intermediatePacketCodec from './intermediate';
export default class TcpObfuscated implements MTTransport {
@ -27,11 +26,11 @@ export default class TcpObfuscated implements MTTransport { @@ -27,11 +26,11 @@ export default class TcpObfuscated implements MTTransport {
private lastCloseTime: number;
private socket: Socket;
private connection: MTConnection;
//private debugPayloads: MTPNetworker['debugRequests'] = [];
constructor(private dcId: number, private url: string, private logSuffix: string, public retryTimeout: number) {
constructor(private Connection: MTConnectionConstructable, private dcId: number, private url: string, private logSuffix: string, public retryTimeout: number) {
let logLevel = LogLevels.error | LogLevels.log;
if(this.debug) logLevel |= LogLevels.debug;
this.log = logger(`WS-${dcId}` + logSuffix, logLevel);
@ -40,92 +39,100 @@ export default class TcpObfuscated implements MTTransport { @@ -40,92 +39,100 @@ export default class TcpObfuscated implements MTTransport {
this.connect();
}
private connect() {
this.socket = new Socket(this.dcId, this.url, this.logSuffix);
this.socket.addListener('open', () => {
this.connected = true;
private onOpen = () => {
this.connected = true;
const initPayload = this.obfuscation.init(this.codec);
const initPayload = this.obfuscation.init(this.codec);
if(this.networker) {
this.networker.setConnectionStatus(true);
if(this.networker) {
this.networker.setConnectionStatus(true);
if(this.lastCloseTime) {
this.networker.cleanupSent();
this.networker.resend();
}
if(this.lastCloseTime) {
this.networker.cleanupSent();
this.networker.resend();
}
}
setTimeout(() => {
this.releasePending();
}, 0);
this.socket.send(initPayload);
});
this.socket.addListener('message', (buffer) => {
let data = this.obfuscation.decode(new Uint8Array(buffer));
data = this.codec.readPacket(data);
setTimeout(() => {
this.releasePending();
}, 0);
if(this.networker) { // authenticated!
//this.pending = this.pending.filter(p => p.body); // clear pending
this.connection.send(initPayload);
};
this.debug && this.log.debug('redirecting to networker', data.length);
this.networker.parseResponse(data).then(response => {
this.debug && this.log.debug('redirecting to networker response:', response);
private onMessage = (buffer: ArrayBuffer) => {
let data = this.obfuscation.decode(new Uint8Array(buffer));
data = this.codec.readPacket(data);
try {
this.networker.processMessage(response.response, response.messageId, response.sessionId);
} catch(err) {
this.log.error('handleMessage networker processMessage error', err);
}
if(this.networker) { // authenticated!
//this.pending = this.pending.filter(p => p.body); // clear pending
//this.releasePending();
}).catch(err => {
this.log.error('handleMessage networker parseResponse error', err);
});
this.debug && this.log.debug('redirecting to networker', data.length);
this.networker.parseResponse(data).then(response => {
this.debug && this.log.debug('redirecting to networker response:', response);
//this.dd();
return;
}
try {
this.networker.processMessage(response.response, response.messageId, response.sessionId);
} catch(err) {
this.log.error('handleMessage networker processMessage error', err);
}
//console.log('got hex:', data.hex);
const pending = this.pending.shift();
if(!pending) {
this.debug && this.log.debug('no pending for res:', data.hex);
return;
}
//this.releasePending();
}).catch(err => {
this.log.error('handleMessage networker parseResponse error', err);
});
pending.resolve(data);
});
//this.dd();
return;
}
this.socket.addListener('close', () => {
this.connected = false;
this.socket = undefined;
//console.log('got hex:', data.hex);
const pending = this.pending.shift();
if(!pending) {
this.debug && this.log.debug('no pending for res:', data.hex);
return;
}
const time = Date.now();
const diff = time - this.lastCloseTime;
const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0;
pending.resolve(data);
};
if(this.networker) {
this.networker.setConnectionStatus(false);
private onClose = () => {
this.connected = false;
const time = Date.now();
const diff = time - this.lastCloseTime;
const needTimeout = !isNaN(diff) && diff < this.retryTimeout ? this.retryTimeout - diff : 0;
if(this.networker) {
this.networker.setConnectionStatus(false);
}
this.log('will try to reconnect after timeout:', needTimeout / 1000);
setTimeout(() => {
this.log('trying to reconnect...');
this.lastCloseTime = Date.now();
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
}
}
this.connect();
}, needTimeout);
this.connection.removeListener('open', this.onOpen);
this.connection.removeListener('close', this.onClose);
this.connection.removeListener('message', this.onMessage);
this.connection = undefined;
};
this.log('will try to reconnect after timeout:', needTimeout / 1000);
setTimeout(() => {
this.log('trying to reconnect...');
this.lastCloseTime = Date.now();
for(const pending of this.pending) {
if(pending.bodySent) {
pending.bodySent = false;
}
}
private connect() {
this.connection = new this.Connection(this.dcId, this.url, this.logSuffix);
this.connect();
}, needTimeout);
});
this.connection.addListener('open', this.onOpen);
this.connection.addListener('close', this.onClose);
this.connection.addListener('message', this.onMessage);
}
private encodeBody = (body: Uint8Array) => {
@ -202,7 +209,7 @@ export default class TcpObfuscated implements MTTransport { @@ -202,7 +209,7 @@ export default class TcpObfuscated implements MTTransport {
//this.lol.push(body);
//setTimeout(() => {
this.socket.send(encoded);
this.connection.send(encoded);
//}, 100);
//this.dd();

18
src/lib/mtproto/transports/transport.ts

@ -1,3 +1,17 @@ @@ -1,3 +1,17 @@
export default abstract class MTTransport {
abstract send: (data: Uint8Array) => Promise<Uint8Array>;
import type EventListenerBase from "../../../helpers/eventListenerBase";
export default interface MTTransport {
send: (data: Uint8Array) => void;
}
export interface MTConnection extends EventListenerBase<{
open: () => void,
message: (buffer: ArrayBuffer) => any,
close: () => void,
}> {
send: (data: Uint8Array) => void;
}
export interface MTConnectionConstructable {
new(dcId: number, url: string, logSuffix: string): MTConnection;
}

7
src/lib/mtproto/transports/websocket.ts

@ -1,13 +1,14 @@ @@ -1,13 +1,14 @@
import { logger, LogLevels } from '../../logger';
import Modes from '../../../config/modes';
import EventListenerBase from '../../../helpers/eventListenerBase';
import { MTConnection } from './transport';
export default class Socket extends EventListenerBase<{
open: () => void,
message: (buffer: ArrayBuffer) => any,
close: () => void,
}> {
public ws: WebSocket;
}> implements MTConnection {
private ws: WebSocket;
private log: ReturnType<typeof logger>;
private debug = Modes.debug && false;
@ -19,6 +20,8 @@ export default class Socket extends EventListenerBase<{ @@ -19,6 +20,8 @@ export default class Socket extends EventListenerBase<{
this.log = logger(`WS-${dcId}` + logSuffix, logLevel);
this.log('constructor');
this.connect();
return this;
}
private removeListeners() {

Loading…
Cancel
Save