const ws = require('ws') const { randomBytes } = require('crypto') // 建议把 onopen, onerror, onclose, onmessage 保留给系统使用,例如reconnecting。用户使用 on(event, listener) const DAD = (module.exports = class RpcSocket extends ws { constructor(address, protocols, options) { let ws = super(address, protocols, options) DAD.upgrade(ws) } static upgrade(ws) { ws.__proto__ = DAD.prototype ws.sid = randomBytes(16).toString('hex') ws.onmessage = async ({ data }) => { // console.log('[onmessage] 被调用在 data =', data) try { let { rpcType, rpcHow, rpcWhat, randomEvent, rpcResult } = JSON.parse(data) // console.log('message data parsed to ', {rpcType, rpcHow, rpcWhat, randomEvent, rpcResult}) switch (rpcType) { case 'SEND_REQUEST': // 接收到异步的远程调用 // console.log(`被调方 收到 SEND_REQUEST: rpcHow=${rpcHow}, rpcWhat=${JSON.stringify(rpcWhat)}`) if (ws.hasOwnProperty(rpcHow)) rpcResult = await ws[rpcHow](rpcWhat) else rpcResult = { _state: 'ERROR', _stateMsg: `unknown rpcHow=${rpcHow}` } if (randomEvent) { ws.send(JSON.stringify({ rpcType: 'SEND_RESULT', randomEvent, rpcResult })) // 返回结果给远程 } break case 'SEND_RESULT': // 接收到远程返回的结果 // console.log('主调方 收到 SEND_RESULT: rpcResult=', rpcResult) ws.emit(randomEvent, rpcResult) break case 'SEND_NOTIFY': default: // 接收到同步的远程调用 或者 标准ws的send(...) // console.log(`被调方 收到 SEND_NOFITY: rpcHow=${rpcHow}, rpcWhat=${JSON.stringify(rpcWhat)}`) if (ws.hasOwnProperty(rpcHow)) ws[rpcHow](rpcWhat) else if (ws.eventNames().indexOf(rpcHow) >= 0) ws.emit(rpcHow, rpcWhat) else console.error('[onmessage] unknown rpc: ', rpcHow, rpcWhat) break } } catch (exception) { console.error('[onmessage] invalid rpc data: ', data, exception) return } } return ws } static connectAsync(url) { return new Promise(function (resolve, reject) { let socket = new DAD(url) socket.onopen = () => resolve(socket) socket.onerror = (err) => reject(err) }) } reconnectAsync(url) { if (this.readyState === ws.CONNECTING || this.readyState === ws.OPEN) { return Promise.resolve(this) } return new Promise((resolve, reject) => { let socket = new DAD(url || this.url) socket.once('open', () => resolve(socket)) socket.once('error', (err) => reject(err)) }) } sendNotify({ rpcHow, rpcWhat, options, callback } = {}) { // 发起同步的远程调用 this.send(JSON.stringify({ rpcType: 'SEND_NOTIFY', rpcHow, rpcWhat }), options, callback) } sendRequest({ rpcHow, rpcWhat, rpcCallback, timeout = 5000 } = {}) { // 发起异步的远程调用 let randomEvent = randomBytes(16).toString('hex') // console.log('randomEvent is randomized: ', randomEvent) if (typeof rpcCallback === 'function') { // 有回调 this.send(JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent }), () => { // console.log('in callback, randomEvent=', randomEvent) this.once(randomEvent, rpcCallback) setTimeout(() => { if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) rpcCallback({ _state: 'RPC_REQUEST_TIMEOUT', _stateMsg: `RpcSocket sendRequest timeout: ${rpcHow} ${rpcWhat}` }) } }, timeout) }) } else { // 没有回调 return new Promise((resolve, reject) => { this.send(JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent }), () => { this.once(randomEvent, resolve) setTimeout(() => { if (this.eventNames().indexOf(randomEvent) >= 0) { this.removeAllListeners(randomEvent) resolve({ _state: 'RPC_REQUEST_TIMEOUT', _stateMsg: `RpcSocket sendRequest timeout: ${rpcHow} ${rpcWhat}` }) } }, timeout) }) }) } } }) // (async ()=>{ // new RpcSocket('http://localhost:9007') // await sleep(20000) // new RpcSocket.Server({port:9007}) // await sleep(20000) // })() /* const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms)) //RpcSocket=require('./Ling/RpcSocket.js'); server = new RpcSocket.Server({port:6000}); var serverSocket; server.on('connection', (socket)=>{console.log('socket coming'); serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason)) .on('error', (errer)=>console.log('socket has error: ', error)) }) RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{ clientSocket.request('testfunc', {height:99}, (rpcResult)=>console.log('using request: ', rpcResult), 3000) return clientSocket }).then((clientSocket)=>{ console.log('sleeping...') sleep(2000) return clientSocket }).then(async (clientSocket)=>{ let rpcResult = await clientSocket.sendRequest('testfunc', {height:99}, 3000).catch(console.log) console.log('using sendRequest:', rpcResult) process.exit() }).catch((e)=>{ console.log('connect error happens:', e) }) */