mediasoup-demo server源码分析
mediasoup-demo server源码分析
0. 目录
- mediasoup-demo是什么?
- mediasoup-demo代码结构
- mediasoup-demo server代码分析
- config.js代码解析
- server.js主要逻辑
- Room.js具体代码解析
相关文章:
1. mediasoup-demo是什么?
- mediasoup-demo是一个使用mediasoup库实现的视频会议应用程序的示例代码。
- mediasoup-demo示例应用程序提供了一个基于WebRTC的视频会议解决方案,包括房间管理、媒体流管理、网络传输和音视频编解码等功能,同时提供了前端和后端代码示例,方便进行学习和参考。
- 其中mediasoup是一个开源的WebRTC信令和媒体服务器,用于构建实时音视频通信应用程序。
2. mediasoup-demo代码结构
- mediasoup-demo代码结构图如下:


- 其中各目录或文件作用如下:
| 目录 | 下一级目录或文件 | 作用 |
|---|---|---|
| app | 客户端代码 | |
| broadcasters | 广播,推流或者拉流 | |
| server | 服务端Demo | |
| server.js | 服务端Demo主程序 | |
| config.js | 配置文件 | |
| cert | 证书及秘钥 | |
| connect.js | 对后面的interactiveClient.js文件进行封装 | |
| lib | server.js使用的库文件 | |
| Logger.js | 打印日志 | |
| Room.js | 房间管理及信令处理 | |
| interactiveClient.js | 运行时内部信息查询客户端 | |
| interactiveServer.js | 运行时内部信息查询服务端 |
- 重点关注server目录文件内容。
3. mediasoup-demo server代码分析
- mediasoup-demo项目中server目录实现了视频会议应用程序的后端功能,包括与mediasoup的集成、WebSocket通信、房间管理、日志记录等功能。

1. config.js代码解析
- config.js是配置文件,用于定义mediasoup的配置和服务器的端口等参数。
- 配置文件如下:
/**
* IMPORTANT (PLEASE READ THIS):
*
* This is not the "configuration file" of mediasoup. This is the configuration
* file of the mediasoup-demo app. mediasoup itself is a server-side library, it
* does not read any "configuration file". Instead it exposes an API. This demo
* application just reads settings from this file (once copied to config.js) and
* calls the mediasoup API with those settings when appropriate.
*/
const os = require('os');
module.exports =
{
// Listening hostname (just for `gulp live` task).
domain : process.env.DOMAIN || 'localhost',
// Signaling settings (protoo WebSocket server and HTTP API server).
https : //
{
listenIp : '0.0.0.0',
// NOTE: Don't change listenPort (client app assumes 4443).
listenPort : process.env.PROTOO_LISTEN_PORT || 4443,
// NOTE: Set your own valid certificate files.
tls :
{
cert : process.env.HTTPS_CERT_FULLCHAIN || `${__dirname}/certs/fullchain.pem`,
key : process.env.HTTPS_CERT_PRIVKEY || `${__dirname}/certs/privkey.pem`
}
},
// mediasoup settings.
mediasoup :
{
// Number of mediasoup workers to launch.
numWorkers : Object.keys(os.cpus()).length,
// mediasoup WorkerSettings.
// See https://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings
workerSettings :
{
logLevel : 'warn',
logTags :
[
'info',
'ice',
'dtls',
'rtp',
'srtp',
'rtcp',
'rtx',
'bwe',
'score',
'simulcast',
'svc',
'sctp'
],
rtcMinPort : process.env.MEDIASOUP_MIN_PORT || 40000,
rtcMaxPort : process.env.MEDIASOUP_MAX_PORT || 49999
},
// mediasoup Router options.
// See https://mediasoup.org/documentation/v3/mediasoup/api/#RouterOptions
routerOptions :
{
mediaCodecs :
[
{
kind : 'audio',
mimeType : 'audio/opus',
clockRate : 48000,
channels : 2
},
{
kind : 'video',
mimeType : 'video/VP8',
clockRate : 90000,
parameters :
{
'x-google-start-bitrate' : 1000
}
},
{
kind : 'video',
mimeType : 'video/VP9',
clockRate : 90000,
parameters :
{
'profile-id' : 2,
'x-google-start-bitrate' : 1000
}
},
{
kind : 'video',
mimeType : 'video/h264',
clockRate : 90000,
parameters :
{
'packetization-mode' : 1,
'profile-level-id' : '4d0032',
'level-asymmetry-allowed' : 1,
'x-google-start-bitrate' : 1000
}
},
{
kind : 'video',
mimeType : 'video/h264',
clockRate : 90000,
parameters :
{
'packetization-mode' : 1,
'profile-level-id' : '42e01f',
'level-asymmetry-allowed' : 1,
'x-google-start-bitrate' : 1000
}
}
]
},
// mediasoup WebRtcServer options for WebRTC endpoints (mediasoup-client,
// libmediasoupclient).
// See https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcServerOptions
// NOTE: mediasoup-demo/server/lib/Room.js will increase this port for
// each mediasoup Worker since each Worker is a separate process.
webRtcServerOptions :
{
listenInfos :
[
{
protocol : 'udp',
ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedIp : process.env.MEDIASOUP_ANNOUNCED_IP,
port : 44444
},
{
protocol : 'tcp',
ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedIp : process.env.MEDIASOUP_ANNOUNCED_IP,
port : 44444
}
],
},
// mediasoup WebRtcTransport options for WebRTC endpoints (mediasoup-client,
// libmediasoupclient).
// See https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions
webRtcTransportOptions :
{
// listenIps is not needed since webRtcServer is used.
// However passing MEDIASOUP_USE_WEBRTC_SERVER=false will change it.
listenIps :
[
{
ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedIp : process.env.MEDIASOUP_ANNOUNCED_IP
}
],
initialAvailableOutgoingBitrate : 1000000,
minimumAvailableOutgoingBitrate : 600000,
maxSctpMessageSize : 262144,
// Additional options that are not part of WebRtcTransportOptions.
maxIncomingBitrate : 1500000
},
// mediasoup PlainTransport options for legacy RTP endpoints (FFmpeg,
// GStreamer).
// See https://mediasoup.org/documentation/v3/mediasoup/api/#PlainTransportOptions
plainTransportOptions :
{
listenIp :
{
ip : process.env.MEDIASOUP_LISTEN_IP || '0.0.0.0',
announcedIp : process.env.MEDIASOUP_ANNOUNCED_IP
},
maxSctpMessageSize : 262144
}
}
};
- config.js配置信息注释:
| 父字段 | 子字段 | 作用 |
|---|---|---|
| https | ||
| listenIp | 服务器监听的IP地址 | |
| listenPort | 服务器监听的端口 | |
| mediasoup | ||
| workerSettings | mediasoup的Worker配置参数,包括worker的数量、Worker进程的文件路径、logLevel等 | |
| routerOptions | mediasoup的Router配置参数,包括媒体编解码器配置参数 | |
| webRtcServerOptions | mediasoup的WebRtcServer配置参数,包括监听协议、IP和端口 | |
| webRtcTransportOptions | mediasoup的WebRtcTransport配置参数,包括UDP/TCP转发的监听端口、最大带宽、最大数据包等 | |
| plainTransportOptions | mediasoup的PlainTransport配置参数,包括UDP/TCP转发的监听端口、最大带宽、最大数据包等 |
2. server.js代码解析
- server.js是mediasoup-demo应用程序的核心部分,负责启动和管理各个组件,以及处理音视频数据的传输和处理。
1. server.js主要逻辑
- 加载配置文件:读取config.js文件中的配置参数,包括HTTP和WebSocket服务器的端口号、mediasoup Worker进程的数量等等。
- 启动HTTP服务器:创建一个HTTP服务器,并在指定端口上监听请求。
- HTTP服务器主要用于提供静态资源,例如前端页面、CSS和JS文件等。
- 启动WebSocket服务器:创建一个WebSocket服务器,并在指定端口上监听连接请求。
- WebSocket服务器用于处理实时音视频数据的传输。
- 启动mediasoup Worker进程:根据配置文件中的worker.numWorkers参数,创建指定数量的mediasoup Worker进程。
- 每个Worker进程负责管理mediasoup Router对象和Transport对象,以及处理音视频流的转发和处理。
- 处理WebSocket连接:当有新的WebSocket连接建立时,server.js会创建一个新的mediasoup Router对象,并将其绑定到一个指定的mediasoup Worker进程上。
- 同时,server.js会为该WebSocket连接创建一个Transport对象,并将其绑定到相应的mediasoup Router对象上。
- 这样,WebSocket客户端就可以通过Transport对象和mediasoup Router对象进行音视频流的传输和处理。
- 处理HTTP请求:当有HTTP请求到达时,server.js会根据请求路径返回相应的静态资源。
- 例如,请求"/"路径时返回index.html页面。
- 错误处理:server.js还负责处理各种错误,包括HTTP和WebSocket服务器的错误、mediasoup Worker进程的错误以及WebSocket客户端的错误等等。
- 当出现错误时,server.js会将错误信息记录到日志文件中,并尝试恢复或重新启动相关的服务或进程。
- 当出现错误时,server.js会将错误信息记录到日志文件中,并尝试恢复或重新启动相关的服务或进程。
2. server.js具体代码解析
- server目录中server.js是服务端应用程序的入口文件,负责启动应用程序、创建HTTP和WebSocket服务器以及管理mediasoup Worker进程。
- 其中run()函数是server.js入口函数:
async function run()
{
// 开启交互式服务器
await interactiveServer();
// 如果设置了环境变量 INTERACTIVE 为 true 或 1,则开启交互式客户端
if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')
await interactiveClient();
// 运行 mediasoup worker
await runMediasoupWorkers();
// 创建 Express 应用
await createExpressApp();
// 运行 HTTPS 服务器
await runHttpsServer();
// 运行 protoo WebSocket 服务器
await runProtooWebSocketServer();
// 每 X 秒记录一次房间的状态
setInterval(() =>
{
for (const room of rooms.values())
{
room.logStatus();
}
}, 120000);
}
1. runMediasoupWorkers() 函数解析
- mediasoup-demo中,mediasoup Worker是mediasoup的核心部分,它们负责处理音视频流并管理mediasoup的底层资源。
- 由于mediasoup Worker的创建和销毁比较耗时,因此在mediasoup-demo中会创建多个mediasoup Worker来提高系统的处理能力。
- 在runMediasoupWorkers() 函数中,可以创建多个mediasoup Worker,并对其进行管理和监控,从而保证mediasoup-demo的正常运行。
- 具体代码和注释如下:
/**
* Launch as many mediasoup Workers as given in the configuration file.
*/
async function runMediasoupWorkers()
{
// 从配置文件中获取mediasoup的配置项numWorkers,即需要启动的mediasoup Worker数量。
const { numWorkers } = config.mediasoup;
logger.info('running %d mediasoup Workers...', numWorkers);
// 使用mediasoup.createWorker()创建指定数量的mediasoup Worker,并将其存储到全局数组mediasoupWorkers中。
for (let i = 0; i < numWorkers; ++i)
{
const worker = await mediasoup.createWorker(
{
logLevel : config.mediasoup.workerSettings.logLevel,
logTags : config.mediasoup.workerSettings.logTags,
rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
});
// 每个mediasoup Worker创建成功后,为其添加一个died事件监听器,当Worker死亡时自动退出进程。
// 该事件监听器通过setTimeout()设置了一个2秒的定时器,定时器到期后退出进程。
worker.on('died', () =>
{
logger.error(
'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);
setTimeout(() => process.exit(1), 2000);
});
mediasoupWorkers.push(worker);
// 对于每个mediasoup Worker,如果配置文件中的MEDIASOUP_USE_WEBRTC_SERVER环境变量的值不为false,则在其中创建一个WebRtcServer。
// Create a WebRtcServer in this Worker.
if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== 'false')
{
// Each mediasoup Worker will run its own WebRtcServer, so those cannot
// share the same listening ports. Hence we increase the value in config.js
// for each Worker.
// 为了避免多个mediasoup Worker之间端口冲突,配置文件中WebRtcServer的监听端口需要逐个增加,这样每个mediasoup Worker创建的
// WebRtcServer监听端口才不会相同。因此,在为每个mediasoup Worker创建WebRtcServer时,会逐个增加WebRtcServer配置中的端口号。
const webRtcServerOptions = utils.clone(config.mediasoup.webRtcServerOptions);
const portIncrement = mediasoupWorkers.length - 1;
for (const listenInfo of webRtcServerOptions.listenInfos)
{
listenInfo.port += portIncrement;
}
const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);
//每个mediasoup Worker创建的WebRtcServer将被存储到其appData属性中。
worker.appData.webRtcServer = webRtcServer;
}
// Log worker resource usage every X seconds.
// 每个mediasoup Worker将会定时记录其资源使用情况,并使用setInterval()函数设置一个时间间隔,
// 即每隔一段时间输出一次mediasoup Worker的资源使用情况。
setInterval(async () =>
{
const usage = await worker.getResourceUsage();
logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
}, 120000);
}
}
2. mediasoup.createWorker() 函数解析
- runMediasoupWorkers() 函数中mediasoup.createWorker()方法是mediasoup库提供的一个异步方法,用于创建mediasoup Worker实例,其具体实现是在mediasoup项目的node/src/index.ts代码中。
- 在mediasoup-demo v3版本中,mediasoup.createWorker()方法被调用多次,每次调用都会创建一个Worker实例,并把该实例推入全局变量mediasoupWorkers数组中。
- 在v3版本的实现中,每个Worker实例将拥有自己的独立的mediasoup Router实例,从而保证不同的房间使用独立的Router实例进行媒体处理,提高系统的可扩展性和稳定性。
- 同时,每个Worker实例都会在其对应的端口上创建一个WebRtcServer实例,用于管理WebRTC客户端的连接。
- 下面是mediasoup.createWorker() 方法的详细解析:
- options: Object类型,表示创建Worker实例的选项。该选项包括:
- logLevel: String类型,表示Worker实例的日志级别。默认值为warn。
- logTags: Array类型,表示Worker实例的日志标签。默认值为[‘info’, ‘ice’, ‘dtls’, ‘rtp’, ‘srtp’, ‘rtcp’, ‘rtx’, ‘bwe’, ‘score’, ‘simulcast’, ‘svc’]。
- rtcMinPort: Number类型,表示Worker实例使用的最小UDP端口。默认值为10000。
- rtcMaxPort: Number类型,表示Worker实例使用的最大UDP端口。默认值为59999。
- dtlsCertificateFile 和 dtlsPrivateKeyFile 是 DTLS 的证书和私钥文件的路径。
- 如果未设置这些文件的路径,则 mediasoup 将使用默认的证书和私钥。
- libwebrtcFieldTrials 是 Google WebRTC 库中的实验性功能。
- appData 可以用于存储与 worker 实例相关的应用程序数据。
- 如果设置了 appData,则其必须是一个对象。默认情况下,appData 为 undefined。
- 返回值: Promise类型,表示创建的Worker实例对象。
- options: Object类型,表示创建Worker实例的选项。该选项包括:
/**
* Create a Worker.
*/
export async function createWorker(
{
logLevel = 'error',
logTags,
rtcMinPort = 10000,
rtcMaxPort = 59999,
dtlsCertificateFile,
dtlsPrivateKeyFile,
libwebrtcFieldTrials,
appData
}: WorkerSettings = {}
): Promise<Worker>
{
// 在函数中打印了一个调试信息,表示正在创建一个 Worker 实例。
logger.debug('createWorker()');
// 函数检查了 appData 是否是一个对象。如果 appData 存在但不是对象,则抛出 TypeError 异常。
if (appData && typeof appData !== 'object')
throw new TypeError('if given, appData must be an object');
// 创建了一个新的 Worker 实例,将配置对象作为参数传递给该实例的构造函数。
const worker = new Worker(
{
logLevel,
logTags,
rtcMinPort,
rtcMaxPort,
dtlsCertificateFile,
dtlsPrivateKeyFile,
libwebrtcFieldTrials,
appData
});
// createWorker 函数返回一个 Promise,当 worker 进程启动成功时会调用 resolve 回调函数,并将创建的 Worker 实例作为参数传递进去;
// 如果 worker 进程启动失败,则会调用 reject 回调函数。
return new Promise((resolve, reject) =>
{
worker.on('@success', () =>
{
// Emit observer event.
// 在 resolve 回调函数中,会发射一个 newworker 事件,通知 mediasoup 监听该事件的观察者,表示创建了一个新的 worker 进程。
observer.safeEmit('newworker', worker);
resolve(worker);
});
worker.on('@failure', reject);
});
}
- 其中,new Worker会创建一个 Worker 工作进程,并在工作进程与主进程之间建立IPC通道,从而实现进程间通信。
- Worker 用于处理mediasoup的核心功能,如RTP流传输,房间管理等。
- Worker 用于处理mediasoup的核心功能,如RTP流传输,房间管理等。
3. runHttpsServer() 函数解析
- runHttpsServer() 函数的主要功能是创建和运行一个基于 HTTPS 的 Web 服务器,并监听配置文件中指定的IP地址和端口。
- 具体代码和注释如下:
/**
* Create a Node.js HTTPS server. It listens in the IP and port given in the
* configuration file and reuses the Express application as request listener.
*/
async function runHttpsServer()
{
logger.info('running an HTTPS server...');
// HTTPS server for the protoo WebSocket server.
// 定义了tls的对象,用于存储HTTPS服务器所需的TLS证书和密钥
const tls =
{
cert : fs.readFileSync(config.https.tls.cert),
key : fs.readFileSync(config.https.tls.key)
};
// 创建一个HTTPS服务器实例,并将tls对象和expressApp作为参数传递进去。
httpsServer = https.createServer(tls, expressApp);
// 监听HTTPS服务器的端口和IP地址,并返回一个Promise。
await new Promise((resolve) =>
{
httpsServer.listen(
Number(config.https.listenPort), config.https.listenIp, resolve);
});
}
4. runProtooWebSocketServer() 函数解析
- runProtooWebSocketServer() 函数主要用于启动一个 WebSocket 服务器,来允许浏览器通过WebSocket连接到服务器。
/**
* Create a protoo WebSocketServer to allow WebSocket connections from browsers.
*/
async function runProtooWebSocketServer()
{
logger.info('running protoo WebSocketServer...');
// Create the protoo WebSocket server.
// 创建一个 WebSocket 服务器 protooWebSocketServer
protooWebSocketServer = new protoo.WebSocketServer(httpsServer,
{
maxReceivedFrameSize : 960000, // 960 KBytes.
maxReceivedMessageSize : 960000,
fragmentOutgoingMessages : true,
fragmentationThreshold : 960000
});
// Handle connections from clients.
// 注册 connectionrequest 事件的监听器,有新的 WebSocket 连接请求时被触发。
protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
{
// The client indicates the roomId and peerId in the URL query.
// 在监听器中,从连接请求的 URL 中解析出房间 ID 和 peer ID,如果解析失败或者没有这些参数,就拒绝连接请求。
const u = url.parse(info.request.url, true);
const roomId = u.query['roomId'];
const peerId = u.query['peerId'];
if (!roomId || !peerId)
{
reject(400, 'Connection request without roomId and/or peerId');
return;
}
// 从URL中解析出消费者实例的数量 consumerReplicas,如果解析失败则将其设置为默认值 0。
let consumerReplicas = Number(u.query['consumerReplicas']);
if (isNaN(consumerReplicas))
{
consumerReplicas = 0;
}
logger.info(
'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
roomId, peerId, info.socket.remoteAddress, info.origin);
// Serialize this code into the queue to avoid that two peers connecting at
// the same time with the same roomId create two separate rooms with same
// roomId.
// 为了避免同时有两个连接请求使用相同的roomId创建两个不同的房间,使用了一个queue队列将代码序列化。
// 在队列中,调用 getOrCreateRoom() 函数获取或创建一个房间,并返回一个 Room 实例。
// 如果房间创建或加入失败,就会在 catch 代码块中将错误信息返回给客户端,并拒绝连接请求。
queue.push(async () =>
{
const room = await getOrCreateRoom({ roomId, consumerReplicas });
// Accept the protoo WebSocket connection.
// 如果任务执行成功,使用 accept() 接受 WebSocket 连接
const protooWebSocketTransport = accept();
room.handleProtooConnection({ peerId, protooWebSocketTransport });
})
.catch((error) =>
{
logger.error('room creation or room joining failed:%o', error);
reject(error);
});
});
}
3. Room.js代码解析
- Room.js实现了应用程序中单个房间的管理逻辑,负责管理房间成员的加入/离开、生产者/消费者的创建和管理等一系列操作。
1. Room.js具体代码解析
1. create() 函数解析
- create() 函数创建一个新的Room实例。
- 具体代码和注释如下:
/**
* Factory function that creates and returns Room instance.
*
* @async
*
* @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new
* mediasoup Router must be created.
* @param {String} roomId - Id of the Room instance.
*/
static async create({ mediasoupWorker, roomId, consumerReplicas })
{
logger.info('create() [roomId:%s]', roomId);
// Create a protoo Room instance.
// 创建了一个新的protoo Room实例,protoo是一种基于WebSocket的信令协议,用于在客户端和服务端之间传递信令消息。
const protooRoom = new protoo.Room();
// Router media codecs.
//根据配置文件中的mediasoup router选项,使用mediasoup worker创建一个mediasoup Router实例
const { mediaCodecs } = config.mediasoup.routerOptions;
// Create a mediasoup Router.
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
// Create a mediasoup AudioLevelObserver.
// 使用mediasoup Router实例,创建一个mediasoup AudioLevelObserver实例,用于观察媒体流中的音频级别。
// 这个观察者的作用是用来检测房间中正在说话的用户,以便对其进行相关操作,如推送媒体流等。
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
{
maxEntries : 1,
threshold : -80,
interval : 800
});
// Create a mediasoup ActiveSpeakerObserver.
// 使用mediasoup Router实例上,创建一个mediasoup ActiveSpeakerObserver实例,用于观察房间中当前发言者的情况。
// 这个观察者的作用是用来检测房间中的活跃发言者,以便在客户端中实时显示。
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver();
// 最后,使用同样的mediasoup Router实例,创建一个Bot实例,Bot是一个虚拟的用户,用于在房间中扮演一个音频流的源。
// 它的主要作用是在测试和演示中使用,例如在没有实际用户的情况下测试房间的性能等。
const bot = await Bot.create({ mediasoupRouter });
// 最终返回一个新的Room实例,包含上述创建的protoo Room实例、mediasoup Router实例、以及用于观察音频级别和活跃发言者的mediasoup观察者实例等信息。
return new Room(
{
roomId,
protooRoom,
webRtcServer : mediasoupWorker.appData.webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver,
consumerReplicas,
bot
});
}
2. handleProtooConnection() 函数解析
- handleProtooConnection() 函数用于管理与客户端之间的连接,确保每个连接都是唯一的,并且能够处理客户端发送的请求并返回相应的结果。
- 在server.js中runProtooWebSocketServer() 函数创建一个 WebSocket 服务器后,这个服务器会注册 connectionrequest 事件的监听器,有新的 WebSocket 连接请求时被触发,最终会调用到handleProtooConnection() 函数。
- 具体代码和注释如下:
/**
* Called from server.js upon a protoo WebSocket connection request from a
* browser.
*
* @param {String} peerId - The id of the protoo peer to be created.
* @param {Boolean} consume - Whether this peer wants to consume from others.
* @param {protoo.WebSocketTransport} protooWebSocketTransport - The associated
* protoo WebSocket transport.
*/
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
{
// 为了确保每个连接都是唯一的,根据peerId查找是否已经存在同名的protoo Peer对象,如果存在,则关闭该protoo Peer对象。
const existingPeer = this._protooRoom.getPeer(peerId);
if (existingPeer)
{
logger.warn(
'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
peerId);
existingPeer.close();
}
// 如果不存在同名的protoo Peer对象,则创建一个新的protoo Peer对象。
// 并通过peer.data对象存储相关数据,如是否已经加入房间、用户名称等等。
// 同时,也创建了几个map对象,用于存储房间中的生产者、消费者、数据生产者和数据消费者对象。
let peer;
// Create a new protoo Peer with the given peerId.
try
{
peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
}
catch (error)
{
logger.error('protooRoom.createPeer() failed:%o', error);
}
// Use the peer.data object to store mediasoup related objects.
// Not joined after a custom protoo 'join' request is later received.
peer.data.consume = consume;
peer.data.joined = false;
peer.data.displayName = undefined;
peer.data.device = undefined;
peer.data.rtpCapabilities = undefined;
peer.data.sctpCapabilities = undefined;
// Have mediasoup related maps ready even before the Peer joins since we
// allow creating Transports before joining.
peer.data.transports = new Map();
peer.data.producers = new Map();
peer.data.consumers = new Map();
peer.data.dataProducers = new Map();
peer.data.dataConsumers = new Map();
// 监听protoo Peer对象的request事件,当有请求到达时,执行_handleProtooRequest函数来处理请求,并返回相应的结果。
// 如果出现异常,则通过reject函数返回错误信息。
peer.on('request', (request, accept, reject) =>
{
logger.debug(
'protoo Peer "request" event [method:%s, peerId:%s]',
request.method, peer.id);
this._handleProtooRequest(peer, request, accept, reject)
.catch((error) =>
{
logger.error('request failed:%o', error);
reject(error);
});
});
// 当一个protoo连接关闭时,触发protoo Peer对象的close事件。
peer.on('close', () =>
{
// 首先判断房间是否已经关闭,如果已经关闭则直接返回,避免重复关闭。
if (this._closed)
return;
logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
// If the Peer was joined, notify all Peers.
// 如果该Peer已经加入房间,通知所有其他Peers该Peer已经关闭。
if (peer.data.joined)
{
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify('peerClosed', { peerId: peer.id })
.catch(() => {});
}
}
// Iterate and close all mediasoup Transport associated to this Peer, so all
// its Producers and Consumers will also be closed.
// 遍历所有该Peer创建的Transport并关闭,这样该Peer创建的所有Producers和Consumers都将被关闭。
for (const transport of peer.data.transports.values())
{
transport.close();
}
// If this is the latest Peer in the room, close the room.
// 判断该Peer是否是房间中的最后一个Peer,如果是,则关闭整个房间。
if (this._protooRoom.peers.length === 0)
{
logger.info(
'last Peer in the room left, closing the room [roomId:%s]',
this._roomId);
this.close();
}
});
}
3. _handleProtooRequest() 函数解析
- _handleProtooRequest 函数是处理客户端通过 WebSocket 连接发送的消息。
- _handleProtooRequest 函数定义如下:
async _handleProtooRequest(peer, request, accept, reject)
- 参数说明:
- peer:发送消息的客户端连接。
- request:客户端发送的消息内容。
- accept:回调函数,用于向客户端发送响应消息。
- reject:回调函数,用于向客户端发送错误响应消息。
- request 参数是客户端发送的消息内容,其包含了 request.method 和 request.data 两个字段。
- 其中 request.method 表示消息类型,request.data 表示消息携带的数据。
- 根据 request.method 的不同,将消息分配到不同的处理函数中。
- 消息类型总有有22种,分别的含义为:
| 信令 | 说明 |
|---|---|
| getRouterRtpCapabilities | 客户端请求获取 mediasoupRouter 的 rtpCapabilities,即媒体流传输的相关能力参数。 |
| join | 客户端加入房间 |
| createWebRtcTransport | 创建 WebRTC 传输对象并将其存储到 peer.data.transports 中,以供后续的音视频传输使用。 |
| connectWebRtcTransport | 在建立 WebRtcTransport 之后,客户端使用本地生成的 DTLS 参数连接 WebRtcTransport 时发送 |
| restartIce | 重新启动WebRtcTransport中的ICE Agent,从而获取新的ICE Candidates并更新连接 |
| produce | 创建一个新的音频或视频的 Producer 并与客户端关联 |
| closeProducer | 关闭一个指定的生产者,并从 peer.data.producers Map 中删除该生产者 |
| pauseProducer | 尝试暂停一个指定的生产者 |
| resumeProducer | 尝试恢复一个指定的生产者 |
| pauseConsumer | 尝试暂停一个指定的消费者 |
| resumeConsumer | 尝试恢复一个指定的消费者 |
| setConsumerPreferredLayers | 尝试设置指定消费者的首选图层 |
| setConsumerPriority | 尝试设置消费者优先级(priority) |
| requestConsumerKeyFrame | 尝试获取某个消费者的关键帧(Key Frame) |
| produceData | 创建数据生产者,并存储在 peer.data.dataProducers Map的数据对象中 |
| changeDisplayName | 处理客户端请求更改显示名称 |
| getTransportStats | 获取某个transport的统计信息 |
| getProducerStats | 获取某个消费者的统计信息 |
| getDataProducerStats | 获取某个数据生产者的统计信息 |
| getDataConsumerStats | 获取某个数据消费者的统计信息 |
| applyNetworkThrottle | 修改网络带宽和延迟模拟网络限制 |
| resetNetworkThrottle | 重置网络限制 |
- 具体代码解析和注释如下:
/**
* Handle protoo requests from browsers.
*
* @async
*/
async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
// 客户端请求获取 mediasoupRouter 的 rtpCapabilities,即媒体流传输的相关能力参数。
case 'getRouterRtpCapabilities':
{
accept(this._mediasoupRouter.rtpCapabilities);
break;
}
case 'join':
{
// Ensure the Peer is not already joined.
// 检查该客户端是否已经加入了房间。如果已经加入了,就抛出异常。
if (peer.data.joined)
throw new Error('Peer already joined');
// 从 request.data 中获取客户端传入的 displayName,device,rtpCapabilities 和 sctpCapabilities 参数,
// 并将这些参数保存到该客户端对象的 data 属性中。
const {
displayName,
device,
rtpCapabilities,
sctpCapabilities
} = request.data;
// Store client data into the protoo Peer data object.
peer.data.joined = true;
peer.data.displayName = displayName;
peer.data.device = device;
peer.data.rtpCapabilities = rtpCapabilities;
peer.data.sctpCapabilities = sctpCapabilities;
// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.
// 告诉新 Peer 关于已加入房间的 Peer, 并为现有的 Producer 创建 Consumer。
const joinedPeers =
[
...this._getJoinedPeers(),
...this._broadcasters.values()
];
// Reply now the request with the list of joined peers (all but the new one).
// 回复请求并返回除了新加入的 Peer 外的所有已加入的 Peer 列表。
// 即获取已经加入房间的其他客户端对象和广播者对象,然后将这些对象的信息打包成一个列表,
// 发送给新加入的客户端,告诉他有哪些其他客户端和广播者已经在房间中。
const peerInfos = joinedPeers
.filter((joinedPeer) => joinedPeer.id !== peer.id)
.map((joinedPeer) => ({
id : joinedPeer.id,
displayName : joinedPeer.data.displayName,
device : joinedPeer.data.device
}));
accept({ peers: peerInfos });
// Mark the new Peer as joined.
// 将该客户端对象标记为已经加入房间。
peer.data.joined = true;
for (const joinedPeer of joinedPeers)
{
// Create Consumers for existing Producers.
// 为现有的 Producer 创建 Consumer。
// 即对于已经加入房间的其他客户端对象,为其创建对应的消费者对象,并将新加入的客户端对象标记为其对应的消费者。
for (const producer of joinedPeer.data.producers.values())
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : joinedPeer,
producer
});
}
// Create DataConsumers for existing DataProducers.
// 为现有的 DataProducer 创建 DataConsumer。
// 即对于已经加入房间的其他客户端对象,为其创建对应的数据消费者对象,并将新加入的客户端对象标记为其对应的数据消费者。
for (const dataProducer of joinedPeer.data.dataProducers.values())
{
// 排除 bot DataProducer,即如果数据生产者的标签是 'bot',则不会创建数据消费者。
if (dataProducer.label === 'bot')
continue;
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : joinedPeer,
dataProducer
});
}
}
// Create DataConsumers for bot DataProducer.
// 为 bot DataProducer 创建 DataConsumer。
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : null,
dataProducer : this._bot.dataProducer
});
// Notify the new Peer to all other Peers.
// 将新加入的客户端对象的信息通知给其他已经加入房间的客户端对象,告诉它们新客户端的信息。
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify(
'newPeer',
{
id : peer.id,
displayName : peer.data.displayName,
device : peer.data.device
})
.catch(() => {});
}
break;
}
case 'createWebRtcTransport':
{
// NOTE: Don't require that the Peer is joined here, so the client can
// initiate mediasoup Transports and be ready when he later joins.
// 从请求数据中解析出所需的参数
const {
forceTcp,
producing,
consuming,
sctpCapabilities
} = request.data;
// 设置 WebRtcTransport 的选项
const webRtcTransportOptions =
{
...config.mediasoup.webRtcTransportOptions,
enableSctp : Boolean(sctpCapabilities),
numSctpStreams : (sctpCapabilities || {}).numStreams,
appData : { producing, consuming }
};
// 如果设置了 forceTcp,则使用 TCP 连接,否则使用 UDP 连接
if (forceTcp)
{
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
// 创建 WebRtcTransport 对象
const transport = await this._mediasoupRouter.createWebRtcTransport(
{
...webRtcTransportOptions,
webRtcServer : this._webRtcServer
});
// 监听一些事件,用于处理异常情况和跟踪传输状态
transport.on('sctpstatechange', (sctpState) =>
{
logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);
});
transport.on('dtlsstatechange', (dtlsState) =>
{
if (dtlsState === 'failed' || dtlsState === 'closed')
logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
});
// NOTE: For testing.
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
// 启用一些跟踪事件
await transport.enableTraceEvent([ 'bwe' ]);
transport.on('trace', (trace) =>
{
logger.debug(
'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',
transport.id, trace.type, trace);
if (trace.type === 'bwe' && trace.direction === 'out')
{
peer.notify(
'downlinkBwe',
{
desiredBitrate : trace.info.desiredBitrate,
effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,
availableBitrate : trace.info.availableBitrate
})
.catch(() => {});
}
});
// Store the WebRtcTransport into the protoo Peer data Object.
// 将创建的 WebRtcTransport 对象存储到 peer.data.transports 中
peer.data.transports.set(transport.id, transport);
// 发送响应消息,包含一些重要参数
accept(
{
id : transport.id,
iceParameters : transport.iceParameters,
iceCandidates : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters,
sctpParameters : transport.sctpParameters
});
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
// 如果设置了 maxIncomingBitrate,则将其应用于创建的 WebRtcTransport 对象
if (maxIncomingBitrate)
{
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
catch (error) {}
}
break;
}
case 'connectWebRtcTransport':
{
// 从请求中获取 transportId 和 dtlsParameters。
const { transportId, dtlsParameters } = request.data;
// 从 Peer 数据对象中获取 ID 为 transportId 的 WebRtcTransport 对象。
const transport = peer.data.transports.get(transportId);
// 如果没有找到对应的 WebRtcTransport,则抛出一个错误。
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// 调用 transport.connect() 建立一个 DTLS 连接。
await transport.connect({ dtlsParameters });
// 发送 accept 消息给客户端,表明连接建立成功。
accept();
break;
}
case 'restartIce':
{
// 从请求中获取transportId,并通过该id从peer.data.transports中获取transport对象。
const { transportId } = request.data;
const transport = peer.data.transports.get(transportId);
// 如果找不到该transport对象,则抛出错误。
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// 否则,调用transport的restartIce方法重新启动ICE Agent,并返回新的ICE Parameters。
const iceParameters = await transport.restartIce();
// 将新的ICE Parameters传递给accept方法返回给客户端。
accept(iceParameters);
break;
}
case 'produce':
{
// Ensure the Peer is joined.
// 首先需要检查客户端是否已经加入到 Room 中,如果没有加入则抛出异常。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 解析客户端传来的请求数据,包括 transportId(传输 ID)、kind(音频还是视频)、
// rtpParameters(RTP 参数)和 appData(应用数据)。
const { transportId, kind, rtpParameters } = request.data;
let { appData } = request.data;
// 从 peer 对象中获取传输对象,如果不存在则抛出异常。
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// Add peerId into appData to later get the associated Peer during
// the 'loudest' event of the audioLevelObserver.
// 将 peerId 加入到 appData 中以便稍后在 audioLevelObserver(音频级别观察器)的“loudest”的事件中获取关联的 Peer。
appData = { ...appData, peerId: peer.id };
// 使用 transport.produce() 方法创建一个新的 Producer 对象。并将 Producer 存储到 peer.data.producers 中以便稍后使用。
const producer = await transport.produce(
{
kind,
rtpParameters,
appData
// keyFrameRequestDelay: 5000
});
// Store the Producer into the protoo Peer data Object.
peer.data.producers.set(producer.id, producer);
// Set Producer events.
// 设置 Producer 的事件监听器,包括 score、videoorientationchange 和 trace 事件。
// 其中,score 事件在 Producer 的“质量”改变时触发,通常用于在客户端中显示相关信息;
producer.on('score', (score) =>
{
// logger.debug(
// 'producer "score" event [producerId:%s, score:%o]',
// producer.id, score);
peer.notify('producerScore', { producerId: producer.id, score })
.catch(() => {});
});
// videoorientationchange 事件在视频流的方向改变时触发;
producer.on('videoorientationchange', (videoOrientation) =>
{
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id, videoOrientation);
});
// NOTE: For testing.
// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'keyframe' ]);
// trace 事件用于记录调试信息。
producer.on('trace', (trace) =>
{
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id, trace.type, trace);
});
// 向客户端发送 accept 响应,携带创建的 Producer 对象的 id。
accept({ id: producer.id });
// Optimization: Create a server-side Consumer for each Peer.
// 为每个已经加入 Room 的 Peer 创建一个新的 Consumer 对象。这是为了让每个客户端都能接收到新创建的 Producer 产生的音视频流。
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createConsumer(
{
consumerPeer : otherPeer,
producerPeer : peer,
producer
});
}
// Add into the AudioLevelObserver and ActiveSpeakerObserver.
// 如果创建的 Producer 是音频流,则还会将其添加到 _audioLevelObserver 和 _activeSpeakerObserver 中,
// 这两个对象都是用来监控音频流的。_audioLevelObserver 用于监测音频的音量级别,_activeSpeakerObserver
// 用于监测当前说话人的身份。当有新的音频流加入时,需要将其添加到这两个对象中以进行监控。
if (producer.kind === 'audio')
{
this._audioLevelObserver.addProducer({ producerId: producer.id })
.catch(() => {});
this._activeSpeakerObserver.addProducer({ producerId: producer.id })
.catch(() => {});
}
break;
}
case 'closeProducer':
{
// Ensure the Peer is joined.
// 确保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来关闭生产者。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从 peer.data.producers Map 中查找具有相应 producerId 的生产者对象。如果没有找到该生产者对象,则会抛出错误。
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
// 关闭生产者。
producer.close();
// Remove from its map.
// 从 peer.data.producers Map 中删除该生产者对象,以确保不再使用该生产者对象。
peer.data.producers.delete(producer.id);
// 向客户端发送成功响应
accept();
break;
}
case 'pauseProducer':
{
// Ensure the Peer is joined.
// 确保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来暂停生产者。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从 peer.data.producers Map 中查找具有相应 producerId 的生产者对象。如果没有找到该生产者对象,则会抛出错误。
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
// 暂停生产者,此时,生产者将停止发送媒体流,但不会关闭生产者对象。
// await 关键字来等待 producer.pause() 方法完成后再继续执行。
await producer.pause();
// 向客户端发送成功响应。
accept();
break;
}
case 'resumeProducer':
{
// Ensure the Peer is joined.
// 保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来恢复生产者。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从 peer.data.producers Map 中查找具有相应 producerId 的生产者对象。如果没有找到该生产者对象,则会抛出错误。
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
// 恢复生产者,此时,生产者将重新开始发送媒体流。
await producer.resume();
// 向客户端发送成功响应。
accept();
break;
}
case 'pauseConsumer':
{
// Ensure the Peer is joined.
// 确保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来暂停消费者。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从 peer.data.consumers Map 中查找具有相应 consumerId 的消费者对象。如果没有找到该消费者对象,则会抛出错误。
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 暂停消费者,此时,消费者将停止接收媒体流。
await consumer.pause();
// 向客户端发送成功响应。
accept();
break;
}
case 'resumeConsumer':
{
// Ensure the Peer is joined.
// 确保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来恢复消费者。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从 peer.data.consumers Map 中查找具有相应 consumerId 的消费者对象。如果没有找到该消费者对象,则会抛出错误。
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 找到了该消费者对象,则调用其 resume() 方法来恢复消费者。此时,消费者将重新开始接收媒体流。
await consumer.resume();
// 向客户端发送成功响应。
accept();
break;
}
case 'setConsumerPreferredLayers':
{
// Ensure the Peer is joined.
// 确保对等端已经加入房间,如果对等端未加入房间,则会抛出错误。这是为了确保对等端具有足够的权限来设置消费者的首选图层。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 检查传入的请求中是否包含消费者的 consumerId、spatialLayer 和 temporalLayer。
// 然后,函数会从 peer.data.consumers Map 中查找具有相应 consumerId 的消费者对象。如果没有找到该消费者对象,则会抛出错误。
const { consumerId, spatialLayer, temporalLayer } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 设置消费者的首选图层,此时,消费者将会尝试根据指定的首选图层来选择合适的媒体流。
await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
// 向客户端发送成功响应。
accept();
break;
}
case 'setConsumerPriority':
{
// Ensure the Peer is joined.
// 判断该 Peer 是否已经加入了房间,如果没有加入,则抛出异常
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 检查传入的请求中是否包含消费者的 consumerId、priority。
// 然后,函数会从 peer.data.consumers Map 中查找具有相应 consumerId 的消费者对象。如果没有找到该消费者对象,则会抛出错误。
const { consumerId, priority } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 设置优先级,参数 priority,表示优先级,其取值范围为 1~10。
// 该方法会向消费者所在的生产者(Producer)发送一个 RTCP feedback message,以通知生产者当前消费者的优先级已发生变化。
await consumer.setPriority(priority);
// 向客户端发送成功响应。
accept();
break;
}
case 'requestConsumerKeyFrame':
{
// Ensure the Peer is joined.
// 确保 Peer 已经加入房间,否则抛出异常。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 检查传入的请求中是否包含消费者的 consumerId。
// 然后,函数会从 peer.data.consumers Map 中查找具有相应 consumerId 的消费者对象。如果没有找到该消费者对象,则会抛出错误。
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 向生产者(Producer)发送请求,要求它在当前视频流中生成关键帧。
await consumer.requestKeyFrame();
// 向客户端发送成功响应。
accept();
break;
}
case 'produceData':
{
// Ensure the Peer is joined.
// 首先判断Peer是否已经加入房间,如果没有加入则抛出错误。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从请求中获取所需参数,包括transportId、sctpStreamParameters、label、protocol和appData。
const {
transportId,
sctpStreamParameters,
label,
protocol,
appData
} = request.data;
// 根据transportId获取Transport对象,如果不存在则抛出错误。
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// 调用transport对象的produceData方法创建数据生产者,传入sctpStreamParameters、label、protocol和appData等参数。
const dataProducer = await transport.produceData(
{
sctpStreamParameters,
label,
protocol,
appData
});
// Store the Producer into the protoo Peer data Object.
// 将创建的数据生产者存储在Peer对象的dataProducers属性中,使用dataProducer.id作为key。
peer.data.dataProducers.set(dataProducer.id, dataProducer);
// 使用accept方法向客户端返回数据生产者的id。
accept({ id: dataProducer.id });
// 根据数据生产者的label值进行相应的处理:
switch (dataProducer.label)
{
// 如果是"chat",则为每个已加入的Peer创建一个数据消费者。
case 'chat':
{
// Create a server-side DataConsumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createDataConsumer(
{
dataConsumerPeer : otherPeer,
dataProducerPeer : peer,
dataProducer
});
}
break;
}
// 如果是"bot",则将数据生产者传递给bot进行处理。
case 'bot':
{
// Pass it to the bot.
this._bot.handlePeerDataProducer(
{
dataProducerId : dataProducer.id,
peer
});
break;
}
}
break;
}
case 'changeDisplayName':
{
// Ensure the Peer is joined.
// 首先判断Peer是否已经加入房间,如果没有加入则抛出错误。
if (!peer.data.joined)
throw new Error('Peer not yet joined');
// 从请求数据中获取新的显示名称,并保存到protoo Peer的自定义数据对象中。同时,获取旧的显示名称。
const { displayName } = request.data;
const oldDisplayName = peer.data.displayName;
// Store the display name into the custom data Object of the protoo
// Peer.
// 更新peer对象中的displayName字段为新的显示名称。
peer.data.displayName = displayName;
// Notify other joined Peers.
// 通知其他已加入房间的用户,该用户的显示名称已经更改。遍历其他加入的用户并使用notify函数发送通知。
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify(
'peerDisplayNameChanged',
{
peerId : peer.id,
displayName,
oldDisplayName
})
.catch(() => {});
}
// 向客户端发送成功响应。
accept();
break;
}
case 'getTransportStats':
{
// 从request.data中获取transportId,然后从peer.data.transports中查找对应的transport对象。如果找不到,将会抛出一个异常。
const { transportId } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// 获取统计信息,并将返回的结果作为accept函数的参数进行回复给客户端。
const stats = await transport.getStats();
accept(stats);
break;
}
case 'getProducerStats':
{
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
const stats = await producer.getStats();
accept(stats);
break;
}
case 'getConsumerStats':
{
// 从请求数据中获取要获取统计信息的消费者的ID。然后,使用该ID从peer对象的data属性中获取消费者实例。
// 如果没有找到该ID对应的消费者实例,则抛出异常。
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
// 获取该消费者的统计信息。
const stats = await consumer.getStats();
// 使用accept方法将获取到的统计信息作为参数返回给客户端。
accept(stats);
break;
}
case 'getDataProducerStats':
{
// 从请求的数据中获取dataProducerId。
// 通过查找peer.data.dataProducers Map中的数据,找到对应的DataProducer。如果没有找到,则抛出异常。
const { dataProducerId } = request.data;
const dataProducer = peer.data.dataProducers.get(dataProducerId);
if (!dataProducer)
throw new Error(`dataProducer with id "${dataProducerId}" not found`);
// 获取该DataProducer的统计信息。
const stats = await dataProducer.getStats();
// 将获取到的统计信息作为参数调用accept函数,向请求的Peer返回获取到的数据。
accept(stats);
break;
}
case 'getDataConsumerStats':
{
// 从请求中获取DataConsumer的ID,然后通过该ID从客户端对应的Peer对象的dataConsumers Map中获取对应的DataConsumer对象。
// 如果没有找到该对象,则抛出一个错误。
const { dataConsumerId } = request.data;
const dataConsumer = peer.data.dataConsumers.get(dataConsumerId);
if (!dataConsumer)
throw new Error(`dataConsumer with id "${dataConsumerId}" not found`);
// 使用获取到的DataConsumer对象调用getStats方法获取统计数据,并将结果通过accept方法返回给客户端。
// 统计数据包括:包的数量、字节数、丢失的包数量等等。
const stats = await dataConsumer.getStats();
accept(stats);
break;
}
case 'applyNetworkThrottle':
{
const DefaultUplink = 1000000;
const DefaultDownlink = 1000000;
const DefaultRtt = 0;
// 检查请求是否包含正确的“secret”值。如果未提供或与预期值不匹配,则服务器将拒绝该请求并返回一个403 Forbidden错误。
const { uplink, downlink, rtt, secret } = request.data;
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
{
reject(403, 'operation NOT allowed, modda fuckaa');
return;
}
try
{
// 解析请求中提供的上行、下行带宽和往返延迟等参数,并将它们传递给一个名为“throttle”的模块。
// 该模块的作用是模拟网络限制的效果。
await throttle.start(
{
up : uplink || DefaultUplink,
down : downlink || DefaultDownlink,
rtt : rtt || DefaultRtt
});
// 在成功应用网络限制后,服务器会将新的带宽和延迟值记录在日志中,并返回一个成功响应给客户端。
logger.warn(
'network throttle set [uplink:%s, downlink:%s, rtt:%s]',
uplink || DefaultUplink,
downlink || DefaultDownlink,
rtt || DefaultRtt);
accept();
}
catch (error)
{
// 在发生任何错误时,服务器将记录错误并返回一个500 Internal Server Error响应给客户端。
logger.error('network throttle apply failed: %o', error);
reject(500, error.toString());
}
break;
}
case 'resetNetworkThrottle':
{
const { secret } = request.data;
// 如果 secret 值不存在或者不等于环境变量 NETWORK_THROTTLE_SECRET 的值,则拒绝请求并返回 403 错误状态码和相应的错误信息
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
{
reject(403, 'operation NOT allowed, modda fuckaa');
return;
}
try
{
// 调用 throttle.stop() 方法停止网络限制
await throttle.stop({});
// 记录日志,表示网络限制已停止
logger.warn('network throttle stopped');
// 返回成功状态和空的数据对象
accept();
}
catch (error)
{
// 如果停止网络限制失败,则记录错误日志并返回 500 错误状态码和相应的错误信息
logger.error('network throttle stop failed: %o', error);
reject(500, error.toString());
}
break;
}
default:
{
logger.error('unknown request.method "%s"', request.method);
reject(500, `unknown request.method "${request.method}"`);
}
}
}