LangChain Skills 模式实战:构建按需加载知识的 SQL 助手
在先前的文章中,我们探讨了如何通过 Deep Agents CLI 模拟 Deep Agent 使用 Skills 的模式。如今,LangChain 已原生支持这一特性,极大地简化了开发流程。本文将带领大家深入体验这一功能,构建一个更智能的 SQL 助手。
构建复杂的 AI Agent 时,开发者往往陷入两难境地:是将所有上下文(数据库表结构、API 文档、业务规则)一次性注入 System Prompt,导致上下文窗口(Context Window)溢出且分散模型注意力?还是选择成本高昂的频繁微调(Fine-tuning)?
**Skills 模式(Skills Pattern)**提供了一条优雅的中间路线。它通过动态加载所需知识,实现了上下文的高效利用。LangChain 对此模式的原生支持,意味着我们可以更轻松地构建具备“按需学习”能力的 Agent。
本文将结合官方文档 Build a SQL assistant with on-demand skills,引导读者从零开始,构建一个支持“按需加载知识”的 SQL Assistant。
1. 核心概念:为何选择 Skills 模式?
传统 SQL Agent 的局限性
在传统的 SQL Agent 架构中,我们通常需要在 System Prompt 中提供完整的 Database Schema。随着业务发展,当表数量扩展到数百张时,这种方式会带来显著问题:
-
Token 消耗巨大:每次对话都携带大量无关的表结构,造成资源浪费。
-
幻觉风险增加:过多的无关干扰信息会降低模型的推理准确性。
-
维护困难:所有业务线的知识紧密耦合,难以独立迭代。
Skills 模式:基于渐进式披露的解决方案
Skills 模式基于**渐进式披露(Progressive Disclosure)**原则,将知识获取过程分层处理:
-
Agent 初始状态:仅掌握有哪些“技能”(Skills)及其简要描述(Description),保持轻量级。
-
运行时加载:当面对具体问题(如“查询库存”)时,Agent 主动调用工具(
load_skill)加载该技能详细的上下文(Schema + Prompt)。 -
执行任务:基于加载的精确上下文,执行具体的任务(如编写并执行 SQL)。
这种模式有效支持了无限扩展和团队解耦,使 Agent 能够适应日益复杂的业务场景。
2. 系统架构设计
本实战项目将构建一个包含两个核心 Skills 的 SQL Assistant,以演示该模式的实际应用:
-
Sales Analytics(销售分析):负责
sales_data表,处理收入统计、订单趋势分析等。 -
Inventory Management(库存管理):负责
inventory_items表,处理库存水平监控、位置查询等。
3. 开发环境搭建
本项目采用 Pythonuv进行高效的依赖管理。
核心依赖安装
uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community
PostgreSQL 环境配置
本地启动一个 Postgres 实例,并创建agent_platform数据库。我们提供了setup_db.py脚本来自动初始化表结构和测试数据(详见文末源码)。
4. 核心实现步骤详解### Étape 1 : Définir les compétences du domaine (The Knowledge)
Nous définissons les compétences comme une structure de dictionnaire, simulant le processus de chargement à partir d'un système de fichiers ou d'une base de données. Veuillez noter la distinction entre description (utilisée par l'Agent pour la sélection des options) et content (le contexte détaillé réellement chargé).
SKILLS = {"sales_analytics": {"description":"Useful for analyzing sales revenue, trends...","content":"""... Table Schema: sales_data ..."" },"inventory_management": {"description":"Useful for checking stock levels...","content":"""... Table Schema: inventory_items ..."" }}
Étape 2 : Implémenter les outils principaux (The Capabilities)
L'Agent dépend de deux outils essentiels pour accomplir les tâches :
-
load_skill(skill_name): Charge dynamiquement les détails de la compétence spécifiée au moment de l'exécution. -
run_sql_query(query): Exécute les instructions SQL spécifiques.
Étape 3 : Orchestrer la logique de l'Agent (The Brain)
Utilisez LangGraph pour construire un Agent ReAct. Le System Prompt joue un rôle essentiel ici, guidant l'Agent à suivre strictement la procédure opérationnelle standard (SOP) Identifier -> Charger -> Interroger.
system_prompt ="""1. Identify the relevant skill.2. Use 'load_skill' to get schema.3. Write and execute SQL using 'run_sql_query'....Do not guess table names. Always load the skill first.""
5. Vérification de l'efficacité de l'exécution
En exécutant test_agent.py, nous avons testé des requêtes dans deux domaines différents, Sales et Inventory. Voici les journaux de sortie réels de la console, montrant comment l'Agent charge dynamiquement les compétences en fonction de la question :
Testing Sales Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]Tool output:You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]Tool output: [(Decimal('730.50'),)]...Agent response: The total revenue is $730.50.Testing Inventory Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]Tool output:You are an Inventory Management Expert.You have access to the 'inventory_items' table.Table Schema...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]Tool output: [('Warehouse A',)]...Agent response: The Laptop is located in **Warehouse A**.## 6. Référence complète du code source
Voici le code source complet du projet, comprenant le script d'initialisation de la base de données et le programme principal de l'Agent.
1. Initialisation de la base de données (setup_db.py)
`import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import os from dotenv import load_dotenv
load_dotenv()
Veuillez vous assurer que les informations de connexion à la base de données sont configurées dans .env
DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = os.getenv("DB_PORT", "5432") DB_USER = os.getenv("DB_USER", "postgres") DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password") # Veuillez remplacer par le mot de passe réel DB_NAME = os.getenv("DB_NAME", "agent_platform")
def create_database(): try: # Connexion à la base de données par défaut 'postgres' pour créer une nouvelle base de données conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="postgres" ) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor()
# Vérifier si la base de données existe
cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'")
exists = cur.fetchone()
if not exists:
print(f"Création de la base de données {DB_NAME}...")
cur.execute(f"CREATE DATABASE {DB_NAME}")
else:
print(f"La base de données {DB_NAME} existe déjà.")
cur.close()
conn.close()
except Exception as e:
print(f"Erreur lors de la création de la base de données : {e}")
def create_tables_and_data(): try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME ) cur = conn.cursor()
# Créer la table Sales
print("Création de la table sales_data...")
cur.execute("""
CREATE TABLE IF NOT EXISTS sales_data (
id SERIAL PRIMARY KEY,
transaction_date DATE,
product_id VARCHAR(50),
amount DECIMAL(10, 2),
region VARCHAR(50)
)
""")
# Créer la table Inventory
print("Création de la table inventory_items...")
cur.execute("""
CREATE TABLE IF NOT EXISTS inventory_items (
id SERIAL PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(100),
stock_count INTEGER,
warehouse_location VARCHAR(50)
)
""")
# Insérer des données factices
print("Insertion de données factices...")
cur.execute("TRUNCATE sales_data, inventory_items")
sales_data = [
('2023-01-01', 'P001', 100.00, 'North'),
('2023-01-02', 'P002', 150.50, 'South'),
('2023-01-03', 'P001', 120.00, 'East'),
('2023-01-04', 'P003', 200.00, 'West'),
('2023-01-05', 'P002', 160.00, 'North')
]
cur.executemany("INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)",
sales_data
)
inventory_data = [
('P001', 'Laptop', 50, 'Warehouse A'),
('P002', 'Mouse', 200, 'Warehouse B'),
('P003', 'Keyboard', 150, 'Warehouse A'),
('P004', 'Monitor', 30, 'Warehouse C')
]
cur.executemany("INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)",
inventory_data
)
conn.commit()
cur.close()
conn.close()
print("Configuration de la base de données terminée.")
except Exception as e:
print(f"Erreur lors de la configuration des tables : {e}")
if name == "main": create_database() create_tables_and_data() `### 2. Agent 主程序 (main.py)
`importosfromtypingimportAnnotated, Literal, TypedDict, Union, Dictfromdotenvimportload_dotenvfromlangchain_openaiimportChatOpenAIfromlangchain_core.toolsimporttoolfromlangchain_core.messagesimportSystemMessage, HumanMessage, AIMessage, ToolMessagefromlangchain_community.utilitiesimportSQLDatabasefromlangchain_community.agent_toolkitsimportSQLDatabaseToolkitfromlanggraph.graphimportStateGraph, START, END, MessagesStatefromlanggraph.prebuiltimportToolNode, tools_conditionload_dotenv()# --- Configuration ---BASE_URL = os.getenv("BASIC_MODEL_BASE_URL")API_KEY = os.getenv("BASIC_MODEL_API_KEY")MODEL_NAME = os.getenv("BASIC_MODEL_MODEL")DB_URI =f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"# --- Database Setup ---db = SQLDatabase.from_uri(DB_URI)# --- Skills Definition ---SKILLS: Dict[str, Dict[str, str]] = {"sales_analytics": {"description":"Useful for analyzing sales revenue, trends, and regional performance.","content":"""You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer (primary key)- transaction_date: date- product_id: varchar(50)- amount: decimal(10, 2)- region: varchar(50)Common queries:- Total revenue: SUM(amount)- Revenue by region: GROUP BY region- Sales trend: GROUP BY transaction_date""" },"inventory_management": {"description":"Useful for checking stock levels, product locations, and warehouse management.","content":"""You are an Inventory Management Expert.You have access to the 'inventory_item"""Table Schema:
- id: integer (primary key)
- product_id: varchar(50)
- product_name: varchar(100)
- stock_count: integer
- warehouse_location: varchar(50)
Common queries:
- Check stock: WHERE product_name = '...'
- Low stock: WHERE stock_count < threshold""" }}# --- Tools ---@tooldefload_skill(skill_name: str)-> str:""" Load the detailed prompt and schema for a specific skill. Available skills:
- sales_analytics: For sales, revenue, and transaction analysis.
- inventory_management: For stock, products, and warehouse queries. """ skill = SKILLS.get(skill_name) if not skill: return f"Error: Skill '{skill_name}' not found. Available skills: {list(SKILLS.keys())}" return skill["content"]
@tool def run_sql_query(query: str) -> str: """ Execute a SQL query against the database. Only use this tool AFTER loading the appropriate skill to understand the schema. """ try: return db.run(query) except Exception as e: return f"Error executing SQL: {e}"
@tool def list_tables() -> str: """List all available tables in the database.""" return str(db.get_usable_table_names())
tools = [load_skill, run_sql_query, list_tables]
--- Agent Setup ---
llm = ChatOpenAI( base_url=BASE_URL, api_key=API_KEY, model=MODEL_NAME, temperature=0 ) llm_with_tools = llm.bind_tools(tools)
--- Graph Definition ---
class AgentState(MessagesState):
We can add custom state if needed, but MessagesState is sufficient for simple chat
pass
def agent_node(state: AgentState): messages = state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}
workflow = StateGraph(AgentState) workflow.add_node("agent", agent_node)workflow.add_node("tools", ToolNode(tools))workflow.add_edge(START,"agent")workflow.add_conditional_edges("agent", tools_condition)workflow.add_edge("tools","agent")app = workflow.compile()# --- Exécution principale ---if__name__ =="main": system_prompt ="""Vous êtes un assistant SQL serviable.Vous avez accès à des compétences spécialisées qui contiennent des schémas de base de données et des connaissances du domaine.Pour répondre à la question d'un utilisateur :1. Identifiez la compétence pertinente (sales_analytics ou inventory_management).2. Utilisez l'outil « load_skill » pour obtenir le schéma et les instructions.3. Sur la base de la compétence chargée, écrivez et exécutez une requête SQL à l'aide de « run_sql_query ».4. Répondez à la question de l'utilisateur en fonction des résultats de la requête.Ne devinez pas les noms des tables. Chargez toujours la compétence en premier.""" print("Assistant SQL initialisé. Tapez 'quit' pour quitter.") print("-"*50) messages = [SystemMessage(content=system_prompt)]# Vérification de la connexion pré-chaufféetry: print(f"Connecté à la base de données :{DB_URI.split('@')[-1]}")exceptExceptionase: print(f"Avertissement de connexion à la base de données :{e}")whileTrue:try: user_input = input("Utilisateur : ")ifuser_input.lower()in["quit","exit"]:break messages.append(HumanMessage(content=user_input))# Diffusez l'exécution print("Agent : ", end="", flush=True) final_response =Noneforeventinapp.stream({"messages": messages}, stream_mode="values"):# En mode « values », nous obtenons l'état complet. Nous voulons juste voir le dernier message s'il est nouveau. last_message = event["messages"][-1]# Mettez à jour notre historique des messages avec le dernier étatpass# Une fois la diffusion terminée, le dernier état contient la réponse finale final_state = app.invoke({"messages": messages}) last_msg = final_state["messages"][-1] ifisinstance(last_msg, AIMessage): print(last_msg.content) messages = final_state["messages"]# Update history print("-"*50) exceptExceptionase: print(f"\nError:{e}") break





