使用集群将 Socket.IO 扩展到多个 Node.js 进程

IT技术 javascript node.js redis socket.io node-redis
2021-01-15 22:51:26

用这个把我的头发扯掉……有没有人设法将Socket.IO 扩展到由 Node.js 的集群module产生的多个“工作”进程

假设我有以下四个工作进程(伪):

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

然后在浏览器...

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

问题:由于四个单独的工作进程发送消息,我每秒都会收到四条消息。

如何确保消息只发送一次?

4个回答

编辑:在 Socket.IO 1.0+ 中,现在可以使用更简单的 Redis 适配器module,而不是设置具有多个 Redis 客户端的存储。

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

下面显示的示例看起来更像这样:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

如果您有一个主节点需要发布到其他 Socket.IO 进程,但本身不接受套接字连接,请使用socket.io-emitter而不是socket.io-redis

如果您在扩展时遇到问题,请使用DEBUG=*. Socket.IO 现在实现了debug,它也会打印出 Redis 适配器调试消息。示例输出:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

如果您的主进程和子进程都显示相同的解析器消息,那么您的应用程序正在正确扩展。


如果您是从单个工作人员发出的,那么您的设置应该没有问题。您正在做的是从所有四个工作人员发出,并且由于 Redis 发布/订阅,消息不会重复,而是按照您要求应用程序的方式写入四次。这是Redis所做的简单图表:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

如您所见,当您从一个 worker 发出时,它会将发出的内容发布到 Redis,并且会从其他订阅了 Redis 数据库的 worker 镜像。这也意味着您可以使用多个连接到同一个实例的套接字服务器,并且一个服务器上的发射将在所有连接的服务器上被触发。

使用集群,当客户端连接时,它将连接到您的四个工作人员之一,而不是全部四个。这也意味着您从该工作人员发出的任何内容都只会向客户端显示一次。所以是的,应用程序正在扩展,但是你这样做的方式,你从所有四个工作人员发出,Redis 数据库使它好像你在一个工作人员上调用它四次。如果客户端实际上连接到所有四个套接字实例,他们将每秒接收 16 条消息,而不是 4 条。

套接字处理的类型取决于您将拥有的应用程序类型。如果您要单独处理客户端,那么您应该没有问题,因为连接事件只会为每个客户端的一个工作人员触发。如果您需要全局“心跳”,那么您可以在主进程中有一个套接字处理程序。由于工人在主进程死亡时死亡,因此您应该抵消主进程的连接负载,并让子进程处理连接。下面是一个例子:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

在示例中,有五个 Socket.IO 实例,一个是主实例,四个是子实例。主服务器从不调用,listen()因此该进程没有连接开销。但是,如果您在主进程上调用一个发射,它将被发布到 Redis,四个工作进程将在它们的客户端上执行发射。这会抵消工作人员的连接负载,如果工作人员死亡,您的主要应用程序逻辑将在主服务器中保持不变。

请注意,使用 Redis,即使在命名空间或房间中的所有发出都将由其他工作进程处理,就好像您从该进程触发了发出一样。换句话说,如果您有两个 Socket.IO 实例和一个 Redis 实例,则调用emit()第一个工作线程中的套接字会将数据发送到其客户端,而第二工作线程将执行相同的操作,就像您从该工作线程调用发射一样。

仅供参考:它不再适用于 socket.io > 1.0。必须使用 redis 适配器。socket.io/docs/using-multiple-nodes我还没有成功得到一个使用集群和 socket.io 1.1.0 运行的例子。
2021-03-13 22:51:26
@DerM 我都没有。运行 socket.io 1.3.5,我没有发现任何有效的东西。添加粘性会话,更改 HAProxy 配置......这些都无法让套接字与集群一起工作。
2021-03-16 22:51:26
好答案。谢谢!起到了一定的作用。当我发出 io.sockets.emit('userstreamssock', postid); 从主人那里,我没有从工人那里得到它。不知道为什么。
2021-04-05 22:51:26
我在前端遇到错误... socket.io.min.js:2 GET http://localhost:3000/socket.io/?EIO=3&transport=polling&t=LYqSrsK 404 (Not Found)
2021-04-06 22:51:26
我为 Socket.IO 1.0+ 添加了一个示例,并在 1.3.5 上对其进行了测试。请注意,对于主节点,应该使用socket.io-emitter,因为它是一个非侦听过程,但为了使答案更简单,我省略了它。
2021-04-09 22:51:26

让 master 处理您的心跳(下面的示例)或在内部启动不同端口上的多个进程,并使用 nginx(它还支持 V1.3 以上的 websockets)对它们进行负载平衡。

集群与主

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {
    socket.on('join', function(rooms) {
        rooms.forEach(function(room) {
            socket.join(room);
        });
    });

    socket.on('leave', function(rooms) {
        rooms.forEach(function(room) {
            socket.leave(room);
        });
    });

});

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Emit a message every second
    function send() {
        console.log('howdy');
        io.sockets.in('room').emit('data', 'howdy');
    }

    setInterval(send, 1000);


    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    }); 
}
当我使用var socket = require('socket.io')(1338);时,我收到此错误错误:听 EADDRINUSE :::1338 how to implementation on same !
2021-03-17 22:51:26
一个不错的建议,但它仍然只是一个主进程,负责潜在的 500,000 个 websocket 连接......并没有真正解决每个服务器跨多个服务器/进程的“可扩展性”问题
2021-03-23 22:51:26
怎么样:使用 2 层负载均衡器。AWS 示例:第一层使用弹性负载均衡器在多台机器之间分配工作负载。第二层在机器上的多个实例之间分配工作负载。您可以运行 cpu.count 节点实例并通过 nginx 或使用节点集群将工作负载分配给它们(在这种情况下不需要 nginx)。我更喜欢 nginx 版本。对于自动扩展,请使用 OpsWork 并让它根据 CPU 负载处理您的扩展。它会自动添加和删除机器,并且非常容易设置。
2021-04-03 22:51:26

这实际上看起来像 Socket.IO 在扩展方面取得了成功。您希望来自一台服务器的消息发送到该房间中的所有套接字,而不管它们碰巧连接到哪台服务器。

最好的办法是让一个主进程每秒发送一条消息。例如,您可以通过仅运行它来做到这一点cluster.isMaster

我的目标实际上是弄清楚如何做到这一点,但要规模化。现在,对于 10,000 个客户来说,它根本不征税……但是如果是 100 万呢?我正在构建的应用程序具有大量 Web 套接字连接,可用于相当高需求的统计应用程序,并且该 API 可以在短时间内轻松达到 1000 万个以上的套接字事务/天。我只是想准备好在必要时扩展它 - 仍然不确定如何在 1 个服务器、1 个进程模型之外做到这一点。
2021-03-13 22:51:26
@Lee 您希望它使用什么逻辑来决定是否“复制”消息?当您向房间发送消息时,它会发送给房间中的每个人 - 这是预期的行为。如果您希望每个进程按时间间隔发送消息,您可以为每个进程留出一个空间。
2021-03-19 22:51:26
它成功地“共享”了套接字,但未能确定哪些消息不重复。集群是一个好主意,但它并不是真正的“扩展”……它是一个管理 4 个工作的进程
2021-03-22 22:51:26
@Lee 通常使用 Socket.IO 的方式是,在一台服务器上发生的某些事件(例如,http 请求)会向房间触发消息。您希望这条消息会发送给房间中的每个人,而不仅仅是碰巧连接到同一台服务器的人。“一个进程管理 4 人的工作” - 我不确定你的实际逻辑是什么,但每秒发送一条消息不会很费力。
2021-04-10 22:51:26
我想更好的逻辑是 socket.emit 以某种方式跨进程同步。不知道如何实现。当 10 台不同的服务器各有 4 个内核时,“每个进程一个房间”的方法并不能解决可扩展性问题……但是当只涉及一台服务器时,这可能是一个好主意。
2021-04-12 22:51:26

进程间通信不足以使 socket.io 1.4.5 与集群一起工作。强制websocket模式也是必须的。请参阅Node.JS 中的 WebSocket 握手、Socket.IO 和集群不起作用