取消 socketPool,使用内置的 wsServer.clients 来管理;设置 socket.skid/usid 来管理究竟要 sendToOne 给谁
This commit is contained in:
parent
3529789787
commit
e9e7b2a63e
133
basesocket.js
133
basesocket.js
@ -1,29 +1,40 @@
|
|||||||
const ws = require('ws')
|
const ws = require('ws')
|
||||||
const webtoken = require('wo-base-webtoken')
|
const webtoken = require('wo-base-webtoken')
|
||||||
|
const crypto = require('crypto')
|
||||||
|
|
||||||
const my = {
|
const my = {
|
||||||
wsServer: undefined,
|
wsServer: undefined,
|
||||||
socketPool: {},
|
// socketPool: {},
|
||||||
listeners: {},
|
listeners: {},
|
||||||
|
heartbeat: false,
|
||||||
|
heartbeatInterval: 30000,
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
initSocket (webServer) {
|
initSocket ({ webServer, heartbeat, heartbeatInterval }) {
|
||||||
my.wsServer = new ws.Server({ server: webServer })
|
my.wsServer = new ws.Server({ server: webServer })
|
||||||
//console.info({ _at: new Date().toJSON(), _from: 'Socket:initSocket', _type: 'CLOG', about: 'Base Socket Server is initialized.' }, '\n,')
|
//console.info({ _at: new Date().toJSON(), _from: 'Socket:initSocket', _type: 'CLOG', about: 'Base Socket Server is initialized.' }, '\n,')
|
||||||
|
|
||||||
my.wsServer.on('connection', (socket, req) => {
|
my.wsServer.on('connection', (socket, req) => {
|
||||||
//console.info({_at:new Date().toJSON(), _from: 'basesocket:onConnection', _type:'CLOG', about: `A socket is connecting from ${req.connection.remoteAddress}:${req.connection.remotePort}.`},'\n,')
|
console.info(
|
||||||
|
{
|
||||||
|
_at: new Date().toJSON(),
|
||||||
|
_from: 'basesocket:onConnection',
|
||||||
|
_type: 'CLOG',
|
||||||
|
about: `A socket is connecting from ${req.connection.remoteAddress}:${req.connection.remotePort}.`,
|
||||||
|
},
|
||||||
|
'\n,'
|
||||||
|
)
|
||||||
|
|
||||||
// socket.isAlive = true
|
// socket.isAlive = true
|
||||||
// socket.on('pong', function() { this.isAlive = true })
|
// socket.on('ping', () => { socket.isAlive = true }) // Most WebSocket server implementations, including the ws library, automatically respond to ping frames with pong frames. However, the server can listen for ping events using socket.on('ping', ...) if it wants to perform additional actions.
|
||||||
|
|
||||||
socket.on('message', (data) => {
|
socket.on('message', (data) => {
|
||||||
// 在这里统一分发消息
|
|
||||||
let dataObj
|
let dataObj
|
||||||
try {
|
try {
|
||||||
dataObj = JSON.parse(data)
|
dataObj = JSON.parse(data)
|
||||||
if (dataObj?.skevent === 'PING') {
|
if (dataObj?.skevent === 'PING') {
|
||||||
|
// socket.isAlive = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//console.log({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CLOG', usid: socket.usid, skevent: dataObj?.skevent }, '\n,')
|
//console.log({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CLOG', usid: socket.usid, skevent: dataObj?.skevent }, '\n,')
|
||||||
@ -31,48 +42,77 @@ module.exports = {
|
|||||||
console.error({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CERROR', about: 'Unable to parse socket message', data }, '\n,')
|
console.error({ _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CERROR', about: 'Unable to parse socket message', data }, '\n,')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
console.log('WebSocket-onMessage: dataObj', dataObj)
|
||||||
if (['SOCKET_OWNER', 'SOCKET_OWNER_RECONNECT'].includes(dataObj.skevent)) {
|
if (['SOCKET_OWNER', 'SOCKET_OWNER_RECONNECT'].includes(dataObj.skevent)) {
|
||||||
// 前端断线重连时,并不会自动再次提供 _passtoken。在前端的initSocket时,应当把_passtoken送过来,而后台则对_passtoken做验证后再加socketPool。
|
// 前端断线重连时,并不会自动再次提供 _passtoken。在前端的initSocket时,应当把_passtoken送过来。
|
||||||
dataObj._passtokenSource = webtoken.verifyToken(dataObj._passtoken)
|
dataObj._passtokenSource = webtoken.verifyToken(dataObj._passtoken)
|
||||||
|
console.log('WebSocket-onMessge: _passtokenSource', dataObj._passtokenSource)
|
||||||
if (typeof dataObj._passtokenSource?.usid === 'string') {
|
if (typeof dataObj._passtokenSource?.usid === 'string') {
|
||||||
my.socketPool[dataObj._passtokenSource.usid] = socket
|
|
||||||
socket.usid = dataObj._passtokenSource.usid
|
socket.usid = dataObj._passtokenSource.usid
|
||||||
// console.log({
|
socket.skid = socket.skid || dataObj._passtokenSource.clid || 'skid' + crypto.randomBytes(16).toString('hex') // 注意,skid 这个名字 仅限在本文件内使用,在外部都使用 clid (client id)
|
||||||
// _at: new Date().toJSON(), _from: 'basesocket:onMessage', _type: 'CLOG',
|
// my.socketPool[socket.skid] = socket
|
||||||
// skevent: dataObj.skevent,
|
console.log(
|
||||||
// usid: dataObj._passtokenSource.usid,
|
{
|
||||||
// socketPool: Object.keys(my.socketPool)?.length,
|
_at: new Date().toJSON(),
|
||||||
// socketClients: my.wsServer.clients.size
|
_from: 'basesocket:onMessage',
|
||||||
// }, '\n,')
|
_type: 'CLOG',
|
||||||
|
skevent: dataObj.skevent,
|
||||||
|
usid: socket.usid,
|
||||||
|
skid: socket.skid,
|
||||||
|
// socketPool: Object.keys(my.socketPool),
|
||||||
|
clientsSize: my.wsServer.clients.size,
|
||||||
|
},
|
||||||
|
'\n,'
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
|
||||||
const listeners = my.listeners[dataObj.skevent] || []
|
const listeners = my.listeners[dataObj.skevent] || []
|
||||||
for (const listener of listeners) {
|
for (const listener of listeners) {
|
||||||
listener(dataObj)
|
listener(dataObj)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// const heartbeat = setInterval(() => {
|
|
||||||
// my.wsServer.clients.forEach((socket) => {
|
|
||||||
// if (socket.isAlive === false) return socket.terminate()
|
|
||||||
// socket.isAlive = false
|
|
||||||
// socket.ping(function() { wo.cclog('👉 ASS: sent Ping') })
|
|
||||||
// })
|
|
||||||
// }, 60000)
|
|
||||||
|
|
||||||
socket.on('close', () => {
|
socket.on('close', () => {
|
||||||
//console.log({_at:new Date().toJSON(), _from: 'basesocket:onClose', _type:'CLOG', usid: socket?.usid},'\n,') // don't know why, but this output happens too often without usid.
|
console.log({ _at: new Date().toJSON(), _from: 'basesocket:onClose', _type: 'CLOG', usid: socket?.usid, skid: socket?.skid }, '\n,') // don't know why, but this output happens too often without usid.
|
||||||
delete my.socketPool[socket?.usid]
|
//delete my.socketPool[socket?.usid] // 20240917 恍然大悟,同一个用户可以多次登录,多个socket会互相覆盖。如果仅仅根据 usid 来删除,其他尚在连接的socket也会被删了。
|
||||||
// clearInterval(heartbeat)
|
// delete my.socketPool[socket?.skid]
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
my.wsServer.on('close', () => {})
|
||||||
|
|
||||||
|
// 一个全局的 heartbeat,不必给每个 socket 一个 heartbeat
|
||||||
|
if (heartbeat) {
|
||||||
|
console.log('WebSocket_heartbeat: starting...')
|
||||||
|
setInterval(() => {
|
||||||
|
my.wsServer.clients.forEach((socket) => {
|
||||||
|
if (socket.readyState !== ws.OPEN) {
|
||||||
|
//socket.isAlive = false
|
||||||
|
console.log('WebSocket_heartbeat: not open', { usid: socket.usid, skid: socket.skid, readyState: socket.readyState })
|
||||||
|
} else {
|
||||||
|
//socket.ping()
|
||||||
|
console.log('WebSocket_heartbeat: ping', { usid: socket.usid, skid: socket.skid, readyState: socket.readyState })
|
||||||
|
}
|
||||||
|
})
|
||||||
|
console.log('WebSocket_heartbeat: clientsSize =', my.wsServer.clients.size)
|
||||||
|
}, heartbeatInterval || my.heartbeatInterval)
|
||||||
|
}
|
||||||
|
|
||||||
return this
|
return this
|
||||||
},
|
},
|
||||||
|
|
||||||
removeUserSocket (usid) {
|
removeUserSocket (data = {}) {
|
||||||
delete my.socketPool[usid]
|
if (typeof data === 'string') {
|
||||||
|
// delete my.socketPool[usid]
|
||||||
|
} else if (typeof data === 'object') {
|
||||||
|
my.wsServer.clients.forEach((socket) => {
|
||||||
|
if (data.clid && socket.skid === data.clid) {
|
||||||
|
delete socket.skid
|
||||||
|
delete socket.usid
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
addListener (skevent, listener) {
|
addListener (skevent, listener) {
|
||||||
@ -85,38 +125,49 @@ module.exports = {
|
|||||||
},
|
},
|
||||||
|
|
||||||
sendToAll (dataObj) {
|
sendToAll (dataObj) {
|
||||||
|
console.log('sendToAll: dataObj =', dataObj)
|
||||||
my.wsServer.clients.forEach((socket) => {
|
my.wsServer.clients.forEach((socket) => {
|
||||||
//if (socket.readyState === socket.OPEN) {
|
|
||||||
try {
|
try {
|
||||||
|
console.log('sendToOne: socket', { usid: socket.usid, skid: socket.skid })
|
||||||
|
if (socket.readyState === ws.OPEN) {
|
||||||
socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj)
|
socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj)
|
||||||
|
} else {
|
||||||
|
console.warn({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CWARN', msg: 'sendToOne: socket not open', dataObj }, '\n,')
|
||||||
|
}
|
||||||
} catch (expt) {
|
} catch (expt) {
|
||||||
console.error(
|
console.error(
|
||||||
{
|
{
|
||||||
_at: new Date().toJSON(),
|
_at: new Date().toJSON(),
|
||||||
_from: 'Socket:sendToAll',
|
_from: 'Socket:sendToAll',
|
||||||
_type: 'CWARN',
|
_type: 'CERROR',
|
||||||
msg: 'sendToAll: Failed sending to unconnected socket',
|
msg: 'sendToAll: Failed sending to unconnected socket',
|
||||||
dataObj,
|
dataObj,
|
||||||
usid: socket.usid,
|
usid: socket.usid,
|
||||||
|
skid: socket.skid,
|
||||||
},
|
},
|
||||||
'\n,'
|
'\n,'
|
||||||
)
|
)
|
||||||
delete my.socketPool[socket.usid]
|
// delete my.socketPool[socket.skid]
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
|
||||||
sendToOne (dataObj, usid) {
|
sendToOne (dataObj) {
|
||||||
const socket = my.socketPool[usid]
|
console.log('sendToOne: dataObj =', dataObj)
|
||||||
// if (socket && socket.readyState === socket.OPEN) { // 20240916 注意到,前端显示仍然连接着 socket,但后台却判断为不是 OPEN。所以把 if 改成 try.
|
my.wsServer.clients.forEach((socket) => {
|
||||||
|
console.log('sendToOne: socket', { usid: socket.usid, skid: socket.skid })
|
||||||
|
if ((dataObj.clid && socket.skid === dataObj.clid) || (dataObj.usid && socket.usid === dataObj.usid)) {
|
||||||
try {
|
try {
|
||||||
|
if (socket.readyState === ws.OPEN) {
|
||||||
socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj)
|
socket.send(typeof dataObj !== 'string' ? JSON.stringify(dataObj) : dataObj)
|
||||||
} catch (expt) {
|
} else {
|
||||||
console.error(
|
console.warn({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CWARN', msg: 'sendToOne: socket not open', dataObj }, '\n,')
|
||||||
{ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CWARN', msg: 'sendToOne: Failed sending to unconnected socket', dataObj, usid },
|
|
||||||
'\n,'
|
|
||||||
)
|
|
||||||
delete my.socketPool[usid]
|
|
||||||
}
|
}
|
||||||
|
} catch (expt) {
|
||||||
|
console.error({ _at: new Date().toJSON(), _from: 'Socket:sendToOne', _type: 'CERROR', msg: 'sendToOne: Failed sending to socket', dataObj }, '\n,')
|
||||||
|
// delete my.socketPool[socket.skid]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user