LangGraph μν¬νλ‘μ° ν νλ¦Ώ (v34)
Python κ°λ°μλ₯Ό μν LangChain/LangGraph μν¬νλ‘μ° ν νλ¦Ώ
1. LangGraph μν€ν μ² κ°μ
LangGraphλ μν κΈ°λ°μ μν¬νλ‘μ° μμ€ν μΌλ‘, λ€μκ³Ό κ°μ ν΅μ¬ κ΅¬μ± μμλ‘ μλν©λλ€:
- λ Έλ (Nodes): κ° λ¨κ³μ μμ μ μ μνλ ν¨μ
- μ£μ§ (Edges): λ Έλ κ°μ μ μ΄ μ‘°κ±΄
- μν (State): λͺ¨λ λ Έλμμ 곡μ λλ λ°μ΄ν° ꡬ쑰
- 체ν¬ν¬μΈν (Checkpointing): μνλ₯Ό μ μ₯νκ³ λ³΅κ΅¬νλ κΈ°λ₯
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
import operator
class AgentState(TypedDict):
messages: Annotated[list, operator.add]
user_input: str
# κΈ°λ³Έ μν¬νλ‘μ° κ΅¬μ±
workflow = StateGraph(AgentState)
2. ν νλ¦Ώ 1: κ°λ¨ν RAG μμ΄μ νΈ (κ²μ β μμ± β κ²μ¦)
μ€μ λ¬Έμ : λ¬Έμ κ²μ ν μμ±λ λ΅λ³μ μ νμ± κ²μ¦ νμ
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# RAG μν¬νλ‘μ°
def retrieve(state):
# λ¬Έμ κ²μ λ‘μ§
vector_store = VectorStore()
query = state["user_input"]
retrieved_docs = vector_store.search(query, k=3)
return {"retrieved_docs": retrieved_docs}
def generate(state):
# μμ± λ‘μ§
prompt = PromptTemplate.from_template(
"λ€μ λ¬Έμλ₯Ό κΈ°λ°μΌλ‘ μ§λ¬Έμ λ΅νμΈμ: {context}\nμ§λ¬Έ: {query}"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
context = "\n".join([doc.page_content for doc in state["retrieved_docs"]])
answer = chain.invoke({"context": context, "query": state["user_input"]})
return {"generated_answer": answer}
def validate(state):
# κ²μ¦ λ‘μ§
prompt = PromptTemplate.from_template(
"λ€μ λ΅λ³μ΄ μ§λ¬Έμ λ΅νκ³ μλμ§ κ²μ¦νμΈμ:\nμ§λ¬Έ: {query}\nλ΅λ³: {answer}\nμ λ΅ μ¬λΆ: (μ/μλμ€)"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
verdict = chain.invoke({
"query": state["user_input"],
"answer": state["generated_answer"]
})
return {"validation": verdict, "valid": "μ" in verdict}
# μν¬νλ‘μ° μ μ
workflow = StateGraph(AgentState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.add_node("validate", validate)
# μ£μ§ μ μ
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
# κ²μ¦ κ²°κ³Όμ λ°λΌ λΆκΈ°
def route_validation(state):
if state["valid"]:
return END
else:
return "retrieve" # μ¬κ²μ
workflow.add_conditional_edges(
"validate",
route_validation,
{
"retrieve": "retrieve",
END: END
}
)
3. ν νλ¦Ώ 2: λ€μ€ λꡬ μμ΄μ νΈ (κ³ν β μ€ν β κ΄μ°° β κ²°μ )
μ€μ λ¬Έμ : μ¬λ¬ λꡬλ₯Ό μ¬μ©ν΄μΌ νλ 볡μ‘ν μμ
μ²λ¦¬
from langchain.tools import Tool
from langchain_openai import OpenAI
# λꡬ μ μ
tools = [
Tool(
name="search",
func=lambda query: f"κ²μ κ²°κ³Ό: {query}",
description="μΉ κ²μμ μν λꡬ"
),
Tool(
name="calculator",
func=lambda expression: f"κ³μ° κ²°κ³Ό: {eval(expression)}",
description="μν κ³μ°μ μν λꡬ"
)
]
class ToolAgentState(TypedDict):
messages: Annotated[list, operator.add]
plan: str
execution_result: str
observation: str
def plan(state):
# μμ
κ³ν μμ±
prompt = PromptTemplate.from_template(
"λ€μ μμ
μ μν κ³νμ μΈμ°μΈμ:\n{task}\nλꡬ μ¬μ© κ³ν:"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
plan = chain.invoke({"task": state["user_input"]})
return {"plan": plan}
def execute(state):
# λꡬ μ€ν
tools_dict = {tool.name: tool for tool in tools}
# κ°λ¨ν μ€ν λ‘μ§ (μ€μ ꡬν μ λꡬ μ ν λ‘μ§ ν¬ν¨)
tool_result = tools_dict["search"].run(state["user_input"])
return {"execution_result": tool_result}
def observe(state):
# μ€ν κ²°κ³Ό κ΄μ°°
return {"observation": f"μ€ν κ²°κ³Ό: {state['execution_result']}"}
def decide(state):
# κ²°μ λ‘μ§
prompt = PromptTemplate.from_template(
"λ€μ μμ
μ κ³ λ €νμΈμ:\n{task}\nκ΄μ°° κ²°κ³Ό: {observation}\nκ²°μ :"
)
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()
decision = chain.invoke({
"task": state["user_input"],
"observation": state["observation"]
})
return {"decision": decision}
# μν¬νλ‘μ° κ΅¬μ±
workflow = StateGraph(ToolAgentState)
workflow.add_node("plan", plan)
workflow.add_node("execute", execute)
workflow.add_node("observe", observe)
workflow.add_node("decide", decide)
workflow.set_entry_point("plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "observe")
workflow.add_edge("observe", "decide")
workflow.add_edge("decide", END)
4. ν νλ¦Ώ 3: μΈκ°-μ€κ° μν¬νλ‘μ° (μΌμμ μ§ β κ²ν β κ³μ)
μ€μ λ¬Έμ : μΈκ°μ νλ¨μ΄ νμν μμ
μμ μλνμ μΈκ° ν΅μ μ κ· ν
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from datetime import datetime
class HumanInLoopState(TypedDict):
messages: Annotated[list, operator.add]
task: str
human_review: str
status: str # "pending", "approved", "rejected"
timestamp: str
def task_creation(state):
# μμ
μμ±
task = f"μ¬μ©μ μμ²: {state['user_input']}"
return {"task": task, "status": "pending", "timestamp": datetime.now().isoformat()}
def review_request(state):
# κ²ν μμ²
return {"messages": state["messages"] + [{"role": "assistant", "content":
f"μμ
μ κ²ν ν΄ μ£ΌμΈμ:\n{state['task']}\n[μΉμΈ/κ±°λΆ]"}]}
def human_review(state):
# μΈκ° κ²ν
review = state["human_review"] # μ¬μ©μ μ
λ ₯
status = "approved" if "μΉμΈ" in review else "rejected"
return {"status": status, "messages": state["messages"] + [{"role": "user", "content": review}]}
def handle_rejection(state):
# κ±°λΆ μ μ¬μμ
return {"messages": state["messages"] + [{"role": "assistant", "content":
"μμ
μ΄ κ±°λΆλμμ΅λλ€. μλ‘μ΄ μμ²μ μ£ΌμΈμ"}]}
# μν¬νλ‘μ° μ μ
workflow = StateGraph(HumanInLoopState)
workflow.add_node("create_task", task_creation)
workflow.add_node("request_review", review_request)
workflow.add_node("human_review", human_review)
workflow.add_node("handle_rejection", handle_rejection)
workflow.set_entry_point("create_task")
workflow.add_edge("create_task", "request_review")
workflow.add_edge("request_review", "human_review")
def route_review(state):
if state["status"] == "approved":
return "END"
else:
return "handle_rejection"
workflow.add_conditional_edges(
"human_review",
route_review,
{
"handle_rejection": "handle_rejection",
"END": END
}
)
5. ν νλ¦Ώ 5: λ³λ ¬ μ€ν μμ΄μ νΈ (ν¬μμ β μ²λ¦¬ β μ§κ³)
μ€μ λ¬Έμ : μ¬λ¬ λ°μ΄ν° μμ€λ₯Ό λμμ μ²λ¦¬νκ³ κ²°κ³Όλ₯Ό ν΅ν©νλ μμ
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ParallelProcessingState(TypedDict):
messages: Annotated[list, operator.add]
data_sources: list
processed_results: list
aggregated_result: str
def fan_out(state):
# λ°μ΄ν° μμ€ λΆμ°
sources = state["data_sources"]
return {"processed_results": []}
async def process_async(data_source):
# λΉλκΈ° μ²λ¦¬ λ‘μ§
await asyncio.sleep(1) # μ€μ API νΈμΆ
return f"μ²λ¦¬λ {data_source}"
def process_parallel(state):
# λ³λ ¬ μ²λ¦¬
with ThreadPoolExecutor(max
---
π₯ **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)













