Skip to content

节点控制平面

字数
963 字
阅读时间
5 分钟

概述

Gateway 的节点控制平面提供节点注册、心跳监控、安全配对、权限管理和能力声明等功能,支持多节点分布式部署。

节点注册

节点类型与状态

节点类型:

类型说明
gateway网关节点
worker工作节点
channel频道节点

节点状态:

状态说明
online在线
busy忙碌
idle空闲
offline离线
draining排水中(不接受新任务)

注册与查找

python
from nbot.gateway.nodes.registry import NodeRegistry

registry = NodeRegistry()

# 注册节点
registry.register(node_id="gw-1", node_type="gateway", version="1.0.0")

# 查找最佳节点(优先空闲、低负载)
best = registry.find_best_node(node_type="worker")

# 健康检查(标记超时节点为 offline)
registry.check_health()

心跳管理

HeartbeatManager 接收节点心跳,更新注册表,检测离线节点。

python
from nbot.gateway.nodes.heartbeat import HeartbeatManager, HeartbeatPayload

manager = HeartbeatManager(
    registry=registry,
    interval_seconds=30,     # 节点上报间隔
    timeout_seconds=120,     # 超时阈值
    check_interval=15,       # 检查间隔
)

# 心跳载荷
payload = HeartbeatPayload(
    node_id="gw-1",
    timestamp=datetime.now(),
    status="online",
    load=0.3,
    tasks_completed=100,
    tasks_failed=2,
)

节点配对

安全的一次性配对流程:

节点请求配对 → Gateway 生成 8 位一次性码 → 管理员审批 → 颁发 token
python
from nbot.gateway.nodes.pairing import PairingManager

manager = PairingManager()

# 发起配对
request = manager.create_request(node_id="gw-2", node_type="worker")

# 审批
manager.approve(request.request_id)  # 颁发 token
# 或拒绝
manager.reject(request.request_id, reason="未授权节点")

配对状态:

状态说明
PENDING等待审批
APPROVED已批准
REJECTED已拒绝
COMPLETED已完成(token 已颁发)
EXPIRED已过期(默认 300 秒)

权限管理

权限范围(PermissionScope)

18 种权限,按类别分组:

类别权限
adminadmin(全部权限)
gatewaygateway.manage, gateway.read
eventsevents.subscribe, events.query, events.publish
channelschannels.manage, channels.read, channels.send
queuequeue.manage, queue.read
workerworker.manage, worker.read
nodenode.register, node.heartbeat, node.manage, node.read

层级关系:admin 包含所有权限,manage 包含 read + write

Token 管理

python
from nbot.gateway.nodes.permissions import PermissionChecker

checker = PermissionChecker()

# 颁发 token
token = checker.issue_token(
    token_type="node",
    scopes=["node.heartbeat", "events.subscribe"],
    node_id="gw-1",
)

# 验证权限
checker.check_permission(token.token_id, "events.subscribe")

# 撤销 token
checker.revoke_token(token.token_id)

能力声明

通道能力(ChannelCapability)

27 种能力,涵盖:

  • 消息收发: webhook.receive, message.send, message.edit, message.delete
  • 富文本: rich_text, markdown
  • 媒体: image, file, voice, video
  • 互动: reactions, keyboards
  • 群组: group.manage, group.members
  • 会话: conversation.create, conversation.history

工具能力(ToolCapability)

16 种能力,涵盖:

  • 搜索、代码执行、计算器
  • 图片生成、TTS/STT、翻译
  • 天气/地图/新闻/股票
  • 文件操作、数据库/知识库搜索

WebSocket 控制面

客户端命令

命令说明
node.register注册节点
node.unregister注销节点
node.heartbeat上报心跳
gateway.stats查询统计
gateway.health健康检查
event.subscribe订阅事件
event.unsubscribe取消订阅
event.query查询事件历史
pairing.create发起配对
pairing.approve审批配对
pairing.reject拒绝配对
pairing.list列出待审批

服务端推送

事件说明
event.published事件通知
node.online节点上线
node.offline节点离线
node.status_changed节点状态变化
system.notification系统通知
command.response命令响应
error错误通知

事件总线(EventBus)

内部发布/订阅总线,支持通配符匹配:

python
from nbot.gateway.bus.event_bus import get_event_bus

bus = get_event_bus()

# 订阅(支持通配符)
@bus.on("event.received.*")
async def on_event(data):
    print(f"Event received: {data}")

# 一次性订阅
bus.once("node.online", lambda data: print("Node online!"))

# 查询历史
history = bus.query_history(
    topic="event.received",
    since=datetime.now() - timedelta(hours=1),
)

26 个预定义 topic,覆盖:网关生命周期、事件处理阶段、队列事件、Worker 事件、节点事件、安全事件。

错误体系

所有异常继承自 GatewayError(message, code, status_code)

异常CodeHTTP
UnknownChannelErrorunknown_channel404
DisabledChannelErrordisabled_channel403
SecurityVerificationError可配置401
InvalidSignatureErrorinvalid_signature401
InvalidTokenErrorinvalid_token401
TimestampExpiredErrortimestamp_expired401
ReplayDetectedErrorreplay_detected401
RateLimitedErrorrate_limited429
DuplicatedMessageErrorduplicated200
ParseFailedErrorparse_failed400
DispatchFailedErrordispatch_failed502
DeliveryFailedErrordelivery_failed502

页面历史