wo-core-rpcsocket/rpcsocket.js

144 lines
5.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const ws = require('ws')
const { randomBytes } = require('crypto')
// Recommendation: keep onopen, onerror, onclose and onmessage reserved for internal use (e.g. reconnecting). User code should subscribe with on(event, listener).
// Prefix for the internal events that carry RPC replies. Replies are emitted under
// this namespace so a remote peer cannot trigger lifecycle events such as 'close'
// or 'error' by choosing the `randomEvent` value of a SEND_RESULT message.
const RPC_RESULT_EVENT_PREFIX = 'rpcResult:'

/**
 * A `ws` WebSocket extended with a small JSON-based RPC protocol.
 *
 * Every message is a JSON object with an `rpcType` field:
 *   - SEND_REQUEST: call the handler named `rpcHow` with `rpcWhat` and, when a
 *     `randomEvent` id is present, answer with a SEND_RESULT message.
 *   - SEND_RESULT:  deliver `rpcResult` to the pending `sendRequest` call
 *     identified by `randomEvent`.
 *   - SEND_NOTIFY (or anything else): fire-and-forget call of handler `rpcHow`,
 *     or emit of an event named `rpcHow` if somebody is listening for it.
 *
 * A handler is a function stored as an OWN property of the socket
 * (e.g. `socket.testfunc = async (what) => ...`).
 */
const DAD = (module.exports = class RpcSocket extends ws {
  /**
   * Creates a client socket; all arguments are forwarded unchanged to `ws`.
   */
  constructor(address, protocols, options) {
    super(address, protocols, options)
    DAD.upgrade(this)
  }

  /**
   * Turns an existing `ws` socket (for example one handed out by a server's
   * 'connection' event) into an RpcSocket and installs the RPC message handler.
   *
   * @param {object} socket - the socket to upgrade; it is mutated in place.
   * @returns {object} the same socket, now carrying a random `sid` and `onmessage`.
   */
  static upgrade(socket) {
    // Only re-parent foreign sockets; instances of RpcSocket (or of a subclass)
    // must keep their own prototype.
    if (!(socket instanceof DAD)) Object.setPrototypeOf(socket, DAD.prototype)
    socket.sid = randomBytes(16).toString('hex')

    // Only own, callable properties are exposed to the remote side.
    const isHandler = (name) =>
      Object.prototype.hasOwnProperty.call(socket, name) && typeof socket[name] === 'function'

    socket.onmessage = async ({ data }) => {
      try {
        const { rpcType, rpcHow, rpcWhat, randomEvent, rpcResult } = JSON.parse(data)
        switch (rpcType) {
          case 'SEND_REQUEST': {
            // Remote call that may expect an answer.
            let reply
            if (isHandler(rpcHow)) {
              try {
                reply = await socket[rpcHow](rpcWhat)
              } catch (exception) {
                // Answer with an error instead of leaving the caller to time out.
                console.error('[onmessage] rpc handler failed: ', rpcHow, exception)
                reply = { _state: 'ERROR', _stateMsg: `rpcHow=${rpcHow} failed: ${exception && exception.message}` }
              }
            } else {
              reply = { _state: 'ERROR', _stateMsg: `unknown rpcHow=${rpcHow}` }
            }
            if (randomEvent) {
              socket.send(JSON.stringify({ rpcType: 'SEND_RESULT', randomEvent, rpcResult: reply }))
            }
            break
          }
          case 'SEND_RESULT':
            // Answer to one of our own requests; wake up the waiting sendRequest.
            if (typeof randomEvent === 'string') {
              socket.emit(RPC_RESULT_EVENT_PREFIX + randomEvent, rpcResult)
            }
            break
          case 'SEND_NOTIFY':
          default:
            // Fire-and-forget remote call, or a plain send(...) carrying JSON.
            if (isHandler(rpcHow)) {
              const outcome = socket[rpcHow](rpcWhat)
              // Do not leave a rejected promise from an async handler unhandled.
              if (outcome && typeof outcome.catch === 'function') {
                outcome.catch((exception) => console.error('[onmessage] rpc handler failed: ', rpcHow, exception))
              }
            } else if (socket.eventNames().includes(rpcHow)) {
              // NOTE(review): this lets the remote side emit any event that has a
              // listener, including lifecycle events; kept for compatibility.
              socket.emit(rpcHow, rpcWhat)
            } else {
              console.error('[onmessage] unknown rpc: ', rpcHow, rpcWhat)
            }
            break
        }
      } catch (exception) {
        console.error('[onmessage] invalid rpc data: ', data, exception)
      }
    }
    return socket
  }

  /**
   * Opens a new RpcSocket to `url`.
   *
   * @param {string} url - address passed to the RpcSocket constructor.
   * @returns {Promise<RpcSocket>} resolves with the socket once it is open,
   *   rejects with the value passed to `onerror` if an error is reported first.
   */
  static connectAsync(url) {
    return new Promise(function (resolve, reject) {
      let socket = new DAD(url)
      socket.onopen = () => resolve(socket)
      socket.onerror = (err) => reject(err)
    })
  }

  /**
   * Returns a usable socket: this one if it is connecting or open, otherwise a
   * NEW RpcSocket connected to `url` (defaults to this socket's url). Listeners
   * and handlers of the old socket are not copied to the new one.
   *
   * @param {string} [url] - address to connect to; falls back to `this.url`.
   * @returns {Promise<RpcSocket>}
   */
  reconnectAsync(url) {
    if (this.readyState === ws.CONNECTING || this.readyState === ws.OPEN) {
      return Promise.resolve(this)
    }
    return new Promise((resolve, reject) => {
      let socket = new DAD(url || this.url)
      socket.once('open', () => resolve(socket))
      socket.once('error', (err) => reject(err))
    })
  }

  /**
   * Sends a fire-and-forget remote call; no answer is expected.
   *
   * @param {object} [args]
   * @param {string} [args.rpcHow] - name of the remote handler or event.
   * @param {*} [args.rpcWhat] - JSON-serializable argument for the handler.
   * @param {object} [args.options] - forwarded to `send`.
   * @param {Function} [args.callback] - forwarded to `send`.
   */
  sendNotify({ rpcHow, rpcWhat, options, callback } = {}) {
    this.send(JSON.stringify({ rpcType: 'SEND_NOTIFY', rpcHow, rpcWhat }), options, callback)
  }

  /**
   * Sends a remote call and waits for its result.
   *
   * The result (or, after `timeout` ms without an answer, an object with
   * `_state: 'RPC_REQUEST_TIMEOUT'`) is passed to `rpcCallback` when one is
   * given; otherwise it is the resolution value of the returned promise.
   *
   * @param {object} [args]
   * @param {string} [args.rpcHow] - name of the remote handler.
   * @param {*} [args.rpcWhat] - JSON-serializable argument for the handler.
   * @param {Function} [args.rpcCallback] - receives the result; switches to callback mode.
   * @param {number} [args.timeout=5000] - milliseconds to wait for the answer.
   * @returns {Promise<*>|undefined} a promise in promise mode, nothing in callback mode.
   */
  sendRequest({ rpcHow, rpcWhat, rpcCallback, timeout = 5000 } = {}) {
    const randomEvent = randomBytes(16).toString('hex')
    const resultEvent = RPC_RESULT_EVENT_PREFIX + randomEvent

    // Runs one request/response exchange and hands the outcome to `deliver`.
    const exchange = (deliver) => {
      const payload = JSON.stringify({ rpcType: 'SEND_REQUEST', rpcHow, rpcWhat, randomEvent })
      let timer = null
      const onResult = (rpcResult) => {
        clearTimeout(timer) // answered in time: do not keep the timer pending
        deliver.call(this, rpcResult)
      }
      // Listen before sending so that a fast answer cannot be missed.
      this.once(resultEvent, onResult)
      timer = setTimeout(() => {
        this.removeListener(resultEvent, onResult)
        deliver({
          _state: 'RPC_REQUEST_TIMEOUT',
          _stateMsg: `RpcSocket sendRequest timeout: ${rpcHow} ${JSON.stringify(rpcWhat)}`,
        })
      }, timeout)
      try {
        this.send(payload, (err) => {
          // The outcome contract is unchanged: a failed send is still reported
          // to the caller through the timeout result; the cause is logged here.
          if (err) console.error('[sendRequest] send failed: ', rpcHow, err)
        })
      } catch (exception) {
        // send() threw synchronously (e.g. socket not open): undo the bookkeeping.
        clearTimeout(timer)
        this.removeListener(resultEvent, onResult)
        throw exception
      }
    }

    if (typeof rpcCallback === 'function') {
      exchange(rpcCallback)
      return undefined
    }
    return new Promise((resolve) => exchange(resolve))
  }
})
// (async ()=>{
// new RpcSocket('http://localhost:9007')
// await sleep(20000)
// new RpcSocket.Server({port:9007})
// await sleep(20000)
// })()
/*
const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms))
//RpcSocket=require('./Ling/RpcSocket.js');
server = new RpcSocket.Server({port:6000});
var serverSocket; server.on('connection', (socket)=>{console.log('socket coming');
serverSocket = RpcSocket.upgrade(socket).on('close', (code, reason)=>console.log('socket is closed for code and reason: ', code, reason))
.on('error', (error)=>console.log('socket has error: ', error))
})
RpcSocket.connectAsync('http://localhost:6000').then((clientSocket)=>{
clientSocket.sendRequest({rpcHow:'testfunc', rpcWhat:{height:99}, rpcCallback:(rpcResult)=>console.log('using request: ', rpcResult), timeout:3000})
return clientSocket
}).then((clientSocket)=>{
console.log('sleeping...')
sleep(2000)
return clientSocket
}).then(async (clientSocket)=>{
let rpcResult = await clientSocket.sendRequest({rpcHow:'testfunc', rpcWhat:{height:99}, timeout:3000}).catch(console.log)
console.log('using sendRequest:', rpcResult)
process.exit()
}).catch((e)=>{
console.log('connect error happens:', e)
})
*/