
多轮对话系统真正的门槛,不是接入大模型,而是把上下文、分支、RAG、工具调用、流式恢复和失败收敛变成一套可控的工程流程。本文从生产系统视角出发,拆解一个可靠对话链路该如何设计:消息如何落库,分支如何隔离,上下文如何规划,工具如何闭环,异常如何恢复,以及为什么 trace 必须成为业务证据。
pending 消息。

目标 | 含义 |
|---|---|
状态可收敛 | 成功、失败、取消、超时都能落到明确终态 |
上下文可解释 | 模型看到的内容都有来源和边界 |
分支不串线 | 重试、编辑、重新生成不会污染默认续聊 |
依赖可降级 | RAG、工具、上游 stateful 失败后有回退路径 |
资源有上限 | Token、工具次数、LLM 调用次数、事件缓存都有限制 |
过程可追踪 | 能还原本轮用了什么上下文、工具和回退策略 |
Run,并创建两条占位消息:
- user 消息:表示系统已接收输入。
- assistant 消息:表示本轮即将生成回答。

type RunStatus string
// Lifecycle states. Both Run.Status and Message.Status are declared with
// this type.
const (
// Pending is the state the placeholder run and messages are created with.
Pending RunStatus = "pending"
// Success is the terminal state of a completed generation.
Success RunStatus = "success"
// Error is the terminal state of a failed generation.
Error RunStatus = "error"
// Canceled is the terminal state of a generation that was canceled.
Canceled RunStatus = "canceled"
)
// Run records one generation attempt: which conversation and user it
// belongs to, its lifecycle status, an error code, token usage and timing.
type Run struct {
ID string
ConversationID uint
UserID uint
Status RunStatus
// ErrorCode is a string code; presumably empty unless Status is Error — confirm.
ErrorCode string
InputTokens int64
OutputTokens int64
StartedAt time.Time
// EndedAt is a pointer, so it can be nil; presumably nil until the run
// reaches a terminal state — confirm against the code that writes it.
EndedAt *time.Time
}
// Message is one stored chat message. Messages form a tree through ParentID,
// which BuildPath follows to reconstruct a single branch.
type Message struct {
ID uint
ConversationID uint
// ParentID points at the preceding message on the same branch; nil ends
// the upward walk in BuildPath.
ParentID *uint
// RunID is a string like Run.ID; presumably the run that produced or
// accepted this message — confirm.
RunID string
// Role is compared against "assistant" in DefaultParent.
Role string
Content string
Status RunStatus
ErrorCode string
}send(input):
run = create_run(status=pending)
user_msg = create_message(role=user, status=pending)
assistant_msg = create_message(role=assistant, status=pending)
try:
result = generate(run, user_msg, assistant_msg, input)
mark_success(run, user_msg, assistant_msg, result)
return result
catch GenerationCanceled:
save_partial_text(assistant_msg)
mark_canceled(run, assistant_msg)
catch err:
mark_error(run, user_msg, assistant_msg, classify(err))

success。error。canceled。pending,必须归类成明确错误。

// 容易串分支:只按时间取消息。
history := repo.ListRecentMessages(conversationID, 20)func BuildPath(messages []Message, leafID uint) []Message {
// BuildPath returns the chain of messages that leads to leafID, ordered
// from the topmost reachable ancestor down to the leaf. Only messages
// linked through ParentID are included, so sibling branches are left out.
// The walk stops at an ID of 0, an ID that is not present in messages, an
// ID that was already visited, or a message whose ParentID is nil.

// Index the messages by ID so each parent lookup is a map access.
byID := map[uint]Message{}
for _, m := range messages {
byID[m.ID] = m
}
var path []Message
// seen stops the walk if the ParentID links form a cycle.
seen := map[uint]bool{}
for id := leafID; id != 0; {
m, ok := byID[id]
if !ok || seen[id] {
break
}
seen[id] = true
path = append(path, m)
if m.ParentID == nil {
break
}
id = *m.ParentID
}
// path was collected leaf-first; reverse it in place.
for i, j := 0, len(path)-1; i < j; i, j = i+1, j-1 {
path[i], path[j] = path[j], path[i]
}
return path
}pending、error、canceled。func DefaultParent(recent []Message) *Message {
// DefaultParent scans recent from its last element backwards and returns
// the first message whose Role is "assistant" and whose Status is Success.
// Messages in any other role or status are skipped. It returns nil when no
// such message exists.
// NOTE(review): assumes recent is ordered oldest to newest, so the match
// is the latest successful answer — confirm against the caller.
for i := len(recent) - 1; i >= 0; i-- {
m := recent[i]
if m.Role == "assistant" && m.Status == Success {
// m is a copy of the element, so the result does not alias recent.
return &m
}
}
return nil
}PromptPlan。结构可以很简单:type BlockKind string
// BlockKind values. They label the Kind field of PromptBlock and BlockTrace.
const (
SystemPolicy BlockKind = "system_policy"
Transcript BlockKind = "transcript"
Stable BlockKind = "stable_context"
Dynamic BlockKind = "dynamic_context"
ToolGuide BlockKind = "tool_guidance"
)
// PromptBlock is one piece of prompt content together with the bookkeeping
// kept about it: its kind, token count, cacheability and sources.
type PromptBlock struct {
Kind BlockKind
Content string
// Tokens is the token count attributed to this block.
Tokens int64
// Cacheable flags whether the block may be cached.
Cacheable bool
// Sources lists where the content came from.
Sources []SourceRef
}
// PromptPlan pairs the messages to send to the model with a trace
// describing how the prompt was composed.
type PromptPlan struct {
Messages []LLMMessage
Trace PromptTrace
}PromptPlan 不只是拼装器,更是审计边界。它应该记录每个 block 的 token、来源、是否可缓存。type PromptTrace struct {
// PromptTrace summarises a prompt: a total token count plus one BlockTrace
// entry per block.
// NOTE(review): TotalTokens is presumably the sum of Blocks[i].Tokens —
// confirm against the code that fills it.
TotalTokens int64
Blocks []BlockTrace
}
// BlockTrace is the per-block entry of PromptTrace. It carries the same
// Kind, Tokens and Cacheable fields as PromptBlock, and a source count in
// place of the content and source list.
type BlockTrace struct {
Kind BlockKind
Tokens int64
Cacheable bool
// SourceCount is presumably len(PromptBlock.Sources) — confirm.
SourceCount int
}PromptTrace,线上排障时只能看完整 prompt;有了 PromptTrace,可以直接看到本轮是否包含文件、RAG、记忆、摘要、工具规则,以及各自占了多少预算。内容 | 推荐位置 |
|---|---|
平台规则、模型行为约束 | system |
用户输入、资料型上下文 | user |
工具执行结果 | tool |
模型回答 | assistant |
<ctx>
<summary>用户正在讨论企业知识库权限模型。</summary>
<rag source="pricing.pdf#chunk-12">
企业版支持 SSO、审计日志和自定义数据保留周期。
</rag>
<memory key="language">用户偏好中文回答。</memory>
</ctx>
<user_input>
企业版和团队版有什么区别?
</user_input>prompt injection,但能建立更清晰的权限层级:资料可以被引用,但不能天然覆盖系统策略。层级 | 作用 |
|---|---|
最近消息 | 保留局部指代和对话节奏 |
会话摘要 | 保留远期主线 |
语义召回 | 找回被截断但相关的细节 |
上下文证据 | 保存曾经用过的 RAG、工具结果、摘要 |
// Slot is one candidate piece of context competing for the token budget in
// SelectSlots.
type Slot struct {
Kind string
Content string
// Tokens is the cost charged against the budget when the slot is selected.
Tokens int64
// Priority orders non-required slots; higher values are considered first.
Priority int
// Required slots are always selected, regardless of the remaining budget.
Required bool
}
// SelectSlots picks the slots to include under a token budget.
//
// Required slots are ordered first, then the rest by descending Priority;
// the sort is stable, so ties keep their input order. A slot is selected
// when it is Required or when it still fits into the budget. A slot that
// does not fit is skipped, and later, smaller slots may still be selected.
//
// Note that slots is sorted in place, so the caller's slice is reordered.
// Required slots are added even when they do not fit, so the total token
// count of the result can exceed budget.
func SelectSlots(slots []Slot, budget int64) []Slot {
sort.SliceStable(slots, func(i, j int) bool {
// Required slots come before optional ones.
if slots[i].Required != slots[j].Required {
return slots[i].Required
}
return slots[i].Priority > slots[j].Priority
})
var used int64
var out []Slot
for _, s := range slots {
if s.Required || used+s.Tokens <= budget {
out = append(out, s)
used += s.Tokens
}
}
return out
}内容 | 优先级 |
|---|---|
当前输入 | 100 |
用户偏好 | 90 |
会话摘要 | 80 |
RAG 证据 | 70 |
语义召回 | 60 |
长期记忆 | 50 |
更早历史 | 40 |
// RetrieveStatus is the outcome label carried by RetrieveResult.Status.
type RetrieveStatus string
// RetrieveStatus values. Having distinct values lets a caller tell a hit
// from an empty result, a low-scoring result, a timeout and a failure.
const (
Hit RetrieveStatus = "hit"
Empty RetrieveStatus = "empty"
LowScore RetrieveStatus = "low_score"
Timeout RetrieveStatus = "timeout"
Failed RetrieveStatus = "failed"
)
// RetrieveResult is the structured outcome of a retrieval call: a status and
// reason alongside the chunks, plus a candidate count and a maximum score.
type RetrieveResult struct {
Status RetrieveStatus
// Reason is a free-form string; presumably explains a non-hit Status — confirm.
Reason string
Chunks []Chunk
// CandidateCount is presumably the number of candidates considered before
// filtering — confirm.
CandidateCount int
MaxScore float64
}len(chunks)。retrieve_context(query, files):
result = rag.retrieve(query, files)
if result.error:
trace("rag failed; fallback to full text")
return full_text_fallback(files)
if result.chunks is empty:
trace("rag miss", status=result.status, reason=result.reason)
return evidence_miss(result.reason)
trace("rag hit", count=len(result.chunks), max_score=result.max_score)
return rag_chunks(result.chunks)

run_tool_loop(messages, tools):
ledger = {}
llm_calls = 0
tool_calls = 0
while llm_calls < max_llm_calls:
output = llm.generate(messages, tools)
llm_calls += 1
if output.tool_calls is empty:
return output
for call in output.tool_calls:
if tool_calls >= max_tool_calls:
disable_tools("tool limit reached")
return llm.generate(messages, tools=[])
key = call.name + canonical_json(call.arguments)
if key not in ledger:
validate(call.arguments, schema=call.schema)
result = execute_tool(call)
ledger[key] = truncate_for_model(result)
tool_calls += 1
messages.append(tool_result(call.id, ledger[key]))
disable_tools("llm call limit reached")
return llm.generate(messages, tools=[])previous_response_id 这类上游有状态能力可以省 token,但不能成为事实来源。func CanUsePrevious(prevID, storedFP, currentFP string) bool {
// CanUsePrevious reports whether all three conditions hold: prevID is
// non-empty, storedFP is non-empty, and storedFP equals currentFP.
// NOTE(review): storedFP and currentFP are presumably fingerprints of the
// context sent with the previous and the current request — confirm.
return prevID != "" && storedFP != "" && storedFP == currentFP
}if can_use_previous_response:
output = llm.generate(messages=latest_user_only, previous_response_id=prev_id)
if previous_response_rejected(output.error):
clear_previous_response_id()
output = llm.generate(messages=full_context)
else:
output = llm.generate(messages=full_context)run_id 作为稳定标识,把事件写进短期缓存。type StreamEvent struct {
// StreamEvent is one cached event of a streaming run. RunID is the stable
// identifier the events are stored under.
RunID string
// Seq has the same type as the afterSeq argument of StreamStore.ListAfter;
// presumably increases with each appended event — confirm.
Seq int64
Type string
// Payload is kept as raw JSON and is not decoded by this type.
Payload json.RawMessage
}
// StreamStore is the storage interface for the events of a streaming run
// and for its cancellation flag. Every method is keyed by run ID.
type StreamStore interface {
// Append stores payload for runID. The int64 result is presumably the
// sequence number assigned to the new event — confirm.
Append(ctx context.Context, runID string, payload []byte) (int64, error)
// ListAfter returns events of runID relative to afterSeq; presumably
// those with Seq greater than afterSeq — confirm.
ListAfter(ctx context.Context, runID string, afterSeq int64) ([]StreamEvent, error)
// Cancel records a cancellation request for runID.
Cancel(ctx context.Context, runID string) error
// IsCanceled reports the cancellation state of runID. It returns no
// error, so a storage failure cannot be told apart from its result.
IsCanceled(ctx context.Context, runID string) bool
}run_id 和 after_seq。canceled。这比直接丢弃更符合用户预期。type TraceEvent struct {
// TraceEvent is one entry of a run's trace: an ordered, staged record with
// a title, a summary and a free-form payload.
Seq int
Stage string // prompt / rag / tool / llm / persist
Status string // streaming / completed / error
Title string
Summary string
Payload map[string]any
}阶段 | 应记录内容 |
|---|---|
Prompt | full/stateful、消息数、token、上下文来源 |
RAG | query、命中数、最高分、失败原因、回退策略 |
Tool | 工具名、参数摘要、状态、耗时、是否复用 |
LLM | 模型、路由、首 token 延迟、usage |
Persist | 消息状态、run 状态、错误码 |
// PromptShape is a compact description of a prompt: the mode it was built
// in, its message and token counts, and flags for the kinds of context
// it contained.
type PromptShape struct {
Mode string // full / stateful / full_retry
MessageCount int
Tokens int64
HasFiles bool
HasRAG bool
HasMemory bool
HasTools bool
}persist_success(result):
update_assistant_message(result.answer)
create_tool_call_records(result.tool_calls)
update_run(status=success, usage=result.usage)
async_with_timeout("embed_messages"):
index_message_pair(result.messages)
async_with_timeout("generate_title"):
generate_conversation_title(result.conversation_id)

Run 和消息状态机承接请求。PromptPlan 管理上下文来源和预算。

请登录后发表您的评论
还没有评论,来抢个沙发吧。