LangChain Skills 模式实战:构建按需加载知识的 SQL 助手
În articolul precedent, am explorat modul în care Deep Agents CLI simulează modelul de utilizare a abilităților (Skills) de către Deep Agent. Acum, LangChain acceptă nativ această caracteristică, simplificând considerabil procesul de dezvoltare. Acest articol vă va ghida printr-o experiență aprofundată a acestei funcții, construind un asistent SQL mai inteligent.
Atunci când construiesc agenți AI complecși, dezvoltatorii se confruntă adesea cu o dilemă: să injecteze toate contextele (structura tabelului bazei de date, documentația API, regulile de afaceri) în System Prompt dintr-o singură dată, ceea ce duce la depășirea ferestrei de context (Context Window) și la dispersarea atenției modelului? Sau să aleagă o ajustare fină (Fine-tuning) frecventă, costisitoare?
Modelul Abilități (Skills Pattern) oferă o cale de mijloc elegantă. Acesta realizează o utilizare eficientă a contextului prin încărcarea dinamică a cunoștințelor necesare. Suportul nativ al LangChain pentru acest model înseamnă că putem construi mai ușor agenți cu capacitatea de „învățare la cerere”.
Acest articol, combinat cu documentația oficială Build a SQL assistant with on-demand skills, va ghida cititorii de la zero în construirea unui asistent SQL care acceptă „încărcarea cunoștințelor la cerere”.
1. Concepte de bază: De ce să alegeți modelul Abilități?
Limitările agentului SQL tradițional
În arhitectura tradițională a agentului SQL, trebuie de obicei să furnizăm schema completă a bazei de date în System Prompt. Pe măsură ce afacerea se dezvoltă, atunci când numărul de tabele se extinde la sute, această abordare va aduce probleme semnificative:
-
Consum uriaș de jetoane: fiecare conversație poartă o cantitate mare de structuri de tabele irelevante, provocând risipă de resurse.
-
Risc crescut de halucinații: prea multe informații de interferență irelevante vor reduce acuratețea inferenței modelului.
-
Dificultăți de întreținere: toate cunoștințele liniilor de afaceri sunt strâns cuplate, ceea ce face dificilă iterarea independentă.
Modelul Abilități: o soluție bazată pe divulgarea progresivă
Modelul Abilități se bazează pe principiul divulgării progresive (Progressive Disclosure), care procesează stratificat procesul de achiziție a cunoștințelor:
-
Starea inițială a agentului: stăpânește doar ce „abilități” (Skills) există și descrierea lor succintă (Description), menținându-se ușor.
-
Încărcare la rulare: atunci când se confruntă cu o problemă specifică (cum ar fi „interogarea inventarului”), agentul apelează în mod activ instrumentul (
load_skill) pentru a încărca contextul detaliat al abilității (Schema + Prompt). -
Efectuarea sarcinii: pe baza contextului precis încărcat, efectuați sarcini specifice (cum ar fi scrierea și executarea SQL).
Acest model acceptă în mod eficient extinderea nelimitată și decuplarea echipei, permițând agentului să se adapteze la scenarii de afaceri din ce în ce mai complexe.
2. Proiectarea arhitecturii sistemului
Acest proiect practic va construi un asistent SQL care conține două abilități de bază pentru a demonstra aplicarea practică a acestui model:
-
Sales Analytics (Analiza vânzărilor): responsabil pentru tabelul
sales_data, gestionând statistici de venituri, analiza tendințelor comenzilor etc. -
Inventory Management (Gestionarea inventarului): responsabil pentru tabelul
inventory_items, gestionând monitorizarea nivelului stocurilor, interogarea locației etc.
3. Configurarea mediului de dezvoltare
Acest proiect folosește Pythonuv pentru gestionarea eficientă a dependențelor.
Instalarea dependențelor de bază
uv add langchain langchain-openai langgraph psycopg2-binary python-dotenv langchain-community
Configurarea mediului PostgreSQL
Lansați o instanță Postgres local și creați baza de dateagent_platform. Oferim scriptulsetup_db.py pentru a inițializa automat structura tabelului și datele de testare (vezi codul sursă la sfârșitul articolului).
4. Explicație detaliată a pașilor de implementare de bază### Pasul 1: Definirea abilităților domeniului (Cunoștințele)
Definim abilitățile ca o structură de dicționar, simulând procesul de încărcare dintr-un sistem de fișiere sau o bază de date. Vă rugăm să rețineți distincția dintre description (utilizată de Agent pentru a lua decizii de selecție) și content (contextul detaliat încărcat efectiv).
SKILLS = {"sales_analytics": {"description":"Util pentru analizarea veniturilor din vânzări, tendințe...","content":"""... Schema tabelului: sales_data ..."" },"inventory_management": {"description":"Util pentru verificarea nivelurilor stocurilor...","content":"""... Schema tabelului: inventory_items ..."" }}
Pasul 2: Implementarea instrumentelor de bază (Capacitățile)
Agentul se bazează pe două instrumente cheie pentru a finaliza sarcinile:
-
load_skill(skill_name)**: Încarcă dinamic detaliile abilității specificate în timpul execuției. ** -
run_sql_query(query)**: Execută instrucțiuni SQL specifice. **
Pasul 3: Orchestrând logica Agentului (Creierul)
Utilizați LangGraph pentru a construi Agentul ReAct. System Prompt joacă un rol crucial aici, ghidând Agentul să urmeze cu strictețe procedura standard de operare (SOP) Identify -> Load -> Query.
system_prompt ="""1. Identify the relevant skill.2. Use 'load_skill' to get schema.3. Write and execute SQL using 'run_sql_query'....Do not guess table names. Always load the skill first.""
5. Verificarea efectului de rulare
Prin rularea test_agent.py, am testat interogări în două domenii diferite, Vânzări și Inventar. Următoarele sunt jurnalele de ieșire efective ale consolei, care arată modul în care Agentul încarcă dinamic abilitățile în funcție de întrebare:
Testing Sales Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'sales_analytics'}, 'id': 'call_f270d76b7ce4404cb5f61bf2', 'type': 'tool_call'}]Tool output:You are a Sales Analytics Expert.You have access to the 'sales_data' table.Table Schema:- id: integer...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': 'SELECT SUM(amount) as total_revenue FROM sales_data;'}, 'id': 'call_b4f3e686cc7f4f22b3bb9ea7', 'type': 'tool_call'}]Tool output: [(Decimal('730.50'),)]...Agent response: The total revenue is $730.50.Testing Inventory Query...Agent calling tools: [{'name': 'load_skill', 'args': {'skill_name': 'inventory_management'}, 'id': 'call_18c823b2d5064e95a0cfe2e3', 'type': 'tool_call'}]Tool output:You are an Inventory Management Expert.You have access to the 'inventory_items' table.Table Schema...Agent calling tools: [{'name': 'run_sql_query', 'args': {'query': "SELECT warehouse_location FROM inventory_items WHERE product_name = 'Laptop';"}, 'id': 'call_647ee3a444804bd98a045f00', 'type': 'tool_call'}]Tool output: [('Warehouse A',)]...Agent response: The Laptop is located in **Warehouse A**.## 6. Referință completă a codului sursă
Următorul este codul sursă complet al proiectului, incluzând scriptul de inițializare a bazei de date și programul principal Agent.
1. Inițializarea bazei de date (setup_db.py)
`import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import os from dotenv import load_dotenv
load_dotenv()
Vă rugăm să vă asigurați că informațiile de conectare la baza de date sunt configurate în .env
DB_HOST = os.getenv("DB_HOST", "localhost") DB_PORT = os.getenv("DB_PORT", "5432") DB_USER = os.getenv("DB_USER", "postgres") DB_PASSWORD = os.getenv("DB_PASSWORD", "your_password") # Vă rugăm să înlocuiți cu parola reală DB_NAME = os.getenv("DB_NAME", "agent_platform")
def create_database(): try: # Conectați-vă la baza de date implicită 'postgres' pentru a crea o nouă bază de date conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname="postgres", ) conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor()
# Verificați dacă baza de date există
cur.execute(f"SELECT 1 FROM pg_catalog.pg_database WHERE datname = '{DB_NAME}'")
exists = cur.fetchone()
if not exists:
print(f"Creating database {DB_NAME}...")
cur.execute(f"CREATE DATABASE {DB_NAME}")
else:
print(f"Database {DB_NAME} already exists.")
cur.close()
conn.close()
except Exception as e:
print(f"Error creating database: {e}")
def create_tables_and_data(): try: conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASSWORD, dbname=DB_NAME, ) cur = conn.cursor()
# Creați tabelul Sales
print("Creating sales_data table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS sales_data (
id SERIAL PRIMARY KEY,
transaction_date DATE,
product_id VARCHAR(50),
amount DECIMAL(10, 2),
region VARCHAR(50)
)
"""
)
# Creați tabelul Inventory
print("Creating inventory_items table...")
cur.execute(
"""
CREATE TABLE IF NOT EXISTS inventory_items (
id SERIAL PRIMARY KEY,
product_id VARCHAR(50),
product_name VARCHAR(100),
stock_count INTEGER,
warehouse_location VARCHAR(50)
)
"""
)
# Inserați date de test
print("Inserting mock data...")
cur.execute("TRUNCATE sales_data, inventory_items")
sales_data = [
('2023-01-01', 'P001', 100.00, 'North'),
('2023-01-02', 'P002', 150.50, 'South'),
('2023-01-03', 'P001', 120.00, 'East'),
('2023-01-04', 'P003', 200.00, 'West'),
('2023-01-05', 'P002', 160.00, 'North'),
]
cur.executemany(
"INSERT INTO sales_data (transaction_date, product_id, amount, region) VALUES (%s, %s, %s, %s)",
sales_data,
)
inventory_data = [
('P001', 'Laptop', 50, 'Warehouse A'),
('P002', 'Mouse', 200, 'Warehouse B'),
('P003', 'Keyboard', 150, 'Warehouse A'),
('P004', 'Monitor', 30, 'Warehouse C'),
]
cur.executemany(
"INSERT INTO inventory_items (product_id, product_name, stock_count, warehouse_location) VALUES (%s, %s, %s, %s)",
inventory_data,
)
conn.commit()
cur.close()
conn.close()
print("Database setup complete.")
except Exception as e:
print(f"Error setting up tables: {e}")
if name == "main": create_database() create_tables_and_data() `### 2. Agent 主程序 (main.py)
`import os from typing import Annotated, Literal, TypedDict, Union, Dict from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langchain_core.tools import tool from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage from langchain_community.utilities import SQLDatabase from langchain_community.agent_toolkits import SQLDatabaseToolkit from langgraph.graph import StateGraph, START, END, MessagesState from langgraph.prebuilt import ToolNode, tools_condition
load_dotenv()
--- Configuration ---
BASE_URL = os.getenv("BASIC_MODEL_BASE_URL") API_KEY = os.getenv("BASIC_MODEL_API_KEY") MODEL_NAME = os.getenv("BASIC_MODEL_MODEL") DB_URI = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
--- Database Setup ---
db = SQLDatabase.from_uri(DB_URI)
--- Skills Definition ---
SKILLS: Dict[str, Dict[str, str]] = { "sales_analytics": { "description": "Useful for analyzing sales revenue, trends, and regional performance.", "content": """You are a Sales Analytics Expert. You have access to the 'sales_data' table. Table Schema:
- id: integer (primary key)
- transaction_date: date
- product_id: varchar(50)
- amount: decimal(10, 2)
- region: varchar(50) Common queries:
- Total revenue: SUM(amount)
- Revenue by region: GROUP BY region
- Sales trend: GROUP BY transaction_date""" }, "inventory_management": { "description": "Useful for checking stock levels, product locations, and warehouse management.", "content": """You are an Inventory Management Expert. You have access to the 'inventory_item""" Table Schema:
- id: integer (primary key)
- product_id: varchar(50)
- product_name: varchar(100)
- stock_count: integer
- warehouse_location: varchar(50) Common queries:
- Check stock: WHERE product_name = '...'
- Low stock: WHERE stock_count < threshold""" }}# --- Tools ---@tooldefload_skill(skill_name: str)-> str:"""
Încarcă promptul detaliat și schema pentru o anumită abilitate.
Abilități disponibile:
- sales_analytics: Pentru analiza vânzărilor, a veniturilor și a tranzacțiilor.
- inventory_management: Pentru interogări privind stocul, produsele și depozitul. "" skill = SKILLS.get(skill_name) if not skill: return f"Eroare: Abilitatea '{skill_name}' nu a fost găsită. Abilități disponibile: {list(SKILLS.keys())}" return skill["content"]
@tool def run_sql_query(query: str) -> str: """ Execută o interogare SQL asupra bazei de date. Utilizează acest instrument NUMAI DUPĂ încărcarea abilității corespunzătoare pentru a înțelege schema. """ try: return db.run(query) except Exception as e: return f"Eroare la executarea SQL: {e}"
@tool def list_tables() -> str: """Listează toate tabelele disponibile în baza de date.""" return str(db.get_usable_table_names())
unelte = [load_skill, run_sql_query, list_tables]
--- Configurare agent ---
llm = ChatOpenAI( base_url=BASE_URL, api_key=API_KEY, model=MODEL_NAME, temperature=0 )
llm_with_tools = llm.bind_tools(tools)
--- Definiție grafic ---
class AgentState(MessagesState):
Putem adăuga stări personalizate dacă este necesar, dar MessagesState este suficient pentru chat simplu
pass
def agent_node(state: AgentState): messages = state["messages"] response = llm_with_tools.invoke(messages) return {"messages": [response]}
workflow = StateGraph(AgentState) workflow.add_node("agent", agent_node)workflow.add_node("tools", ToolNode(tools))workflow.add_edge(START,"agent")workflow.add_conditional_edges("agent", tools_condition)workflow.add_edge("tools","agent")app = workflow.compile()# --- Execuție Principală ---if__name__ =="main": system_prompt ="""Ești un Asistent SQL util.Ai acces la abilități specializate care conțin scheme de baze de date și cunoștințe de domeniu.Pentru a răspunde la întrebarea unui utilizator:1. Identifică abilitatea relevantă (sales_analytics sau inventory_management).2. Folosește instrumentul 'load_skill' pentru a obține schema și instrucțiunile.3. Pe baza abilității încărcate, scrie și execută o interogare SQL folosind 'run_sql_query'.4. Răspunde la întrebarea utilizatorului pe baza rezultatelor interogării.Nu ghici numele tabelelor. Încarcă întotdeauna abilitatea mai întâi.""" print("Asistent SQL inițializat. Tastează 'quit' pentru a ieși.") print("-"*50) messages = [SystemMessage(content=system_prompt)]# Verificare prealabilă a conexiunii încercaretry: print(f"Conectat la baza de date:{DB_URI.split('@')[-1]}")exceptExceptionase: print(f"Avertisment conexiune bază de date:{e}")whileTrue:try: user_input = input("Utilizator: ")ifuser_input.lower()in["quit","exit"]:break messages.append(HumanMessage(content=user_input))# Transmite execuția print("Agent: ", end="", flush=True) final_response =Noneforeventinapp.stream({"messages": messages}, stream_mode="values"):# În modul 'values', obținem starea completă. Vrem doar să vedem ultimul mesaj dacă este nou. last_message = event["messages"][-1]# Actualizează istoricul mesajelor noastre cu cea mai recentă starepass# După ce fluxul se termină, ultima stare are răspunsul final final_state = app.invoke({"messages": messages})
last_msg = final_state["messages"][-1]
if isinstance(last_msg, AIMessage):
print(last_msg.content)
messages = final_state["messages"]
Actualizează istoricul
print("-"*50)
except Exception as e:
print(f"\nEroare: {e}")
break`





