Skip to content

新增频道指南

字数
1729 字
阅读时间
8 分钟

概述

Ncatbot 使用 统一 AI 管道中间件AIPipeline)处理所有频道的 AI 请求。新增一个频道时,你只需要关注 传输层(怎么收消息、怎么发消息),管道会自动提供:

  • 知识库检索(RAG)
  • 工具调用(搜索、天气、命令执行、工作区操作等)
  • 技能系统
  • 工作区文件管理
  • 会话历史管理
  • exec_command 确认/拒绝流程

架构总览

外部平台 (Discord / 钉钉 / ... )


┌─────────────────────┐
│  ① 频道适配器        │  ChannelAdapter: parse → build_chat_request
│  channels/<name>.py │
└──────┬──────────────┘
       │  ChatRequest

┌─────────────────────┐
│  ② 管道回调          │  XxxCallbacks(PipelineCallbacks)
│  services/xxx.py    │  → 覆写 3~6 个方法
└──────┬──────────────┘


┌─────────────────────┐
│  ③ 统一管道          │  AIPipeline.process()
│  core/ai_pipeline.py│  → 附件 → 知识库 → 上下文 → AI(工具循环/流式) → 回复
└──────┬──────────────┘
       │  PipelineResult

┌─────────────────────┐
│  ④ 发送回复          │  callbacks.send_response()
└─────────────────────┘

第一步:创建频道适配器

nbot/channels/ 下创建 <name>.py,继承 BaseChannelAdapter

必须覆写

方法说明
channel_name频道唯一标识,如 "discord"
get_capabilities()声明传输层能力
build_envelope(**kwargs)构建 ChannelEnvelope(会话 ID、发送者等)

可选覆写

方法说明
normalize_inbound_message(content)归一化输入(去除 @提及、Markdown 等)
parse_xxx(raw)解析原始消息载荷的便捷方法

示例

python
# nbot/channels/discord.py

from typing import Any, Dict, Optional
from nbot.channels.base import BaseChannelAdapter, ChannelCapabilities, ChannelEnvelope

class DiscordChannelAdapter(BaseChannelAdapter):
    channel_name = "discord"

    def get_capabilities(self) -> ChannelCapabilities:
        return ChannelCapabilities(
            supports_stream=False,
            supports_progress_updates=False,
            supports_file_send=False,
            supports_stop=False,
        )

    def build_envelope(self, **kwargs) -> ChannelEnvelope:
        metadata = dict(kwargs.get("metadata") or {})
        chat_id = metadata.get("channel_id") or kwargs.get("conversation_id", "")
        return ChannelEnvelope(
            channel=self.channel_name,
            conversation_id=f"discord:{chat_id}",
            user_id=kwargs.get("user_id", ""),
            sender=kwargs.get("sender", "discord_user"),
            attachments=list(kwargs.get("attachments") or []),
            metadata=metadata,
        )

    def parse_message(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """解析 Discord 消息事件 → 标准格式"""
        # 根据实际 API 格式调整
        msg = event.get("d", {})
        author = msg.get("author", {})
        content = msg.get("content", "")
        if not content:
            return None

        return {
            "channel_id": msg.get("channel_id"),
            "message_id": msg.get("id"),
            "user_id": author.get("id"),
            "sender": author.get("username", "discord_user"),
            "content": self.normalize_inbound_message(content),
            "metadata": {"guild_id": msg.get("guild_id")},
        }

能力声明说明

ChannelCapabilities 描述的是传输层能做什么,不是 AI 能力:

字段含义何时设为 True
supports_stream传输层支持流式推送WebSocket 可实时推送 token
supports_progress_updates传输层可展示进度UI 有进度卡片等组件
supports_file_send传输层可发送文件API 支持文件上传/发送
supports_stop传输层可中断生成有取消机制(如 SocketIO stop 事件)

工具调用、知识库、工作区等是管道自动提供的,不依赖能力声明

第二步:实现管道回调

nbot/services/ 下创建 xxx_service.py,实现 PipelineCallbacks 子类。

最少覆写(3 个方法 = 全套 AI 能力)

python
# nbot/services/discord_service.py

from typing import Any, Dict, List
from nbot.core.ai_pipeline import (
    PipelineContext, PipelineCallbacks, PipelineResult,
    AIPipeline, handle_tool_confirmation,
)

class DiscordCallbacks(PipelineCallbacks):

    def __init__(self, server, token, parsed):
        self.server = server
        self.token = token
        self.parsed = parsed

    # ---- 必须覆写 ----

    def get_system_prompt(self, ctx: PipelineContext) -> str:
        """返回系统提示词。"""
        return str(
            getattr(self.server, "personality", {}).get("systemPrompt") or ""
        ).strip()

    def send_response(self, ctx: PipelineContext, message: Dict[str, Any]) -> None:
        """发送回复到 Discord。"""
        send_discord_message(
            self.token,
            self.parsed["channel_id"],
            message.get("content", ""),
        )

    def get_workspace_context(self, ctx: PipelineContext) -> Dict[str, Any]:
        """提供 session_id 给工具和确认流程。"""
        return {
            "session_id": f"discord:{self.parsed['channel_id']}",
            "session_type": "discord",
        }

这 3 个方法就足够让频道拥有:知识库检索 + 工具调用 + 工作区操作 + exec 确认流程

增强覆写(加会话历史)

python
    def load_messages(self, ctx: PipelineContext) -> List[Dict[str, Any]]:
        """加载历史会话。"""
        messages = []
        system = self.get_system_prompt(ctx)
        if system:
            messages.append({"role": "system", "content": system})
        # 可从 session_store 或外部存储加载历史
        history = get_discord_history(self.parsed["channel_id"])
        messages.extend(history)
        return messages

    def save_assistant_message(
        self, ctx: PipelineContext, message: Dict[str, Any]
    ) -> None:
        """保存 AI 回复到历史。"""
        save_discord_message(self.parsed["channel_id"], message)

全部可覆写方法

方法默认行为覆写场景
load_messages(ctx)仅返回 system + 当前消息需要会话持久化
get_system_prompt(ctx)返回 ""必须覆写
save_assistant_message(ctx, msg)无操作需要会话持久化
send_response(ctx, msg)抛出 NotImplementedError必须覆写
build_model_call(ctx, tools)标准 HTTP API 调用使用自定义 API 端点
build_model_call_streaming(ctx, tools)返回 None支持流式输出
on_stream_start/chunk/end(ctx, ...)无操作支持流式输出
get_progress_reporter(ctx)返回 NoOpProgressReporter需要进度展示
on_confirmation_required(ctx, id, cmd)无操作自定义确认通知方式
check_confirmation(ctx, input)返回 None自定义确认检测(通常用共用函数)
search_knowledge(ctx, query)返回 ""自定义知识库检索逻辑
ensure_workspace(ctx)返回 ""启用工作区文件管理
get_workspace_context(ctx)返回 {}必须覆写(让工具能用)
resolve_attachment_data(ctx, att)返回 None平台特定的附件下载/解析
on_response_complete(ctx, result)无操作统计、通知、后处理

第三步:编写入口函数

python
def answer_discord_event(server, channel_config, event):
    """Discord Webhook 入口。"""
    adapter = DiscordChannelAdapter()
    parsed = adapter.parse_message(event or {})
    if not parsed:
        return {"ok": True, "ignored": True}

    # ---- 确认/拒绝检测(共性问题,用共用函数一行搞定) ----
    content = handle_tool_confirmation(
        parsed["content"],
        f"discord:{parsed['channel_id']}",
        log_prefix="Discord",
    )

    # ---- 构建管道上下文 ----
    chat_request = adapter.build_chat_request(
        conversation_id=f"discord:{parsed['channel_id']}",
        user_id=parsed["user_id"],
        content=content,
        sender=parsed.get("sender", "discord_user"),
        metadata=parsed.get("metadata", {}),
    )

    ctx = PipelineContext(chat_request=chat_request, adapter=adapter)
    callbacks = DiscordCallbacks(server, token, parsed)

    # ---- 获取工具列表 ----
    tools = None
    try:
        from nbot.services.tools import get_enabled_tools
        tools = get_enabled_tools()
    except Exception:
        pass

    # ---- 运行管道 ----
    pipeline = AIPipeline()
    result = pipeline.process(ctx, callbacks, tools=tools)

    if result.error:
        return {"ok": False, "error": result.error}
    return {"ok": True, "result": result.final_content}

第四步:注册适配器

nbot/channels/__init__.py 中添加:

python
from nbot.channels.discord import DiscordChannelAdapter
register_channel_adapter(DiscordChannelAdapter.channel_name, DiscordChannelAdapter)

第五步:挂载入口(按需)

Webhook 模式

nbot/web/routes/channels.py 添加 webhook 路由:

python
@channels_bp.route("/discord/<channel_id>/webhook", methods=["POST"])
def discord_webhook(channel_id):
    from nbot.services.discord_service import answer_discord_event
    return answer_discord_event(server, channel_config, request.json)

WebSocket / 长连接模式

nbot/services/ 下创建 xxx_ws_service.py,管理连接生命周期,收消息时调用入口函数。

预设注册

nbot/web/routes/channels.pyPRESET_CHANNELS 中添加预设,让用户可在 Web UI 界面中添加频道:

python
PRESET_CHANNELS = [
    # ... 已有预设
    {
        "id": "discord",
        "name": "Discord",
        "type": "discord",
        "transport": "webhook",
        "config": {
            "bot_token": "",
            "webhook_url": "",
        },
    },
]

完整文件清单

新增一个频道需要创建/修改的文件:

文件操作行数估计
nbot/channels/<name>.py新建 - 适配器~40 行
nbot/services/<name>_service.py新建 - 回调 + 入口~80 行
nbot/channels/__init__.py修改 - 注册+2 行
nbot/web/routes/channels.py修改 - 路由 + 预设+30 行

管道处理流程

入口函数

  ├─ handle_tool_confirmation()   ← 确认/拒绝检测

  └─ pipeline.process(ctx, callbacks)

       ├─ Phase 1: 附件解析
       │    → callbacks.resolve_attachment_data()

       ├─ Phase 2: 知识库检索
       │    → callbacks.search_knowledge() → inject_knowledge_context()

       ├─ Phase 3: 上下文准备
       │    → callbacks.load_messages() → prepare_chat_context() → trim

       ├─ Phase 4: AI 响应
       │    ├─ 有 tools → run_tool_loop_session()   ← 工具循环
       │    ├─ 有 streamer → 流式输出              ← 逐 token 推送
       │    └─ 其他 → callbacks.build_model_call()  ← 单次调用

       └─ Phase 5: 结果组装
            → build_assistant_message()
            → callbacks.save_assistant_message()
            → callbacks.send_response()
            → callbacks.on_response_complete()

工具确认流程

exec_command 等危险工具需要用户确认后才能执行。管道共用函数 handle_tool_confirmation() 已处理此流程,所有频道统一使用:

用户: "运行 python --version"
  → AI 调用 exec_command
  → 返回确认请求: "⚠️ 命令需要确认: python --version"
  
用户: "确认"
  → handle_tool_confirmation() 检测关键词
  → execute_pending_command()
  → 生成系统消息: "[系统] 用户已确认执行命令 `python --version`"
  → 管道继续,AI 获得执行结果

确认/拒绝关键词:

确认拒绝
确认、同意、确认执行、是、yes、y、ok、执行取消、拒绝、否、不执行、no、n、cancel

页面历史