基于FastAPI+Dify+RAG 商业级 AI 客服系统落地研发

基于FastAPI+Dify+RAG 商业级 AI 客服系统落地研发

Stack: Dify | Python | FastAPI | Redis | RAG | Shopify | Feishu|237

设计目的

此系统已落地实施,下面是MVP(最小可行性产品)单个平台shoptify演示

针对跨境电商多时区、多语言、高并发的运营痛点,从 0 到 1 架构了基于事件驱动的 AI 自动化中间件。系统以 BFF 架构统一接管独立站、Zendesk 工单与 WhatsApp 实时社群流量,不仅实现了 5 大核心客服 SOP 的全自动闭环,引入主动营销唤醒,人工审核机制,将弃购挽回率提升 15%。

项目背景

随着公司跨境电商业务在全球市场的快速扩张,传统的纯人工客服模式暴露出严重的效率瓶颈。首先,跨越多个时区导致无法实现真正的 7x24 小时无缝覆盖,夜间客诉积压严重;其次,客诉渠道(Shopify、Zendesk、WhatsApp)极度碎片化且多语种交织,导致人工响应延迟、人力成本激增;最后,客服团队长期陷入“查物流、问库存”等高频重复劳动,无力开展“弃购挽回”等高价值转化工作。为此,本项目旨在构建一套全域智能 AI Agent 中台,用大模型彻底重塑业务流,实现降本与增收的双向突破。

整体架构

graph TD
    subgraph Platform[平台泳道]
        P1[TikTok]
        P2[Shopify]
    end

    subgraph Receiver[接收泳道 - FastAPI]
        R1[验签]
        R2[存入队列]
        R3[返回200]
    end

    subgraph Queue[队列泳道 - Redis]
        Q1[(Stream)]
    end

    subgraph Worker[消费泳道 - Worker]
        W1[拉取消息]
        W2[调用Dify]
        W3[限流重试]
        W4[发送回复]
    end

    subgraph AI[AI泳道 - Dify]
        A1[意图识别]
        A2[RAG检索]
        A3[生成回复]
    end

    subgraph External[外部系统]
        E1[平台API]
        E2[人工客服]
    end

    P1 --> R1
    P2 --> R1
    R1 --> R2 --> Q1
    R1 --> R3 --> P1 & P2

    Q1 --> W1 --> W2
    W2 --> A1 --> A2 --> A3 --> W2
    W2 --> W3 --> W4
    W4 --> E1 --> P1 & P2
    W4 --> E2

核心亮点

  • 全渠道 Omnichannel 统一网关: 基于 FastAPI 独立研发高可用网关,统一清洗来自 Shopify Webhook、Zendesk Ticket 库及 WhatsApp 的异构数据。通过 Redis Stream 削峰填谷,彻底解决因大模型延迟导致的平台 API 封杀问题。

  • 原生级多语言与文化适配: 在 Dify 工作流中植入多语言意图识别路由。系统自动侦测语种,不仅实现 15 种语言的无缝切换,更通过动态 Prompt 适配当地文化语境(如:德语严谨条理,拉美西语热情且自带 Emoji),大幅提升 CSAT(客诉满意度)。

  • 自动化主动营销闭环SendGrid : 跨越“被动防守”边界,利用 Python 定时任务监控 Shopify 的“弃购订单 (Abandoned Checkouts)”。AI 动态生成带有极强转化语气的个性化挽回邮件,并通过 SendGrid API 自动投递,实现由“客服中台”向“营销中台”的价值跃升。

    • 具体实现:利用 Shopify 的 checkouts/updateWebhook,发给 Dify 的一个专属营销 LLM 节点,拿到 Dify 写的邮件后,Python 调用 SendGrid 的发信 API
  • 分布式幂等防重锁: 针对海外复杂网络抖动及 Zendesk/WhatsApp 常见的重复推流机制,基于 Redis SETNXMD5 哈希算法实现细粒度消息指纹锁,拦截率达 100%,杜绝 AI 对海外客户的重复“骚扰”。 核心代码:

具体功能模块

1. FastAPI 异步网关

主流电商平台(如 Shopify)对 Webhook 的超时限制极为严苛。如果直接在网关中等待大模型推理,会导致严重的超时报错甚至接口被平台封杀。

我们的设计是:FastAPI 收到请求后,不做任何阻塞计算,立刻将其推入 Redis 队列,并在毫秒级返回 200 OK。此外,FastAPI 还充当了内部代理(BFF),将变态的平台鉴权封装为对内暴露的简单接口(如 /internal/get_logistics),供 Dify 工作流反向调用。

# receiver.py - 极速网关接收核心逻辑
@app.post("/webhook/ecommerce")
async def receive_webhook(request: Request):
    # 1. 读取原始请求
    raw_body = await request.body()
    
    # 2. 解析 JSON 数据并生成唯一 msg_id
    payload = json.loads(raw_body)
    msg_id = payload.get("msg_id", f"msg_{int(time.time()*1000)}")
    payload["msg_id"] = msg_id
    
    # 3. 【核心优化】使用 XADD 写入 Redis Streams 队列,取代极易丢失数据的 List
    stream_id = await r.xadd("webhook_stream", {"data": json.dumps(payload)})
    print(f"📥 极速接收工单: {msg_id},已存入可靠流 ID: {stream_id}")
    
    # 4. 毫秒级返回 200 OK,满足所有电商平台的严苛要求
    return {"status": "success"}

2. Redis Streaming 消息队列

为了彻底解耦“接收端”与“处理端”,我们引入了 Redis Stream 作为异步消息总线。

相比于传统的 Redis List (LPUSH/BRPOP),Redis Stream 提供了企业级的消费者组 (Consumer Group) 概念。它能确保在电商大促期间,即便瞬间涌入成千上万个 Webhook 并发,消息也不会丢失。Worker 节点可以根据自身的算力节奏,平滑地从队列中拉取任务,实现了完美的“削峰填谷”。

# worker.py - 消费者组拉取数据逻辑
async def worker_loop():
    await init_stream() # 初始化消费者组
    print("👷 Worker 适配器已上线 | 防重发铠甲已激活 | 准备剥茧抽丝...")
    
    while True:
        try:
            # 使用 xreadgroup 阻塞读取流数据
            messages = await redis_client.xreadgroup(
                GROUP_NAME, CONSUMER_NAME, {STREAM_KEY: ">"}, count=1, block=2000
            )
            if not messages: continue

            for _, msg_list in messages:
                for message_id, msg_data in msg_list:
                    raw_payload = json.loads(msg_data.get("data"))
                    # ... 交给后续清洗与 AI 处理环节 ...

Worker 数据清洗

worker.py 是整个系统中最核心的后台进程,承担着数据清洗 (Transformer)并发风控 (Idempotency Lock) 的职责。

为了防止平台网络抽风导致的 Webhook 重复推送(At least once 机制),我们在处理前会给消息生成 MD5 指纹,并使用 Redis SETNX 上锁,拦截 1 小时内的完全重复请求,杜绝 AI 变成“复读机”。

# worker.py - 核心防重发铠甲与数据清洗
def transform_raw_data(raw_payload):
    # 【增强版】数据清洗:抓取商家或客户填写的真实 Note,提取订单号
    platform = "shopify"
    order_name = raw_payload.get("name") or f"Order_{raw_payload.get('id')}"
    customer = raw_payload.get("customer") or {} 
    
    real_note = raw_payload.get("note") 
    if real_note and str(real_note).strip(): 
        query = str(real_note).strip()
    else:
        query = f"Hi, I am {customer.get('first_name')}. I just placed order {order_name}. Can you confirm the shipping status?"
    
    return {"platform": platform, "user_id": customer.get("email"), "query": query, "msg_id": f"shopify_{raw_payload.get('id')}"}

# 在 worker_loop 中触发防重发保护:
cleaned_data = transform_raw_data(raw_payload)

# 🛡️ 终极防重发铠甲 (Idempotency Lock)
query_hash = hashlib.md5(cleaned_data['query'].encode('utf-8')).hexdigest()
lock_key = f"lock:{cleaned_data['platform']}:{cleaned_data['msg_id']}:{query_hash}"

# SETNX: 存活3600秒(1小时)。1小时内收到相同的“单号+提问”,直接抛弃!
is_new_msg = await redis_client.set(lock_key, "locked", nx=True, ex=3600)

if not is_new_msg:
    print(f"🛑 [熔断拦截] Shopify 网络抽风重发通知!已自动丢弃重复消息!")
    await redis_client.xack(STREAM_KEY, GROUP_NAME, message_id)
    continue # 抛弃重复消息

4. 联动 Dify 工作流编排

在确保数据唯一、干净后,Worker 会封装请求调用 Dify API。 系统不仅实现了多轮对话的跨周期记忆挂载,还具备拦截特殊指令(如转人工)的能力。最后,利用 @retry 指数退避算法,安全地将 AI 生成的话术写回平台。

# worker.py - 唤醒 Dify 与写回机制
async def call_dify_and_send(cleaned_data):
    # ... 省略部分构造 Payload 代码 ...
    
    # 🔗 跨周期记忆挂载:确保多轮对话上下文不丢失
    redis_memory_key = f"session_map:{platform}:{user_id}"
    saved_conv_id = await redis_client.get(redis_memory_key)
    if saved_conv_id:
        payload["conversation_id"] = saved_conv_id

    try:
        ai_reply = ""
        # 流式请求 Dify 接口获取大模型处理结果
        async with httpx.AsyncClient(timeout=120.0, trust_env=False) as client:
            async with client.stream("POST", DIFY_API_URL, headers=headers, json=payload) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if line.startswith('data: '):
                        event_data = json.loads(line[6:])
                        if event_data.get('event') == 'message':
                            ai_reply += event_data.get('answer', '')
        
        print(f"💡 AI 推理完毕: {ai_reply}")
        
        # 将生成的回复,使用重试机制写回 Shopify 订单备注
        await send_reply_to_platform(platform, "shop_id", user_id, ai_reply)
        return True
    except Exception as e:
        print(f"❌ 运行错误: {e}")
        return False

# 指数退避重试写回平台
@retry(stop=stop_after_attempt(4), wait=wait_exponential(multiplier=1, min=1, max=10))
async def send_reply_to_platform(platform, shop_id, user_id, reply_text):
    if platform == "shopify":
        # 解析真实订单 ID,调用 Shopify Admin API 更新 Note
        order_id = user_id.split('_')[-1] 
        api_url = f"https://testabchsy.myshopify.com/admin/api/2024-04/orders/{order_id}.json"
        payload = {"order": {"id": order_id, "note": f"【AI 客服回复】: {reply_text}"}}
        
        async with httpx.AsyncClient() as client:
            response = await client.put(api_url, headers=headers, json=payload)
            if response.status_code == 200:
                print(f"✅ Shopify 订单备注更新成功!")

5. RAG知识库

解决幻觉问题,当客户询问“退货运费谁出”、“满多少免邮”等极其确定的静态企业政策时,如果任由 LLM 自由发挥,将会引发严重的客诉。在 Dify 平台内嵌了强大的 RAG架构,将大模型的算力与企业的私有知识库结合。 使用采用 Dify 内置的高性能向量数据库(基于 Weaviate),通过混合检索负责理解语义,索引方式选择嵌入,Embedding 模型选择qwen3-embedding精准捕捉客户提问的深层意图。

实现效果截图

外部进来的脏数据 -> 网关-> 队列 -> Worker -> Dify -> Worker -> 出口流控 (转回各平台专有格式) -> 外部平台。 image.png image.png image.png image.png image.png

结语

如果你目前也正在痛苦地摸索大模型落地,或者正被各类平台的 Webhook 折磨得死去活来,希望这套“三明治架构”能给你带来一点启发。

© 2026 Personal Website
Developed by Ryan 🫡