LangChain Skills 模式实战:构建按需加载知识的 SQL 助手
在先前的文章中,我们探讨了如何通过 Deep Agents CLI 模拟 Deep Agent 使用 Skills 的模式。如今,LangChain 已原生支持这一特性,极大地简化了开发流程。本文将带领大家深入体验这一功能,构建一个更智能的 SQL 助手。
构建复杂的 AI Agent 时,开发者往往陷入两难境地:是将所有上下文(数据库表结构、API 文档、业务规则)一次性注入 System Prompt,导致上下文窗口(Context Window)溢出且分散模型注意力?还是选择成本高昂的频繁微调(Fine-tuning)?
**Skills 模式(Skills Pattern)**提供了一条优雅的中间路线。它通过动态加载所需知识,实现了上下文的高效利用。LangChain 对此模式的原生支持,意味着我们可以更轻松地构建具备“按需学习”能力的 Agent。
本文将结合官方文档 Build a SQL assistant with on-demand skills,引导读者从零开始,构建一个支持“按需加载知识”的 SQL Assistant。
1. 核心概念:为何选择 Skills 模式?
传统 SQL Agent 的局限性
在传统的 SQL Agent 架构中,我们通常需要在 System Prompt 中提供完整的 Database Schema。随着业务发展,当表数量扩展到数百张时,这种方式会带来显著问题:
-
Token 消耗巨大:每次对话都携带大量无关的表结构,造成资源浪费。
-
幻觉风险增加:过多的无关干扰信息会降低模型的推理准确性。
-
维护困难:所有业务线的知识紧密耦合,难以独立迭代。
Skills 模式:基于渐进式披露的解决方案
Skills 模式基于**渐进式披露(Progressive Disclosure)**原则,将知识获取过程分层处理:
-
Agent 初始状态:仅掌握有哪些“技能”(Skills)及其简要描述(Description),保持轻量级。
-
运行时加载:当面对具体问题(如“查询库存”)时,Agent 主动调用工具(
load_skill)加载该技能详细的上下文(Schema + Prompt)。 -
执行任务:基于加载的精确上下文,执行具体的任务(如编写并执行 SQL)。
这种模式有效支持了无限扩展和团队解耦,使 Agent 能够适应日益复杂的业务场景。
2. 系统架构设计
本实战项目将构建一个包含两个核心 Skills 的 SQL Assistant,以演示该模式的实际应用:
-
Sales Analytics(销售分析):负责
sales_data表,处理收入统计、订单趋势分析等。 -
Inventory Management(库存管理):负责
inventory_items表,处理库存水平监控、位置查询等。
3. 开发环境搭建
本项目采用 Pythonuv进行高效的依赖管理。
核心依赖安装
uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community
PostgreSQL 环境配置
本地启动一个 Postgres 实例,并创建agent_platform数据库。我们提供了setup_db.py脚本来自动初始化表结构和测试数据(详见文末源码)。
4. 核心实现步骤详解### Βήμα 1: Ορισμός των Δεξιοτήτων Τομέα (Η Γνώση)
Θα ορίσουμε τις δεξιότητες ως μια δομή λεξικού, προσομοιώνοντας τη διαδικασία φόρτωσης από ένα σύστημα αρχείων ή μια βάση δεδομένων. Σημειώστε τη διάκριση μεταξύ description (που χρησιμοποιείται από τον Agent για τη λήψη αποφάσεων επιλογής) και content (το πραγματικό φορτωμένο λεπτομερές περιεχόμενο).
SKILLS = {"sales_analytics": {"description":"Useful for analyzing sales revenue, trends...","content":"""... Table Schema: sales_data ..."" },"inventory_management": {"description":"Useful for checking stock levels...","content":"""... Table Schema: inventory_items ..."" }}
Βήμα 2: Υλοποίηση Βασικών Εργαλείων (Οι Δυνατότητες)
Ο Agent βασίζεται σε δύο βασικά εργαλεία για να ολοκληρώσει τις εργασίες:
-
load_skill(skill_name)**: Φορτώνει δυναμικά τις λεπτομέρειες της καθορισμένης δεξιότητας κατά το χρόνο εκτέλεσης. ** -
run_sql_query(query)**: Εκτελεί συγκεκριμένες δηλώσεις SQL. **
Βήμα 3: Οργάνωση της Λογικής του Agent (Ο Εγκέφαλος)
Χρησιμοποιήστε το LangGraph για να δημιουργήσετε έναν ReAct Agent. Το System Prompt παίζει καθοριστικό ρόλο εδώ, καθοδηγώντας τον Agent να ακολουθεί αυστηρά τις τυπικές λειτουργικές διαδικασίες (SOP) Identify -> Load -> Query.
system_prompt ="""1. Identify the relevant skill.2. Use 'load_skill' to get schema.3. Write and execute SQL using 'run_sql_query'....Do not guess table names. Always load the skill first.""
5. Επαλήθευση Αποτελεσμάτων Εκτέλεσης
Εκτελώντας το test_agent.py, δοκιμάσαμε ερωτήματα σε δύο διαφορετικούς τομείς, Sales και Inventory. Ακολουθούν τα πραγματικά αρχεία καταγραφής εξόδου της κονσόλας, που δείχνουν πώς ο Agent φορτώνει δυναμικά δεξιότητες με βάση το ερώτημα:
Testing Sales Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]Tool output:You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]Tool output: [(Decimal('730.50'),)]...Agent response: The total revenue is $730.50.Testing Inventory Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]Tool output:You are an Inventory Management Expert.You have access to the 'inventory_items' table.Table Schema...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]Tool output: [('Warehouse A',)]...Agent response: The Laptop is located in **Warehouse A**.## 6. Πλήρης Πηγής Κώδικας Αναφοράς
Ακολούθως είναι η πλήρης πηγή κώδικας του εργου, περιλαμβάνει το σκριπάκι αρχικοποίησης της βάσης δεδομένων και το κεντρικό πρόγραμμα Agent.
1. Αρχικοποίηση της βάσης δεδομένων (setup_db.py)
importpsycopg2frompsycopg2.extensionsimportISOLATION_LEVEL_AUTOCOMMITimportosfromdotenvimportload_dotenvload_dotenv()# Παρακαλώ εξασφαλίστε ότι στο .env έχετε ρυθμιστείτε τις πληροφορίες σύνδεσης της βάσης δεδομένωνDB_HOST = os.getenv("DB_HOST","localhost")DB_PORT = os.getenv("DB_PORT","5432")DB_USER = os.getenv("DB_USER","postgres")DB_PASSWORD = os.getenv("DB_PASSWORD","your_password")# Παρακαλώ αντικαταστήστε με τον πραγματικό κωδικόDB_NAME = os.getenv("DB_NAME","agent_platform")defcreate_database():try:# Connect to default 'postgres' database to create new db conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="postgres" ) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor()# Check if database exists cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'") exists = cur.fetchone()ifnotexists: print(f"Creating database{DB_NAME}...") cur.execute(f"CREATE DATABASE{DB_NAME}")else: print(f"Database{DB_NAME}already exists.") cur.close() conn.close()exceptExceptionase: print(f"Error creating database:{e}")defcreate_tables_and_data():try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME ) cur = conn.cursor()# Create Sales Table print("Creating sales_data table...") cur.execute(""" CREATE TABLE IF NOT EXISTS sales_data ( id SERIAL PRIMARY KEY, transaction_date DATE, product_id VARCHAR(50), amount DECIMAL(10, 2), region VARCHAR(50) ) """)# Create Inventory Table print("Creating inventory_items table...") cur.execute(""" CREATE TABLE IF NOT EXISTS inventory_items ( id SERIAL PRIMARY KEY, product_id VARCHAR(50), product_name VARCHAR(100), stock_count INTEGER, warehouse_location VARCHAR(50) ) """)# Insert Mock Data print("Inserting mock data...") cur.execute("TRUNCATE sales_data, inventory_items") sales_data = [ ('2023-01-01','P001',100.00,'North'), ('2023-01-02','P002',150.50,'South'), ('2023-01-03','P001',120.00,'East'), ('2023-01-04','P003',200.00,'West'), ('2023-01-05','P002',160.00,'North') ] cur.executemany("INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)", sales_data ) inventory_data = [ ('P001','Laptop',50,'Warehouse A'), ('P002','Mouse',200,'Warehouse B'), ('P003','Keyboard',150,'Warehouse A'), ('P004','Monitor',30,'Warehouse C') ] cur.executemany("INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)", inventory_data ) conn.commit() cur.close() conn.close() print("Database setup complete.")exceptExceptionase: print(f"Error setting up tables:{e}")if__name__ =="__main__": create_database() create_tables_and_data()### 2. Agent Κεντρικό Πρόγραμμα (main.py)
`importosfromtypingimportAnnotated, Literal, TypedDict, Union, Dictfromdotenvimportload_dotenvfromlangchain_openaiimportChatOpenAIfromlangchain_core.toolsimporttoolfromlangchain_core.messagesimportSystemMessage, HumanMessage, AIMessage, ToolMessagefromlangchain_community.utilitiesimportSQLDatabasefromlangchain_community.agent_toolkitsimportSQLDatabaseToolkitfromlanggraph.graphimportStateGraph, START, END, MessagesStatefromlanggraph.prebuiltimportToolNode, tools_conditionload_dotenv()# --- Configuration ---BASE_URL = os.getenv("BASIC_MODEL_BASE_URL")API_KEY = os.getenv("BASIC_MODEL_API_KEY")MODEL_NAME = os.getenv("BASIC_MODEL_MODEL")DB_URI =f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"# --- Database Setup ---db = SQLDatabase.from_uri(DB_URI)# --- Skills Definition ---SKILLS: Dict[str, Dict[str, str]] = {"sales_analytics": {"description":"Useful for analyzing sales revenue, trends, and regional performance.","content":"""You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer (primary key)- transaction_date: date- product_id: varchar(50)- amount: decimal(10, 2)- region: varchar(50)Common queries:- Total revenue: SUM(amount)- Revenue by region: GROUP BY region- Sales trend: GROUP BY transaction_date""" },"inventory_management": {"description":"Useful for checking stock levels, product locations, and warehouse management.","content":"""You are an Inventory Management Expert.You have access to the 'inventory_item"Table Schema:
- id: integer (primary key)
- product_id: varchar(50)
- product_name: varchar(100)
- stock_count: integer
- warehouse_location: varchar(50) Common queries:
- Check stock: WHERE product_name = '...'
- Low stock: WHERE stock_count < threshold""" }}# --- Tools ---@tooldefload_skill(skill_name: str)-> str:""" Load the detailed prompt and schema for a specific skill. Available skills:
- sales_analytics: For sales, revenue, and transaction analysis.
- inventory_management: For stock, products, and warehouse queries. """ skill = SKILLS.get(skill_name) if not skill: return f"Error: Skill '{skill_name}' not found. Available skills: {list(SKILLS.keys())}" return skill["content"]
@tool def run_sql_query(query: str) -> str: """ Execute a SQL query against the database. Only use this tool AFTER loading the appropriate skill to understand the schema. """ try: return db.run(query) except Exception as e: return f"Error executing SQL: {e}"
@tool def list_tables() -> str: """List all available tables in the database.""" return str(db.get_usable_table_names())
tools = [load_skill, run_sql_query, list_tables]
--- Agent Setup ---
llm = ChatOpenAI( base_url=BASE_URL, api_key=API_KEY, model=MODEL_NAME, temperature=0 ) llm_with_tools = llm.bind_tools(tools)
--- Graph Definition ---
class AgentState(MessagesState):
We can add custom state if needed, but MessagesState is sufficient for simple chat
pass
def agent_node(state: AgentState): messages = state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}
workflow = StateGraph(AgentState) workflow.add_node("agent", agent_node)workflow.add_node("tools", ToolNode(tools))workflow.add_edge(START,"agent")workflow.add_conditional_edges("agent", tools_condition)workflow.add_edge("tools","agent")app = workflow.compile()# --- Main Execution ---if__name__ =="main": system_prompt ="""Είστε ένας χρήσιμος Βοηθός SQL.Έχετε πρόσβαση σε εξειδικευμένες δεξιότητες που περιέχουν σχήματα βάσεων δεδομένων και γνώσεις τομέα.Για να απαντήσετε στην ερώτηση ενός χρήστη:1. Προσδιορίστε τη σχετική δεξιότητα (sales_analytics ή inventory_management).2. Χρησιμοποιήστε το εργαλείο 'load_skill' για να λάβετε το σχήμα και τις οδηγίες.3. Με βάση την φορτωμένη δεξιότητα, γράψτε και εκτελέστε ένα ερώτημα SQL χρησιμοποιώντας το 'run_sql_query'.4. Απαντήστε στην ερώτηση του χρήστη με βάση τα αποτελέσματα του ερωτήματος.Μην μαντεύετε ονόματα πινάκων. Να φορτώνετε πάντα πρώτα την δεξιότητα.""" print("SQL Assistant initialized. Type 'quit' to exit.") print("-"*50) messages = [SystemMessage(content=system_prompt)]# Pre-warm connection checktry: print(f"Connected to database:{DB_URI.split('@')[-1]}")exceptExceptionase: print(f"Database connection warning:{e}")whileTrue:try: user_input = input("User: ")ifuser_input.lower()in["quit","exit"]:break messages.append(HumanMessage(content=user_input))# Stream the execution print("Agent: ", end="", flush=True) final_response =Noneforeventinapp.stream({"messages": messages}, stream_mode="values"):# In 'values' mode, we get the full state. We just want to see the last message if it's new. last_message = event["messages"][-1]# Update our message history with the latest statepass# After stream finishes, the last state has the final answer final_state = app.invoke({"messages": messages}) last_msg = final_state["messages"][-1]ifisinstance(last_msg, AIMessage): print(last_msg.content) messages = final_state["messages"]# Update history print("-"*50)exceptExceptionase: print(f"\nError:{e}")break`





