LangChain Skills 模式实战:构建按需加载知识的 SQL 助手
在先前的文章中,我们探讨了如何通过 Deep Agents CLI 模拟 Deep Agent 使用 Skills 的模式。如今,LangChain 已原生支持这一特性,极大地简化了开发流程。本文将带领大家深入体验这一功能,构建一个更智能的 SQL 助手。
构建复杂的 AI Agent 时,开发者往往陷入两难境地:是将所有上下文(数据库表结构、API 文档、业务规则)一次性注入 System Prompt,导致上下文窗口(Context Window)溢出且分散模型注意力?还是选择成本高昂的频繁微调(Fine-tuning)?
**Skills 模式(Skills Pattern)**提供了一条优雅的中间路线。它通过动态加载所需知识,实现了上下文的高效利用。LangChain 对此模式的原生支持,意味着我们可以更轻松地构建具备“按需学习”能力的 Agent。
本文将结合官方文档 Build a SQL assistant with on-demand skills,引导读者从零开始,构建一个支持“按需加载知识”的 SQL Assistant。
1. 核心概念:为何选择 Skills 模式?
传统 SQL Agent 的局限性
在传统的 SQL Agent 架构中,我们通常需要在 System Prompt 中提供完整的 Database Schema。随着业务发展,当表数量扩展到数百张时,这种方式会带来显著问题:
-
Token 消耗巨大:每次对话都携带大量无关的表结构,造成资源浪费。
-
幻觉风险增加:过多的无关干扰信息会降低模型的推理准确性。
-
维护困难:所有业务线的知识紧密耦合,难以独立迭代。
Skills 模式:基于渐进式披露的解决方案
Skills 模式基于**渐进式披露(Progressive Disclosure)**原则,将知识获取过程分层处理:
-
Agent 初始状态:仅掌握有哪些“技能”(Skills)及其简要描述(Description),保持轻量级。
-
运行时加载:当面对具体问题(如“查询库存”)时,Agent 主动调用工具(
load_skill)加载该技能详细的上下文(Schema + Prompt)。 -
执行任务:基于加载的精确上下文,执行具体的任务(如编写并执行 SQL)。
这种模式有效支持了无限扩展和团队解耦,使 Agent 能够适应日益复杂的业务场景。
2. 系统架构设计
本实战项目将构建一个包含两个核心 Skills 的 SQL Assistant,以演示该模式的实际应用:
-
Sales Analytics(销售分析):负责
sales_data表,处理收入统计、订单趋势分析等。 -
Inventory Management(库存管理):负责
inventory_items表,处理库存水平监控、位置查询等。
3. 开发环境搭建
本项目采用 Pythonuv进行高效的依赖管理。
核心依赖安装
uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community
PostgreSQL 环境配置
本地启动一个 Postgres 实例,并创建agent_platform数据库。我们提供了setup_db.py脚本来自动初始化表结构和测试数据(详见文末源码)。
4. 核心实现步骤详解### Krok 1: Definiowanie umiejętności dziedzinowych (Wiedza)
Zdefiniujemy umiejętności jako strukturę słownikową, symulując proces ładowania z systemu plików lub bazy danych. Należy odróżnić description (używany przez Agenta do podejmowania decyzji o wyborze) od content (rzeczywisty załadowany szczegółowy kontekst).
SKILLS = {"sales_analytics": {"description":"Przydatne do analizy przychodów ze sprzedaży, trendów...","content":"""... Schemat tabeli: sales_data ..."" },"inventory_management": {"description":"Przydatne do sprawdzania poziomów zapasów...","content":"""... Schemat tabeli: inventory_items ..."" }}
Krok 2: Implementacja podstawowych narzędzi (Możliwości)
Agent polega na dwóch kluczowych narzędziach do wykonywania zadań:
-
load_skill(skill_name)**: Dynamiczne ładowanie szczegółów określonej umiejętności w czasie wykonywania. ** -
run_sql_query(query)**: Wykonywanie konkretnych instrukcji SQL. **
Krok 3: Aranżacja logiki Agenta (Mózg)
Wykorzystanie LangGraph do budowy Agenta ReAct. System Prompt odgrywa tutaj kluczową rolę, kierując Agenta do ścisłego przestrzegania standardowej procedury operacyjnej (SOP) Identify -> Load -> Query.
system_prompt ="""1. Zidentyfikuj odpowiednią umiejętność.2. Użyj 'load_skill', aby pobrać schemat.3. Napisz i wykonaj SQL używając 'run_sql_query'....Nie zgaduj nazw tabel. Zawsze najpierw załaduj umiejętność."""
5. Weryfikacja efektów działania
Uruchamiając test_agent.py, przetestowaliśmy zapytania z dwóch różnych dziedzin: Sprzedaży i Zapasy. Poniżej znajduje się rzeczywisty dziennik wyjściowy z konsoli, pokazujący, jak Agent dynamicznie ładuje umiejętności w zależności od pytania:
Testing Sales Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]Tool output:You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]Tool output: [(Decimal('730.50'),)]...Agent response: The total revenue is $730.50.Testing Inventory Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]Tool output:You are an Inventory Management Expert.You have access to the 'inventory_items' table.Table Schema...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]Tool output: [('Warehouse A',)]...Agent response: The Laptop is located in **Warehouse A**.## 6. Pełny kod źródłowy - odniesienie
Poniżej znajduje się pełny kod źródłowy projektu, zawierający skrypt inicjalizacji bazy danych i główny program Agenta.
1. Inicjalizacja bazy danych (setup_db.py)
`import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import os from dotenv import load_dotenv
load_dotenv()
Proszę upewnić się, że informacje o połączeniu z bazą danych są skonfigurowane w .env
DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = os.getenv("DB_PORT", "5432") DB_USER = os.getenv("DB_USER", "postgres") DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password") # Proszę zastąpić rzeczywistym hasłem DB_NAME = os.getenv("DB_NAME", "agent_platform")
def create_database(): try: # Połącz się z domyślną bazą danych 'postgres', aby utworzyć nową bazę danych conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="postgres", ) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor()
# Sprawdź, czy baza danych istnieje
cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'")
exists = cur.fetchone()
if not exists:
print(f"Creating database {DB_NAME}...")
cur.execute(f"CREATE DATABASE {DB_NAME}")
else:
print(f"Database {DB_NAME} already exists.")
cur.close()
conn.close()
except Exception as e:
print(f"Error creating database: {e}")
def create_tables_and_data(): try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME, ) cur = conn.cursor()
# Utwórz tabelę Sales
print("Creating sales_data table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS sales_data (
id SERIAL PRIMARY KEY,
transaction_date DATE,
product_id VARCHAR(50),
amount DECIMAL(10, 2),
region VARCHAR(50)
)
"""
)
# Utwórz tabelę Inventory
print("Creating inventory_items table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS inventory_items (
id SERIAL PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(100),
stock_count INTEGER,
warehouse_location VARCHAR(50)
)
"""
)
# Wstaw przykładowe dane
print("Inserting mock data...")
cur.execute("TRUNCATE sales_data, inventory_items")
sales_data = [
('2023-01-01', 'P001', 100.00, 'North'),
('2023-01-02', 'P002', 150.50, 'South'),
('2023-01-03', 'P001', 120.00, 'East'),
('2023-01-04', 'P003', 200.00, 'West'),
('2023-01-05', 'P002', 160.00, 'North'),
]
cur.executemany(
"INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)",
sales_data,
)
inventory_data = [
('P001', 'Laptop', 50, 'Warehouse A'),
('P002', 'Mouse', 200, 'Warehouse B'),
('P003', 'Keyboard', 150, 'Warehouse A'),
('P004', 'Monitor', 30, 'Warehouse C'),
]
cur.executemany(
"INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)",
inventory_data,
)
conn.commit()
cur.close()
conn.close()
print("Database setup complete.")
except Exception as e:
print(f"Error setting up tables: {e}")
if name == "main":
create_database()
create_tables_and_data()
### **2. Agent 主程序 (main.py)**\n\nimportosfromtypingimportAnnotated, Literal, TypedDict, Union, Dictfromdotenvimportload_dotenvfromlangchain_openaiimportChatOpenAIfromlangchain_core.toolsimporttoolfromlangchain_core.messagesimportSystemMessage, HumanMessage, AIMessage, ToolMessagefromlangchain_community.utilitiesimportSQLDatabasefromlangchain_community.agent_toolkitsimportSQLDatabaseToolkitfromlanggraph.graphimportStateGraph, START, END, MessagesStatefromlanggraph.prebuiltimportToolNode, tools_conditionload_dotenv()# --- Configuration ---BASE_URL = os.getenv(\s' table.Table Schema:- id: integer (primary key)- product_id: varchar(50)- product_name: varchar(100)- stock_count: integer- warehouse_location: varchar(50)Common queries:- Check stock: WHERE product_name = '...'- Low stock: WHERE stock_count < threshold""" }}# --- Tools ---@tooldefload_skill(skill_name: str)-> str:""" Load the detailed prompt and schema for a specific skill. Available skills: - sales_analytics: For sales, revenue, and transaction analysis. - inventory_management: For stock, products, and warehouse queries. """ skill = SKILLS.get(skill_name)ifnotskill:returnf"Error: Skill '{skill_name}' not found. Available skills:{list(SKILLS.keys())}"returnskill["content"]@tooldefrun_sql_query(query: str)-> str:""" Execute a SQL query against the database. Only use this tool AFTER loading the appropriate skill to understand the schema. """try:returndb.run(query)exceptExceptionase:returnf"Error executing SQL:{e}"@tooldeflist_tables()-> str:"""List all available tables in the database."""returnstr(db.get_usable_table_names())tools = [load_skill, run_sql_query, list_tables]# --- Agent Setup ---llm = ChatOpenAI( base_url=BASE_URL, api_key=API_KEY, model=MODEL_NAME, temperature=0)llm_with_tools = llm.bind_tools(tools)# --- Graph Definition ---classAgentState(MessagesState):# We can add custom state if needed, but MessagesState is sufficient for simple chatpassdefagent_node(state: AgentState): messages = state["messages"] response = llm_with_tools.invoke(messages)return{"messages": [response]}workflow = StateGraph(AgentState)workflow.add_node("agent", agent_node)workflow.add_node("tools", ToolNode(tools))workflow.add_edge(START,"agent")workflow.add_conditional_edges("agent", tools_condition)workflow.add_edge("tools","agent")app = workflow.compile()# --- Main Execution ---if__name__ =="main": system_prompt ="""Jesteś pomocnym Asystentem SQL.Masz dostęp do specjalistycznych umiejętności, które zawierają schematy baz danych i wiedzę domenową.Aby odpowiedzieć na pytanie użytkownika:1. Zidentyfikuj odpowiednią umiejętność (sales_analytics lub inventory_management).2. Użyj narzędzia 'load_skill', aby pobrać schemat i instrukcje.3. Na podstawie załadowanej umiejętności, napisz i wykonaj zapytanie SQL za pomocą 'run_sql_query'.4. Odpowiedz na pytanie użytkownika na podstawie wyników zapytania.Nie zgaduj nazw tabel. Zawsze najpierw załaduj umiejętność.""" print("Asystent SQL zainicjowany. Wpisz 'quit', aby wyjść.") print("-"*50) messages = [SystemMessage(content=system_prompt)]# Sprawdzenie wstępnego połączenia na rozgrzewkętry: print(f"Połączono z bazą danych: {DB_URI.split('@')[-1]}")exceptExceptionase: print(f"Ostrzeżenie o połączeniu z bazą danych: {e}")whileTrue:try: user_input = input("Użytkownik: ")ifuser_input.lower()in["quit","exit"]:break messages.append(HumanMessage(content=user_input))# Przesyłanie strumieniowe wykonania print("Agent: ", end="", flush=True) final_response =Noneforeventinapp.stream({"messages": messages}, stream_mode="values"):# W trybie 'values' otrzymujemy pełny stan. Chcemy tylko zobaczyć ostatnią wiadomość, jeśli jest nowa. last_message = event["messages"][-1]# Aktualizuj naszą historię wiadomości o najnowszy stanpass# Po zakończeniu strumienia, ostatni stan zawiera ostateczną odpowiedź final_state = app.invoke({"messages": messages})
ostatnia_wiadomosc = final_state["messages"][-1]
if isinstance(last_msg, AIMessage):
print(last_msg.content)
messages = final_state["messages"]# Aktualizacja historii
print("-"*50)
except Exception as e:
print(f"\nBłąd:{e}")
break`

