Skip to content

存储与追踪

字数
839 字
阅读时间
4 分钟

SQLite 持久化

GatewayStorage 使用 SQLite(默认路径 data/web/gateway.db)存储核心表。

统一日志迁移:自 v0.2-log 起,_record_event() 写入 gateway_logs(通过 GatewayLogService)。gateway_events 仅用于历史兼容。Bot 启动时自动迁移旧数据(一次性,_gateway_log_migrations 表标记)。

gateway_logs — 统一日志(主表)

所有事件、MCP 工具调用统一写入此表。

字段类型说明
idTEXT PK日志 ID(log_{hex16} 或迁移的 evt_{id}
trace_idTEXT追踪 ID,索引
sourceTEXT来源:gateway / mcp / web
typeTEXT类型:receive / mcp_tool / message
levelTEXT级别:info / warning / error
stageTEXT阶段:receive_start / auth_passed / parsed
actionTEXT动作:receive / verify / parse / dispatch
statusTEXT状态:received / verified / delivered
messageTEXT描述信息
tool_nameTEXTMCP 工具名(仅 source=mcp
channel_idTEXT频道 ID
conversation_idTEXT会话 ID
user_id / message_idTEXT用户/消息 ID
metadata_jsonTEXT元数据 JSON(自动脱敏)
error_messageTEXT错误信息
created_atTEXT创建时间,索引
python
# 查询
records = gateway.log_service.query(source="gateway", status="delivered", limit=50)

# Trace 聚合(跨 events + deliveries + mcp_logs)
result = gateway.log_service.aggregate_trace(trace_id, ...)

# ID 类型识别
result = gateway.log_service.lookup_id("gw_20260601_xxx", ...)

gateway_events — 事件生命周期(旧表,已迁移)

已废弃:新事件写入 gateway_logs。此表仅保留历史数据。

记录消息和内部任务在管线中每一步的状态变化。

字段类型说明
idINTEGER PK自增主键
trace_idTEXT追踪 ID,索引
channel_idTEXT频道 ID
conversation_idTEXT会话 ID
user_idTEXT用户 ID
message_idTEXT消息 ID
event_typeTEXTmessage / internal_task / operation
statusTEXT事件状态
raw_event_jsonTEXT原始事件 JSON
metadata_jsonTEXT元数据 JSON
errorTEXT错误信息
created_atTEXT创建时间
python
# 查询某条消息的完整追踪链
events = gateway.event_store.get_by_trace("gw_20260525_153000_a1b2c3d4")

# 按条件查询
events = gateway.event_store.query(
    channel_id="qq",
    status="failed",
    event_type="message",
    limit=50,
)

gateway_deliveries — 回复投递记录

字段类型说明
idINTEGER PK自增主键
trace_idTEXT追踪 ID
channel_idTEXT频道 ID
conversation_idTEXT会话 ID
statusTEXTpending / sending / delivered / failed / dead
contentTEXT回复内容
request_jsonTEXT请求数据
response_jsonTEXT响应数据
errorTEXT错误信息
attemptsINTEGER重试次数
created_atTEXT创建时间
updated_atTEXT更新时间

gateway_dedupe — 去重记录

字段类型说明
dedupe_keyTEXT PK去重 key
channel_idTEXT频道 ID
message_idTEXT消息 ID
created_atTEXT创建时间
expires_atTEXT过期时间,索引

数据清理

python
# 清理 30 天前的统一日志
gateway.log_service.cleanup(keep_days=30)

# 清理 30 天前的投递记录
gateway.storage.delivery_cleanup(keep_days=30)

# 清理过期去重记录
gateway.storage.dedupe_cleanup_expired()

# 压缩数据库
gateway.storage.vacuum()

统计信息

python
stats = gateway.storage.get_stats()
# => {
#     "events": {"count": 12345, "file_size_mb": 12.3},
#     "deliveries": {"count": 890, "file_size_mb": 5.1},
#     "dedupe": {"count": 234, "file_size_mb": 0.5},
# }

# 统一日志条数
count = gateway.log_service.count()

追踪链路

TraceFactory

生成格式为 gw_{YYYYMMDD}_{HHMMSS}_{8位hex} 的追踪 ID:

python
from nbot.gateway.trace import TraceFactory

factory = TraceFactory(prefix="gw")
trace_id = factory.new_trace_id()
# => "gw_20260525_153000_a1b2c3d4"

线程上下文

使用 trace_context 上下文管理器,确保同一处理管线内的所有日志携带相同 trace_id:

python
from nbot.gateway.trace import trace_context, get_current_trace_id

with trace_context(trace_id):
    # 此作用域内 get_current_trace_id() 返回该 trace_id
    do_something()

事件元数据

_record_event() 写入 gateway_logs(通过 GatewayLogService),自动映射 statusaction/stage/level。记录字段:trace_idchannel_idstatusconversation_iduser_idmessage_idevent_typeerrormetadata(自动合并 remote_addr)。

页面历史