LangChain Skills Pattern in Practice: Building a SQL Assistant That Loads Knowledge on Demand

2/13/2026
7 min read

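In a previous article, we explored how to simulate the way a Deep Agent uses Skills through the Deep Agents CLI. LangChain now supports this feature natively, which greatly simplifies development. This article walks through the feature by building a smarter SQL assistant.

When building a complex AI agent, developers often face a dilemma. Should they inject all context (database table schemas, API documentation, business rules) into the System Prompt at once, which can overflow the context window and scatter the model's attention? Or should they resort to costly, frequent fine-tuning?

The **Skills pattern** offers a middle path. It loads knowledge dynamically, only when it is needed, so the context window is used efficiently. With native LangChain support for this pattern, it becomes easier to build agents that can "learn on demand".

Following the official documentation, Build a SQL assistant with on-demand skills, this article guides readers through building, from scratch, a SQL Assistant that supports on-demand knowledge loading.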

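1. Core Concepts: Why Choose the Skills Pattern?

Limitations of Traditional SQL Agents

In a traditional SQL agent architecture, the complete database schema usually has to be provided in the System Prompt. As the business grows and the number of tables reaches the hundreds, this approach causes significant problems:

  • High token consumption: every conversation carries a large number of irrelevant table schemas, which wastes resources.

  • Increased hallucination risk: too much irrelevant information reduces the accuracy of the model's reasoning.

  • Difficult maintenance: knowledge from all business lines is tightly coupled and hard to iterate on independently.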
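The token-cost point can be made concrete with a rough calculation. The sketch below is illustrative only: it uses synthetic schemas (hypothetical table and column names) and character counts as a crude proxy for tokens, and it measures how much of a full-schema prompt is irrelevant to a question that touches a single table.

```python
def make_schema(table_index: int, columns: int = 8) -> str:
    """Build a synthetic CREATE TABLE statement (hypothetical, for sizing only)."""
    cols = ",\n".join(f"    col_{i} VARCHAR(50)" for i in range(columns))
    return f"CREATE TABLE table_{table_index} (\n{cols}\n);"


def prompt_chars(table_count: int) -> int:
    """Characters spent on schema text when every table is in the prompt."""
    return sum(len(make_schema(i)) for i in range(table_count))


def irrelevant_share(table_count: int) -> float:
    """Fraction of the schema text unrelated to a question about table_0 only."""
    return 1 - len(make_schema(0)) / prompt_chars(table_count)


for n in (2, 20, 200):
    print(
        f"{n:>3} tables: {prompt_chars(n):>6} chars, "
        f"{irrelevant_share(n):.1%} irrelevant to a one-table question"
    )
```

Real schemas and tokenizers will give different absolute numbers, but the irrelevant share grows the same way as tables are added.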

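The Skills Pattern: A Solution Based on Progressive Disclosure

The Skills pattern follows the principle of **progressive disclosure** and splits knowledge acquisition into layers:

  • Initial agent state: the agent only knows which skills exist and their brief descriptions, which keeps it lightweight.

  • Runtime loading: when faced with a specific question (such as "check inventory"), the agent calls a tool (load_skill) to load that skill's detailed context (schema + prompt).

  • Task execution: with the precise context loaded, the agent performs the task (such as writing and executing SQL).

This pattern lets the number of skills grow without enlarging the initial prompt, and it decouples the teams that own each skill, so the agent can keep up with increasingly complex business scenarios.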
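The three stages above can be sketched in plain Python. This is a minimal illustration, not LangChain's API: the Skill dataclass, SKILL_REGISTRY, and build_context are hypothetical names, the descriptions are shortened placeholders, and the skill to load is passed in by hand where a real agent would let the model choose.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Skill:
    description: str  # short text, always visible to the agent
    content: str      # detailed context, loaded only on demand


# Hypothetical registry; the names mirror the two skills used in this article.
SKILL_REGISTRY = {
    "sales_analytics": Skill(
        description="Revenue, order trends, and regional sales analysis.",
        content="Table sales_data(id, transaction_date, product_id, amount, region)",
    ),
    "inventory_management": Skill(
        description="Stock levels and warehouse locations.",
        content=(
            "Table inventory_items(id, product_id, product_name, "
            "stock_count, warehouse_location)"
        ),
    ),
}


def skill_catalog() -> str:
    """Stage 1: the lightweight listing the agent starts with."""
    return "\n".join(
        f"- {name}: {skill.description}" for name, skill in SKILL_REGISTRY.items()
    )


def load_skill(skill_name: str) -> str:
    """Stage 2: return the detailed context for one skill, or an error message."""
    skill = SKILL_REGISTRY.get(skill_name)
    if skill is None:
        return f"Error: skill '{skill_name}' not found. Available: {sorted(SKILL_REGISTRY)}"
    return skill.content


def build_context(question: str, chosen_skill: str) -> str:
    """Stage 3: combine the catalog, the one loaded skill, and the question."""
    return f"{skill_catalog()}\n\n{load_skill(chosen_skill)}\n\nQuestion: {question}"


print(build_context("Where is the Laptop stored?", "inventory_management"))
```

Only the catalog is present up front; the schema of a skill enters the context after load_skill is called for it.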

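2. System Architecture Design

This hands-on project builds a SQL Assistant with two core skills to demonstrate the pattern in practice:

  • Sales Analytics: owns the sales_data table and handles revenue statistics, order trend analysis, and similar tasks.

  • Inventory Management: owns the inventory_items table and handles stock level monitoring, location lookups, and similar tasks.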

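3. Setting Up the Development Environment

This project uses Python, with uv for dependency management.

Installing Core Dependencies

    uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community

PostgreSQL Configuration

Start a local Postgres instance with an agent_platform database. The setup_db.py script creates the database if it does not exist yet, then initializes the table schemas and test data (see the source code at the end of this article).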
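One detail worth handling when the connection settings come from .env: a password containing characters such as @ or / will break a hand-built connection URI. The sketch below is one possible helper (build_db_uri is a hypothetical name, not part of the project). It reads the same DB_* variables as setup_db.py and percent-encodes the credentials; the fallback defaults are assumptions that mirror setup_db.py.

```python
import os
from urllib.parse import quote_plus


def build_db_uri(env=os.environ) -> str:
    """Assemble a PostgreSQL URI from DB_* variables, escaping user and password."""
    user = quote_plus(env.get("DB_USER", "postgres"))
    password = quote_plus(env.get("DB_PASSWORD", ""))
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "5432")
    name = env.get("DB_NAME", "agent_platform")
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"


if __name__ == "__main__":
    # Hypothetical values for illustration; real ones would come from .env.
    print(build_db_uri({"DB_USER": "postgres", "DB_PASSWORD": "p@ss/word"}))
```

The resulting string can be passed to SQLDatabase.from_uri in place of an f-string that interpolates the raw password.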

4. Core Implementation Steps

Step 1: Defining the Domain Skills (The Knowledge)

We define the skills as a dictionary to simulate loading them from a file system or a database. Note the difference between description (used by the agent when deciding which skill to use) and content (the detailed context that actually gets loaded).

    SKILLS = {
        "sales_analytics": {
            "description": "Useful for analyzing sales revenue, trends...",
            "content": """... Table Schema: sales_data ...""",
        },
        "inventory_management": {
            "description": "Useful for checking inventory levels...",
            "content": """... Table Schema: inventory_items ...""",
        },
    }
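Since the dictionary stands in for a file system or database, here is one way the file-system variant might look. The layout is an assumption made for this sketch: one Markdown file per skill, named after the skill, with the description on the first line and the detailed content below it. load_skills_from_dir is a hypothetical helper, and the demo file text is a placeholder.

```python
import tempfile
from pathlib import Path
from typing import Dict


def load_skills_from_dir(skills_dir: Path) -> Dict[str, Dict[str, str]]:
    """Read every *.md file into the {'description', 'content'} shape used by SKILLS."""
    skills: Dict[str, Dict[str, str]] = {}
    for path in sorted(skills_dir.glob("*.md")):
        text = path.read_text(encoding="utf-8")
        # First line is the short description; everything after it is the content.
        description, _, content = text.partition("\n")
        skills[path.stem] = {
            "description": description.strip(),
            "content": content.strip(),
        }
    return skills


# Demo with a temporary directory so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sales_analytics.md").write_text(
        "Useful for analyzing sales revenue and trends.\n"
        "You have access to the 'sales_data' table.\n",
        encoding="utf-8",
    )
    demo_skills = load_skills_from_dir(root)

print(demo_skills["sales_analytics"]["description"])
```

Because the result has the same shape as the SKILLS dictionary, the rest of the agent code would not need to change.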

Step 2: Implementing the Core Tools (The Capabilities)

The agent relies on two key tools to complete tasks:

  • **load_skill(skill_name)**: dynamically loads the details of the specified skill at runtime.

  • **run_sql_query(query)**: executes the actual SQL statement.
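To try the contract of these two tools without a PostgreSQL server, the sketch below swaps in an in-memory SQLite database. That substitution is an assumption made purely so the example runs with the standard library. The rows are the sales_data mock rows from setup_db.py, and the skill text is shortened for illustration. In the article's main.py the same two functions are wrapped with the @tool decorator and query PostgreSQL through SQLDatabase.

```python
import sqlite3

# Shortened, illustrative skill text; the real content is in main.py.
SKILLS = {
    "sales_analytics": {
        "description": "Sales revenue and trend analysis.",
        "content": "Table sales_data: id, transaction_date, product_id, amount, region",
    },
}

# In-memory SQLite stands in for PostgreSQL (assumption for this sketch only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sales_data (id INTEGER PRIMARY KEY, transaction_date TEXT, "
    "product_id TEXT, amount REAL, region TEXT)"
)
conn.executemany(
    "INSERT INTO sales_data (transaction_date, product_id, amount, region) "
    "VALUES (?, ?, ?, ?)",
    [
        ("2023-01-01", "P001", 100.00, "North"),
        ("2023-01-02", "P002", 150.50, "South"),
        ("2023-01-03", "P001", 120.00, "East"),
        ("2023-01-04", "P003", 200.00, "West"),
        ("2023-01-05", "P002", 160.00, "North"),
    ],
)


def load_skill(skill_name: str) -> str:
    """Return the detailed context for a skill, or an error the model can react to."""
    skill = SKILLS.get(skill_name)
    if skill is None:
        return f"Error: Skill '{skill_name}' not found. Available skills: {list(SKILLS)}"
    return skill["content"]


def run_sql_query(query: str) -> str:
    """Run a query and return the rows as text; errors are returned, not raised."""
    try:
        return str(conn.execute(query).fetchall())
    except sqlite3.Error as exc:
        return f"Error executing SQL: {exc}"


print(load_skill("sales_analytics"))
print(run_sql_query("SELECT SUM(amount) AS total_revenue FROM sales_data"))
```

Returning error strings instead of raising lets the agent see the failure as a tool result and try again, which matches how both tools behave in main.py.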

Step 3: Orchestrating the Agent Logic (The Brain)

We use LangGraph to build a ReAct agent. The System Prompt plays a decisive role here, because it instructs the agent to strictly follow the standard operating procedure (SOP): Identify -> Load -> Query.

    system_prompt = """
    1. Identify the relevant skill.
    2. Use 'load_skill' to get schema.
    3. Write and execute SQL using 'run_sql_query'.
    ...
    Do not guess table names. Always load the skill first.
    """
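The control flow behind a ReAct agent (call the model, run any requested tool, feed the result back, and stop when no tool is requested) can be sketched without LangGraph. Everything here is a stand-in: scripted_model is a hypothetical replacement for the LLM that simply follows the SOP, the message dictionaries are a simplified format rather than LangChain's message classes, and the stub tools return placeholder strings.

```python
from typing import Callable, Dict, List


# Stub tools standing in for the article's load_skill and run_sql_query.
def load_skill(skill_name: str) -> str:
    return f"<schema and instructions for {skill_name}>"


def run_sql_query(query: str) -> str:
    return "<rows returned by the database>"


TOOLS: Dict[str, Callable[..., str]] = {
    "load_skill": load_skill,
    "run_sql_query": run_sql_query,
}


def scripted_model(messages: List[dict]) -> dict:
    """Hypothetical LLM stand-in: picks the next SOP step from the tool results so far."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if len(tool_results) == 0:
        call = {"name": "load_skill", "args": {"skill_name": "sales_analytics"}}
        return {"role": "assistant", "tool_call": call}
    if len(tool_results) == 1:
        call = {"name": "run_sql_query", "args": {"query": "SELECT SUM(amount) FROM sales_data"}}
        return {"role": "assistant", "tool_call": call}
    return {"role": "assistant", "content": f"Answer based on {tool_results[-1]['content']}"}


def run_agent(question: str, max_steps: int = 5) -> List[dict]:
    """Alternate between the model and the tools until the model stops calling tools."""
    messages = [{"role": "user", "content": question}]
    for _ in range(max_steps):
        reply = scripted_model(messages)
        messages.append(reply)
        call = reply.get("tool_call")
        if call is None:  # no tool requested: this reply is the final answer
            break
        output = TOOLS[call["name"]](**call["args"])
        messages.append({"role": "tool", "name": call["name"], "content": output})
    return messages


history = run_agent("What is the total revenue?")
tool_order = [m["name"] for m in history if m["role"] == "tool"]
print(tool_order)
```

In the real agent the ordering is not hard-coded; it comes from the model following the System Prompt, which is why the prompt insists on loading the skill before querying.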

5. Verifying Runtime Behavior

We ran test_agent.py to test queries in two different domains, Sales and Inventory. Below is the actual console output, which shows how the agent loads skills dynamically based on the question:

    Testing Sales Query...
    Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]
    Tool output:
    You are a Sales Analytics Expert.
    You have access to the 'sales_data' table.
    Table Schema:
    - id: integer
    ...
    Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]
    Tool output: [(Decimal('730.50'),)]
    ...
    Agent response: The total revenue is $730.50.

    Testing Inventory Query...
    Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]
    Tool output:
    You are an Inventory Management Expert.
    You have access to the 'inventory_items' table.
    Table Schema...
    Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]
    Tool output: [('Warehouse A',)]
    ...
    Agent response: The Laptop is located in **Warehouse A**.

6. Complete Source Code for Reference

Below is the complete source code of the project, including the database initialization script and the agent main program.

1. Database Initialization (setup_db.py)

    import os

    import psycopg2
    from psycopg2 import sql
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
    from dotenv import load_dotenv

    load_dotenv()

    # Make sure the database connection settings are configured in .env
    DB_HOST = os.getenv("DB_HOST", "localhost")
    DB_PORT = os.getenv("DB_PORT", "5432")
    DB_USER = os.getenv("DB_USER", "postgres")
    DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password")  # Replace with the actual password
    DB_NAME = os.getenv("DB_NAME", "agent_platform")


    def create_database():
        try:
            # Connect to the default 'postgres' database to create the new database
            conn = psycopg2.connect(
                host=DB_HOST,
                port=DB_PORT,
                user=DB_USER,
                password=DB_PASSWORD,
                dbname="postgres",
            )
            # CREATE DATABASE cannot run inside a transaction block
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()

            # Check whether the database exists (parameterized to avoid SQL injection)
            cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (DB_NAME,))
            exists = cur.fetchone()

            if not exists:
                print(f"Creating database {DB_NAME}...")
                # Identifiers cannot be bound as parameters, so quote the name explicitly
                cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(DB_NAME)))
            else:
                print(f"Database {DB_NAME} already exists.")

            cur.close()
            conn.close()
        except Exception as e:
            print(f"Error creating database: {e}")


    def create_tables_and_data():
        try:
            conn = psycopg2.connect(
                host=DB_HOST,
                port=DB_PORT,
                user=DB_USER,
                password=DB_PASSWORD,
                dbname=DB_NAME,
            )
            cur = conn.cursor()

            # Create the sales table
            print("Creating sales_data table...")
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS sales_data (
                    id SERIAL PRIMARY KEY,
                    transaction_date DATE,
                    product_id VARCHAR(50),
                    amount DECIMAL(10, 2),
                    region VARCHAR(50)
                )
                """
            )

            # Create the inventory table
            print("Creating inventory_items table...")
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS inventory_items (
                    id SERIAL PRIMARY KEY,
                    product_id VARCHAR(50),
                    product_name VARCHAR(100),
                    stock_count INTEGER,
                    warehouse_location VARCHAR(50)
                )
                """
            )

            # Insert mock data (existing rows are cleared first so reruns stay idempotent)
            print("Inserting mock data...")
            cur.execute("TRUNCATE sales_data, inventory_items")

            sales_data = [
                ('2023-01-01', 'P001', 100.00, 'North'),
                ('2023-01-02', 'P002', 150.50, 'South'),
                ('2023-01-03', 'P001', 120.00, 'East'),
                ('2023-01-04', 'P003', 200.00, 'West'),
                ('2023-01-05', 'P002', 160.00, 'North'),
            ]
            cur.executemany(
                "INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)",
                sales_data,
            )

            inventory_data = [
                ('P001', 'Laptop', 50, 'Warehouse A'),
                ('P002', 'Mouse', 200, 'Warehouse B'),
                ('P003', 'Keyboard', 150, 'Warehouse A'),
                ('P004', 'Monitor', 30, 'Warehouse C'),
            ]
            cur.executemany(
                "INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)",
                inventory_data,
            )

            conn.commit()
            cur.close()
            conn.close()
            print("Database setup complete.")
        except Exception as e:
            print(f"Error setting up tables: {e}")


    if __name__ == "__main__":
        create_database()
        create_tables_and_data()

2. Agent Main Program (main.py)

    import os
    from typing import Dict

    from dotenv import load_dotenv
    from langchain_openai import ChatOpenAI
    from langchain_core.tools import tool
    from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
    from langchain_community.utilities import SQLDatabase
    from langgraph.graph import StateGraph, START, MessagesState
    from langgraph.prebuilt import ToolNode, tools_condition

    load_dotenv()

    # --- Configuration ---
    BASE_URL = os.getenv("BASIC_MODEL_BASE_URL")
    API_KEY = os.getenv("BASIC_MODEL_API_KEY")
    MODEL_NAME = os.getenv("BASIC_MODEL_MODEL")
    DB_URI = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"

    # --- Database Setup ---
    db = SQLDatabase.from_uri(DB_URI)

    # --- Skill Definitions ---
    SKILLS: Dict[str, Dict[str, str]] = {
        "sales_analytics": {
            "description": "Useful for analyzing sales revenue, trends, and regional performance.",
            "content": """
    You are a Sales Analytics Expert.
    You have access to the 'sales_data' table.
    Table Schema:
    - id: integer (primary key)
    - transaction_date: date
    - product_id: varchar(50)
    - amount: decimal(10, 2)
    - region: varchar(50)

    Common queries:
    - Total revenue: SUM(amount)
    - Revenue by region: GROUP BY region
    - Sales trend: GROUP BY transaction_date
    """,
        },
        "inventory_management": {
            "description": "Useful for checking stock levels, product locations, and warehouse management.",
            "content": """
    You are an Inventory Management Expert.
    You have access to the 'inventory_items' table.
    Table Schema:
    - id: integer (primary key)
    - product_id: varchar(50)
    - product_name: varchar(100)
    - stock_count: integer
    - warehouse_location: varchar(50)

    Common queries:
    - Check stock: WHERE product_name = '...'
    - Low stock: WHERE stock_count < threshold
    """,
        },
    }


    # --- Tools ---
    @tool
    def load_skill(skill_name: str) -> str:
        """
        Load the detailed prompt and schema for a specific skill.
        Available skills:
        - sales_analytics: For sales, revenue, and transaction analysis.
        - inventory_management: For stock, products, and warehouse queries.
        """
        skill = SKILLS.get(skill_name)
        if not skill:
            return f"Error: Skill '{skill_name}' not found. Available skills: {list(SKILLS.keys())}"
        return skill["content"]


    @tool
    def run_sql_query(query: str) -> str:
        """
        Execute a SQL query against the database.
        Only use this tool AFTER loading the appropriate skill to understand the schema.
        """
        try:
            return db.run(query)
        except Exception as e:
            # Return the error as text so the model can read it and correct the query
            return f"Error executing SQL: {e}"


    @tool
    def list_tables() -> str:
        """List all available tables in the database."""
        return str(db.get_usable_table_names())


    tools = [load_skill, run_sql_query, list_tables]

    # --- Agent Setup ---
    llm = ChatOpenAI(
        base_url=BASE_URL,
        api_key=API_KEY,
        model=MODEL_NAME,
        temperature=0,
    )

    llm_with_tools = llm.bind_tools(tools)


    # --- Graph Definition ---
    class AgentState(MessagesState):
        # We can add custom state if needed, but MessagesState is sufficient for simple chat
        pass


    def agent_node(state: AgentState):
        messages = state["messages"]
        response = llm_with_tools.invoke(messages)
        return {"messages": [response]}


    workflow = StateGraph(AgentState)
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", ToolNode(tools))
    workflow.add_edge(START, "agent")
    # tools_condition routes to "tools" when the last message has tool calls, otherwise ends
    workflow.add_conditional_edges("agent", tools_condition)
    workflow.add_edge("tools", "agent")
    app = workflow.compile()

    # --- Main Execution ---
    if __name__ == "__main__":
        system_prompt = """You are a helpful SQL Assistant.
    You have access to specialized skills that contain database schemas and domain knowledge.
    To answer a user's question:
    1. Identify the relevant skill (sales_analytics or inventory_management).
    2. Use the 'load_skill' tool to get the schema and instructions.
    3. Based on the loaded skill, write and execute a SQL query using 'run_sql_query'.
    4. Answer the user's question based on the query results.
    Do not guess table names. Always load the skill first."""

        print("SQL Assistant initialized. Type 'quit' to exit.")
        print("-" * 50)

        messages = [SystemMessage(content=system_prompt)]

        # Show the connection target (host, port, and database name only, no credentials)
        try:
            print(f"Connected to database: {DB_URI.split('@')[-1]}")
        except Exception as e:
            print(f"Database connection warning: {e}")

        while True:
            try:
                user_input = input("User: ")
                if user_input.lower() in ["quit", "exit"]:
                    break

                messages.append(HumanMessage(content=user_input))

                print("Agent: ", end="", flush=True)

                # Run the graph once. In 'values' mode each event is the full state,
                # so the last event holds the final answer.
                final_state = None
                for event in app.stream({"messages": messages}, stream_mode="values"):
                    final_state = event

                last_msg = final_state["messages"][-1]
                if isinstance(last_msg, AIMessage):
                    print(last_msg.content)

                messages = final_state["messages"]  # Update history
                print("-" * 50)
            except Exception as e:
                print(f"\nError: {e}")
                break

Published in Technology
