LangChain Skills 模式实战:构建按需加载知识的 SQL 助手
在先前的文章中,我们探讨了如何通过 Deep Agents CLI 模拟 Deep Agent 使用 Skills 的模式。如今,LangChain 已原生支持这一特性,极大地简化了开发流程。本文将带领大家深入体验这一功能,构建一个更智能的 SQL 助手。
构建复杂的 AI Agent 时,开发者往往陷入两难境地:是将所有上下文(数据库表结构、API 文档、业务规则)一次性注入 System Prompt,导致上下文窗口(Context Window)溢出且分散模型注意力?还是选择成本高昂的频繁微调(Fine-tuning)?
**Skills 模式(Skills Pattern)**提供了一条优雅的中间路线。它通过动态加载所需知识,实现了上下文的高效利用。LangChain 对此模式的原生支持,意味着我们可以更轻松地构建具备“按需学习”能力的 Agent。
本文将结合官方文档 Build a SQL assistant with on-demand skills,引导读者从零开始,构建一个支持“按需加载知识”的 SQL Assistant。
1. 核心概念:为何选择 Skills 模式?
传统 SQL Agent 的局限性
在传统的 SQL Agent 架构中,我们通常需要在 System Prompt 中提供完整的 Database Schema。随着业务发展,当表数量扩展到数百张时,这种方式会带来显著问题:
-
Token 消耗巨大:每次对话都携带大量无关的表结构,造成资源浪费。
-
幻觉风险增加:过多的无关干扰信息会降低模型的推理准确性。
-
维护困难:所有业务线的知识紧密耦合,难以独立迭代。
Skills 模式:基于渐进式披露的解决方案
Skills 模式基于**渐进式披露(Progressive Disclosure)**原则,将知识获取过程分层处理:
-
Agent 初始状态:仅掌握有哪些“技能”(Skills)及其简要描述(Description),保持轻量级。
-
运行时加载:当面对具体问题(如“查询库存”)时,Agent 主动调用工具(
load_skill)加载该技能详细的上下文(Schema + Prompt)。 -
执行任务:基于加载的精确上下文,执行具体的任务(如编写并执行 SQL)。
这种模式有效支持了无限扩展和团队解耦,使 Agent 能够适应日益复杂的业务场景。
2. 系统架构设计
本实战项目将构建一个包含两个核心 Skills 的 SQL Assistant,以演示该模式的实际应用:
-
Sales Analytics(销售分析):负责
sales_data表,处理收入统计、订单趋势分析等。 -
Inventory Management(库存管理):负责
inventory_items表,处理库存水平监控、位置查询等。
3. 开发环境搭建
本项目采用 Pythonuv进行高效的依赖管理。
核心依赖安装
uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community
PostgreSQL 环境配置
本地启动一个 Postgres 实例,并创建agent_platform数据库。我们提供了setup_db.py脚本来自动初始化表结构和测试数据(详见文末源码)。
4. 核心实现步骤详解### Első lépés: A szakterületi készségek (The Knowledge) definiálása
A készségeket szótárszerkezetként definiáljuk, szimulálva a fájlrendszerből vagy adatbázisból történő betöltés folyamatát. Kérjük, tegye egyértelművé a különbséget a description (az Agent döntéshozatalához használatos) és a content (a ténylegesen betöltött részletes kontextus) között.
SKILLS = {"sales_analytics": {"description":"Hasznos az értékesítési bevétel, trendek elemzéséhez...","content":"""... Table Schema: sales_data ..."" },"inventory_management": {"description":"Hasznos a készletszintek ellenőrzéséhez...","content":"""... Table Schema: inventory_items ..."" }}
Második lépés: A fő eszközök (The Capabilities) megvalósítása
Az Agent két kulcsfontosságú eszköztől függ a feladatok elvégzéséhez:
-
load_skill(skill_name)**: A megadott készség részleteinek dinamikus betöltése futásidőben. ** -
run_sql_query(query)**: Konkrét SQL-utasítások végrehajtása. **
Harmadik lépés: Az Agent logika (The Brain) összeállítása
A LangGraph használatával ReAct Agent létrehozása. A System Prompt kulcsszerepet játszik itt, mivel irányítja az Agentet, hogy szigorúan kövesse az Identify -> Load -> Query szabványos eljárást (SOP).
system_prompt ="""1. Identify the relevant skill.2. Use 'load_skill' to get schema.3. Write and execute SQL using 'run_sql_query'....Do not guess table names. Always load the skill first."""
5. A futási eredmények ellenőrzése
A test_agent.py futtatásával teszteltük az értékesítési és a készletkezelési lekérdezéseket két különböző területen. Az alábbiakban a konzol tényleges kimeneti naplói láthatók, amelyek bemutatják, hogyan tölti be az Agent dinamikusan a készségeket a kérdés alapján:
Testing Sales Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]Tool output:You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]Tool output: [(Decimal('730.50'),)]...Agent response: The total revenue is $730.50.Testing Inventory Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]Tool output:You are an Inventory Management Expert.You have access to the 'inventory_items' table.Table Schema...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]Tool output: [('Warehouse A',)]...Agent response: The Laptop is located in **Warehouse A**.## 6. Teljes forráskód referencia
Az alábbiakban a projekt teljes forráskódja található, beleértve az adatbázis inicializáló szkriptet és az Agent főprogramot.
1. Adatbázis inicializálás (setup_db.py)
`import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import os from dotenv import load_dotenv
load_dotenv()
Kérjük, győződjön meg róla, hogy a .env fájlban konfigurálva vannak az adatbázis kapcsolati adatok
DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = os.getenv("DB_PORT", "5432") DB_USER = os.getenv("DB_USER", "postgres") DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password") # Kérjük, cserélje le a tényleges jelszóra DB_NAME = os.getenv("DB_NAME", "agent_platform")
def create_database(): try: # Csatlakozás az alapértelmezett 'postgres' adatbázishoz az új adatbázis létrehozásához conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="postgres", ) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor()
# Ellenőrizze, hogy az adatbázis létezik-e
cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'")
exists = cur.fetchone()
if not exists:
print(f"Creating database {DB_NAME}...")
cur.execute(f"CREATE DATABASE {DB_NAME}")
else:
print(f"Database {DB_NAME} already exists.")
cur.close()
conn.close()
except Exception as e:
print(f"Error creating database: {e}")
def create_tables_and_data(): try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME, ) cur = conn.cursor()
# Sales tábla létrehozása
print("Creating sales_data table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS sales_data (
id SERIAL PRIMARY KEY,
transaction_date DATE,
product_id VARCHAR(50),
amount DECIMAL(10, 2),
region VARCHAR(50)
)
"""
)
# Inventory tábla létrehozása
print("Creating inventory_items table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS inventory_items (
id SERIAL PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(100),
stock_count INTEGER,
warehouse_location VARCHAR(50)
)
"""
)
# Mock adatok beszúrása
print("Inserting mock data...")
cur.execute("TRUNCATE sales_data, inventory_items")
sales_data = [
('2023-01-01', 'P001', 100.00, 'North'),
('2023-01-02', 'P002', 150.50, 'South'),
('2023-01-03', 'P001', 120.00, 'East'),
('2023-01-04', 'P003', 200.00, 'West'),
('2023-01-05', 'P002', 160.00, 'North'),
]
cur.executemany(
"INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)",
sales_data,
)
inventory_data = [
('P001', 'Laptop', 50, 'Warehouse A'),
('P002', 'Mouse', 200, 'Warehouse B'),
('P003', 'Keyboard', 150, 'Warehouse A'),
('P004', 'Monitor', 30, 'Warehouse C'),
]
cur.executemany(
"INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)",
inventory_data,
)
conn.commit()
cur.close()
conn.close()
print("Database setup complete.")
except Exception as e:
print(f"Error setting up tables: {e}")
if name == "main": create_database() create_tables_and_data() `### 2. Agent főprogram (main.py)
`import os from typing import Annotated, Literal, TypedDict, Union, Dict from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_core.tools import tool from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage from langchain_community.utilities import SQLDatabase from langchain_community.agent_toolkits import SQLDatabaseToolkit from langgraph.graph import StateGraph, START, END, MessagesState from langgraph.prebuilt import ToolNode, tools_condition
load_dotenv()
--- Konfiguráció ---
BASE_URL = os.getenv("BASIC_MODEL_BASE_URL") API_KEY = os.getenv("BASIC_MODEL_API_KEY") MODEL_NAME = os.getenv("BASIC_MODEL_MODEL") DB_URI = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
--- Adatbázis beállítás ---
db = SQLDatabase.from_uri(DB_URI)
--- Képességek definíciója ---
SKILLS: Dict[str, Dict[str, str]] = { "sales_analytics": { "description": "Hasznos az értékesítési bevétel, trendek és regionális teljesítmény elemzéséhez.", "content": """ Ön egy Értékesítési Elemzési Szakértő. Hozzáférése van az 'sales_data' táblához. Tábla séma: - id: egész szám (elsődleges kulcs) - transaction_date: dátum - product_id: varchar(50) - amount: decimal(10, 2) - region: varchar(50) Gyakori lekérdezések: - Teljes bevétel: SUM(amount) - Bevétel régiónként: GROUP BY region - Értékesítési trend: GROUP BY transaction_date """ }, "inventory_management": { "description": "Hasznos a készletszintek, termékhelyek és raktárkezelés ellenőrzéséhez.", "content": """ Ön egy Készletgazdálkodási Szakértő. Hozzáférése van az 'inventory_itemTable Schema:
- id: integer (primary key)
- product_id: varchar(50)
- product_name: varchar(100)
- stock_count: integer
- warehouse_location: varchar(50)
Common queries:
- Check stock: WHERE product_name = '...'
- Low stock: WHERE stock_count < threshold""" }}# --- Tools ---@tooldefload_skill(skill_name: str)-> str:""" Load the detailed prompt and schema for a specific skill. Available skills:
- sales_analytics: For sales, revenue, and transaction analysis.
- inventory_management: For stock, products, and warehouse queries. """ skill = SKILLS.get(skill_name) if not skill: return f"Error: Skill '{skill_name}' not found. Available skills: {list(SKILLS.keys())}" return skill["content"]
@tool def run_sql_query(query: str) -> str: """ Execute a SQL query against the database. Only use this tool AFTER loading the appropriate skill to understand the schema. """ try: return db.run(query) except Exception as e: return f"Error executing SQL: {e}"
@tool def list_tables() -> str: """List all available tables in the database.""" return str(db.get_usable_table_names())
tools = [load_skill, run_sql_query, list_tables]
--- Agent Setup ---
llm = ChatOpenAI( base_url=BASE_URL, api_key=API_KEY, model=MODEL_NAME, temperature=0 ) llm_with_tools = llm.bind_tools(tools)
--- Graph Definition ---
class AgentState(MessagesState):
We can add custom state if needed, but MessagesState is sufficient for simple chat
pass
def agent_node(state: AgentState): messages = state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}
workflow = StateGraph(AgentState) workflow.add_node("agent", agent_node)workflow.add_node("tools", ToolNode(tools))workflow.add_edge(START,"agent")workflow.add_conditional_edges("agent", tools_condition)workflow.add_edge("tools","agent")app = workflow.compile()# --- Fő Végrehajtás ---if__name__ =="main": system_prompt ="""Ön egy segítőkész SQL Asszisztens.Hozzáférésed van speciális készségekhez, amelyek adatbázis sémákat és domain tudást tartalmaznak.A felhasználó kérdésének megválaszolásához:1. Azonosítsa a releváns készséget (sales_analytics vagy inventory_management).2. Használja a 'load_skill' eszközt a séma és az utasítások lekéréséhez.3. A betöltött készség alapján írjon és hajtson végre egy SQL lekérdezést a 'run_sql_query' használatával.4. Válaszolja meg a felhasználó kérdését a lekérdezés eredményei alapján.Ne találgasson táblaneveket. Mindig töltse be először a készséget.""" print("SQL Asszisztens inicializálva. Írja be a 'quit' szót a kilépéshez.") print("-"*50) messages = [SystemMessage(content=system_prompt)]# Előmelegítő kapcsolat ellenőrzésetry: print(f"Csatlakozva az adatbázishoz:{DB_URI.split('@')[-1]}")exceptExceptionase: print(f"Adatbázis kapcsolat figyelmeztetés:{e}")whileTrue:try: user_input = input("Felhasználó: ")ifuser_input.lower()in["quit","exit"]:break messages.append(HumanMessage(content=user_input))# A végrehajtás streamelése print("Ügynök: ", end="", flush=True) final_response =Noneforeventinapp.stream({"messages": messages}, stream_mode="values"):# A 'values' módban a teljes állapotot kapjuk meg. Csak az utolsó üzenetet szeretnénk látni, ha új. last_message = event["messages"][-1]# Frissítjük az üzenettörténetünket a legújabb állapottalpass# A stream befejezése után az utolsó állapot tartalmazza a végső választ final_state = app.invoke({"messages": messages}) last_msg = final_state["messages"][-1] ifisinstance(last_msg, AIMessage): print(last_msg.content) messages = final_state["messages"]
Update history
print("-"*50) exceptExceptionase: print(f"\nError:{e}") break`





