LangGraph ์ํฌํ๋ก์ฐ ํ ํ๋ฆฟ (v38)
Python ๊ฐ๋ฐ์๋ฅผ ์ํ LangGraph ๊ธฐ๋ฐ AI ์์ด์ ํธ ์ํฌํ๋ก์ฐ ํ ํ๋ฆฟ
LangChain๊ณผ LangGraph๋ฅผ ์ฌ์ฉํ Python ๊ธฐ๋ฐ AI ์์ด์ ํธ ๊ฐ๋ฐ์ ์ํ ์ค์ ๊ฐ์ด๋์ ๋๋ค. ์ด ํ ํ๋ฆฟ์ ์ค์ ๊ฐ๋ฐ์๋ค์ด ๊ฒช๋ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์ค๊ณ๋์์ต๋๋ค.
1. LangGraph ์ํคํ ์ฒ ๊ฐ์
LangGraph๋ ์ํ ๊ธฐ๋ฐ ์ํฌํ๋ก์ฐ ์์คํ ์ผ๋ก, ๋ค์๊ณผ ๊ฐ์ ํต์ฌ ๊ตฌ์ฑ ์์๋ก ๊ตฌ์ฑ๋ฉ๋๋ค:
ํต์ฌ ๊ตฌ์ฑ ์์:
- Nodes: ์ํฌํ๋ก์ฐ์ ๋จ๊ณ
- Edges: ๋ ธ๋ ๊ฐ์ ์ฐ๊ฒฐ
- State: ์ํฌํ๋ก์ฐ์ ์ํ ๊ด๋ฆฌ
- Checkpointing: ์ํ ์ ์ฅ ๋ฐ ๋ณต์
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
messages: Annotated[list, operator.add]
context: str
# ๊ทธ๋ํ ์์ฑ
workflow = StateGraph(GraphState)
2. ํ ํ๋ฆฟ 1: ๊ฐ๋จํ RAG ์์ด์ ํธ (๊ฒ์ โ ์์ฑ โ ๊ฒ์ฆ)
๋๋ถ๋ถ์ LLM ์์ฉ ํ๋ก๊ทธ๋จ์ RAG(๊ฒ์ ๊ธฐ๋ฐ ์์ฑ) ํจํด์ ๊ธฐ๋ฐ์ผ๋ก ํฉ๋๋ค:
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
class RAGAgent:
def __init__(self, llm):
self.llm = llm
self.retriever = self._setup_retriever()
self.prompt = PromptTemplate.from_template(
"์ฃผ์ด์ง ์ปจํ
์คํธ๋ฅผ ์ฌ์ฉํ์ฌ ์ง๋ฌธ์ ๋ตํ์ธ์:\n\n{context}\n\n์ง๋ฌธ: {question}"
)
def retrieve(self, state):
question = state["messages"][-1].content
docs = self.retriever.invoke(question)
context = "\n\n".join([doc.page_content for doc in docs])
return {"context": context}
def generate(self, state):
chain = self.prompt | self.llm | StrOutputParser()
result = chain.invoke({
"question": state["messages"][-1].content,
"context": state["context"]
})
return {"messages": [result]}
def validate(self, state):
# ๊ฐ๋จํ ๊ฒ์ฆ ๋ก์ง
if len(state["messages"][-1].content) < 10:
return {"messages": ["์ง๋ฌธ์ ๋ํ ์ถฉ๋ถํ ์ ๋ณด๊ฐ ์์ต๋๋ค."]}
return {"messages": [state["messages"][-1].content]}
# ์ํฌํ๋ก์ฐ ์ ์
def create_rag_workflow():
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", RAGAgent.retrieve)
workflow.add_node("generate", RAGAgent.generate)
workflow.add_node("validate", RAGAgent.validate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", "validate")
workflow.add_edge("validate", END)
return workflow.compile()
3. ํ ํ๋ฆฟ 2: ๋ค์ค ๋๊ตฌ ์์ด์ ํธ (๊ณํ โ ์คํ โ ๊ด์ฐฐ โ ๊ฒฐ์ )
๋ณต์กํ ์์
์ ์ฒ๋ฆฌํ๋ ๋ค์ค ๋๊ตฌ ์์ด์ ํธ:
from langchain.tools import Tool
from langchain_core.messages import ToolMessage
import json
class MultiToolAgent:
def __init__(self):
# ์์ ๋๊ตฌ๋ค
self.tools = [
Tool(
name="weather",
func=lambda location: f"{location}์ ๋ ์จ๋ ๋ง์ต๋๋ค.",
description="๋ ์จ ์ ๋ณด ์กฐํ"
),
Tool(
name="calculator",
func=lambda expression: f"๊ฒฐ๊ณผ: {eval(expression)}",
description="์ํ ๊ณ์ฐ"
)
]
def plan(self, state):
# ์์
๊ณํ ์์ฑ
plan = {
"tasks": [
{"name": "weather", "params": {"location": "์์ธ"}},
{"name": "calculator", "params": {"expression": "2+2"}}
]
}
return {"plan": plan}
def execute(self, state):
# ๋๊ตฌ ์คํ
results = []
for task in state["plan"]["tasks"]:
tool = next(t for t in self.tools if t.name == task["name"])
result = tool.func(**task["params"])
results.append(result)
return {"execution_results": results}
def observe(self, state):
# ๊ฒฐ๊ณผ ๊ด์ฐฐ ๋ฐ ๋ถ์
return {"analysis": "๋ชจ๋ ์์
์ด ์๋ฃ๋์์ต๋๋ค."}
def decide(self, state):
# ๊ฒฐ์ ๋ก์ง
if len(state["execution_results"]) > 0:
return {"messages": ["์์
์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์์ต๋๋ค."]}
return {"messages": ["์์
์คํจ."]}
4. ํ ํ๋ฆฟ 3: ์ธ๊ฐ-์ค๊ฐ ์ํฌํ๋ก์ฐ (์ผ์์ ์ง โ ๊ฒํ โ ๊ณ์)
์ฌ๋์ ๊ฐ์
์ด ํ์ํ ์ค์ํ ๊ฒฐ์ ์ ์ํ ์ํฌํ๋ก์ฐ:
class HumanInLoopAgent:
def __init__(self):
self.user_feedback = None
def pause_for_review(self, state):
# ์ฌ์ฉ์ ๊ฒํ ๋ฅผ ์ํ ์ผ์์ ์ง
return {"status": "pending_review"}
def review(self, state):
# ์ฌ์ฉ์ ๊ฒํ ์ฒ๋ฆฌ
if self.user_feedback is not None:
return {"messages": [self.user_feedback]}
return {"messages": ["์ฌ์ฉ์ ๊ฒํ ๊ฐ ํ์ํฉ๋๋ค."]}
def continue_workflow(self, state):
# ์ํฌํ๋ก์ฐ ์ฌ๊ฐ
return {"status": "processing"}
# ์ฌ์ฉ์ ํผ๋๋ฐฑ ์ค์
def set_user_feedback(feedback):
agent = HumanInLoopAgent()
agent.user_feedback = feedback
return agent
5. ํ ํ๋ฆฟ 5: ๋ณ๋ ฌ ์คํ ์์ด์ ํธ (๋ถ๊ธฐ โ ์ฒ๋ฆฌ โ ์ง๊ณ)
๋๋ ๋ฐ์ดํฐ๋ฅผ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌ:
from concurrent.futures import ThreadPoolExecutor
import asyncio
class ParallelAgent:
def __init__(self, max_workers=4):
self.max_workers = max_workers
def fan_out(self, state):
# ์
๋ ฅ ๋ฐ์ดํฐ ๋ถ๊ธฐ
data_chunks = [
{"chunk_id": i, "data": f"chunk_{i}"}
for i in range(4)
]
return {"chunks": data_chunks}
def process_chunk(self, chunk_data):
# ๊ฐ๋ณ ์ฒญํฌ ์ฒ๋ฆฌ
time.sleep(0.1) # ์๋ฎฌ๋ ์ด์
return {"result": f"processed_{chunk_data['data']}"}
async def parallel_process(self, state):
# ๋ณ๋ ฌ ์ฒ๋ฆฌ
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [
executor.submit(self.process_chunk, chunk)
for chunk in state["chunks"]
]
results = [future.result() for future in futures]
return {"processed_results": results}
def aggregate(self, state):
# ๊ฒฐ๊ณผ ์ง๊ณ
aggregated = {
"total_count": len(state["processed_results"]),
"results": [r["result"] for r in state["processed_results"]]
}
return {"messages": [f"์ด {aggregated['total_count']}๊ฐ ์ฒ๋ฆฌ ์๋ฃ"]}
6. ์ํ ๊ด๋ฆฌ ํจํด
ํจ์จ์ ์ธ ์ํ ๊ด๋ฆฌ:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint import BaseCheckpointSaver
class StateManager:
def __init__(self):
# ๋ฉ๋ชจ๋ฆฌ ์ฒดํฌํฌ์ธํธ ์ฌ์ฉ
self.checkpointer = MemorySaver()
def get_state(self, thread_id):
# ์ํ ์กฐํ
return self.checkpointer.get(thread_id)
def update_state(self, thread_id, new_state):
# ์ํ ์
๋ฐ์ดํธ
self.checkpointer.put(thread_id, new_state)
def rollback(self, thread_id, version):
# ์ด์ ๋ฒ์ ์ผ๋ก ๋กค๋ฐฑ
pass
# ์ํ ์ ์ฅ ์ค์
def setup_workflow_with_checkpoint():
workflow = StateGraph(GraphState)
workflow.add_checkpoint(MemorySaver())
return workflow
7. ์คํธ๋ฆฌ๋ฐ ๋ฐ ์ค์๊ฐ ์ ๋ฐ์ดํธ
์ค์๊ฐ ์ฒ๋ฆฌ ์ง์:
python
from langchain_core.callbacks import BaseCallbackHandler
class StreamingHandler(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
print("์ฒด์ธ ์์")
def on_chain_end(self, outputs, **kwargs):
print("์ฒด์ธ ์ข
๋ฃ")
def on_llm_new_token(self, token, **kwargs):
print(f"ํ ํฐ: {token}", end="", flush=True)
class StreamingAgent:
def __init__(self):
self.streaming_handler = StreamingHandler()
def stream_response(self, query):
# ์คํธ๋ฆฌ๋ฐ ์๋ต
llm = ChatOpenAI(callbacks=[self.streaming_handler])
response = llm.invoke(query)
return response
# ์ฌ์ฉ ์์
async def stream_example():
---
๐ฅ **Get the full guide on Gumroad**: https://gumroad.com/l/auto ($5)













