BabyAGI looked cool in demos - an AI that creates tasks, prioritizes them, and executes them automatically. But when I actually tried to use it, the agent would get stuck in loops, burn through API credits, and rarely complete anything useful.
After weeks of experimentation, I finally figured out how to make BabyAGI agents that actually work. The key isn't just running the script - it's constraining the task space, managing costs, and handling failures gracefully.
What BabyAGI actually does (in practice)
BabyAGI is an autonomous task management system powered by LLMs. It maintains a task list, creates new tasks based on objectives, executes them using tools, and iterates until completion. Sounds great in theory.
In practice, it's more like having a really smart but occasionally overenthusiastic intern. It can do amazing things, but you need to keep it focused or it'll spend 3 hours researching tangentially related topics.
Advanced setup that actually works
Start with the babyagi repository:
# Clone the repo
git clone https://github.com/yoheinakajima/babyagi.git
cd babyagi
# Install dependencies
pip install -r requirements.txt
# Set your API keys
export OPENAI_API_KEY="sk-..."
export PINECONE_API_KEY="..." # for vector storage
export SERPER_API_KEY="..." # for web search
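Before the first run, it's worth confirming the keys actually reached the environment - a quick check:
# Optional sanity check before burning any API credits
import os

for key in ("OPENAI_API_KEY", "PINECONE_API_KEY", "SERPER_API_KEY"):
    if not os.environ.get(key):
        raise SystemExit(f"{key} is not set - export it before running babyagi")
print("All API keys present")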
The basic configuration file:
# babyagi_config.py
from babyagi import BabyAGI
from babyagi.tools import WebSearchTool, CodeExecutionTool
# Define your objective
OBJECTIVE = """
Create a Python web scraper that extracts product prices from e-commerce sites.
The scraper should handle pagination, store results in JSON format, and include error handling.
Focus on a single site first (example.com).
"""
# Configure tools
tools = [
WebSearchTool(), # Google search capabilities
CodeExecutionTool(), # Execute Python code
# Add custom tools as needed
]
# Initialize with constraints
agent = BabyAGI(
objective=OBJECTIVE,
tools=tools,
max_iterations=15, # Prevent infinite loops
temperature=0.7, # Balance creativity and focus
verbose=True, # See what's happening
)
# Run the agent
if __name__ == "__main__":
agent.run()
This configuration limits iterations and includes verbose logging - both essential for production use.
Real GitHub issues and how I fixed them
Issue #1: "Agent gets stuck in research loops"
Problem: BabyAGI would create endless research tasks, never actually implementing anything. Found 47 "research this" tasks in the task list.
What I tried: Lowering temperature, reducing max_iterations, adding "implementation-focused" to system prompt. Nothing helped.
Actual fix: Split the objective into phases with explicit deliverables:
# Phase-based constraints
class ConstrainedBabyAGI(BabyAGI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.current_phase = "planning"
self.phase_tasks = {
"planning": 3, # Max 3 planning tasks
"implementation": 10, # Max 10 implementation tasks
"testing": 5, # Max 5 testing tasks
}
    def should_create_task(self, task_description):
        """Override to limit task creation per phase"""
        phase_tasks = self.get_tasks_by_phase(self.current_phase)
        # Return a bool, not a string - callers treat this as a yes/no
        return len(phase_tasks) < self.phase_tasks[self.current_phase]

    def get_tasks_by_phase(self, phase):
        """Filter tasks by current phase"""
        return [t for t in self.task_list if phase in str(t).lower()]

    def is_phase_complete(self, phase):
        """A phase is done once its task budget is used up"""
        return len(self.get_tasks_by_phase(phase)) >= self.phase_tasks[phase]

    def update_phase(self):
        """Automatically progress through phases"""
        if self.is_phase_complete(self.current_phase):
            phases = ["planning", "implementation", "testing"]
            current_idx = phases.index(self.current_phase)
            if current_idx < len(phases) - 1:
                self.current_phase = phases[current_idx + 1]
# Use it
agent = ConstrainedBabyAGI(
objective=OBJECTIVE,
tools=tools,
max_iterations=25
)
Source: GitHub issue #234 - the phase-based approach was suggested by a maintainer
Issue #2: "API costs spiral out of control"
Problem: One run cost $47 in GPT-4 API calls. The agent kept calling the LLM for trivial tasks and re-analyzing the same data.
What I tried: Switching to GPT-3.5 (quality dropped too much), adding rate limits, manually killing long-running tasks.
Actual fix: Smart model selection and caching:
import hashlib

class CostOptimizedBabyAGI(BabyAGI):
    def __init__(self, *args, budget_limit=50.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_cache = {}  # prompt hash -> cached result
        self.total_cost = 0.0
        self.budget_limit = budget_limit

    def select_model_for_task(self, task):
        """Use cheaper models for simple tasks"""
        if "research" in task.lower():
            return "gpt-4"  # research needs stronger reasoning
        elif "implement" in task.lower():
            return "gpt-3.5-turbo"  # code generation tolerates a cheaper model
        else:
            return "gpt-3.5-turbo"

    def cached_llm_call(self, prompt, model):
        """Cache identical queries to avoid duplicate calls.

        A plain dict keyed on a content hash - functools.lru_cache doesn't
        work cleanly on methods (it keeps self alive and needs hashable args).
        """
        key = hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()
        if key not in self.task_cache:
            self.task_cache[key] = self.llm_call(prompt, model)
        return self.task_cache[key]
def execute_task(self, task):
"""Override to add cost tracking"""
model = self.select_model_for_task(task.description)
# Track costs
self.total_cost += self.estimate_cost(model, task.description)
if self.total_cost > self.budget_limit:
print(f"Budget limit reached: ${self.total_cost:.2f}")
return "Budget exceeded, task cancelled"
# Execute with cached calls
result = self.cached_llm_call(task.description, model)
return result
Source: GitHub issue #189 - caching alone reduced costs by 60%
Issue #3: "Code execution produces non-deterministic results"
Problem: The code execution tool would work sometimes and fail other times, even with the same input, which made testing impossible.
What I tried: Added retries, increased timeouts, simplified the code being executed. Still flaky.
Actual fix: Sandboxed execution with rollbacks:
import docker
import tempfile
import os
class SafeCodeExecutor:
def __init__(self):
self.client = docker.from_env()
    def execute_code(self, code, timeout=30):
        """Execute code in an isolated container"""
        # Create a temp directory for this execution
        with tempfile.TemporaryDirectory() as temp_dir:
            # Write code to a file
            code_file = os.path.join(temp_dir, "script.py")
            with open(code_file, "w") as f:
                f.write(code)
            # Run in an isolated container
            try:
                output = self.client.containers.run(
                    "python:3.9-slim",
                    # coreutils' timeout kills the script if it hangs
                    command=["timeout", str(timeout), "python", "/mnt/script.py"],
                    volumes={temp_dir: {"bind": "/mnt", "mode": "ro"}},
                    network_disabled=True,  # no network access
                    mem_limit="512m",       # memory limit
                    cpu_quota=100000,       # CPU limit
                    remove=True,
                )
                # containers.run returns the container's stdout as bytes
                return output.decode("utf-8")
            except Exception as e:
                return f"Execution failed: {str(e)}"
def rollback_on_error(self, code):
"""Test code before applying changes"""
# First run: dry run
test_result = self.execute_code(code + "\nprint('SUCCESS')")
if "SUCCESS" not in test_result:
return f"Code test failed: {test_result}"
# Second run: actual execution
return self.execute_code(code)
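A quick smoke test (assumes a local Docker daemon and the python:3.9-slim image):
executor = SafeCodeExecutor()
print(executor.execute_code("print(2 + 2)"))                 # -> "4"
print(executor.execute_code("while True: pass", timeout=5))  # -> "Execution failed: ..." after 5s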
Source: GitHub issue #312 - Docker isolation solved the flakiness
Issue #4: "Task prioritization makes no sense"
Problem: BabyAGI would prioritize "write documentation" over "fix critical bug" or create tasks in completely the wrong order.
What I tried: Added weighted keywords to objective, manually re-ordered tasks, hardcoded priorities.
Actual fix: Custom prioritization function:
from datetime import datetime

class PrioritizedBabyAGI(BabyAGI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.priority_weights = {
"critical": 100,
"important": 50,
"normal": 10,
"low": 1
}
def extract_priority_indicators(self, task_description):
"""Determine priority from task description"""
description = task_description.lower()
# Critical indicators
if any(word in description for word in [
"critical", "urgent", "bug", "fix", "security", "blocking"
]):
return "critical"
# Important indicators
elif any(word in description for word in [
"implement", "feature", "optimization", "improvement"
]):
return "important"
# Low priority indicators
elif any(word in description for word in [
"documentation", "comment", "formatting", "cleanup"
]):
return "low"
return "normal"
def prioritize_task(self, task):
"""Override default prioritization"""
priority_level = self.extract_priority_indicators(task.description)
task.priority = self.priority_weights[priority_level]
        # Age-based boost so older tasks don't starve
        age = (datetime.now() - task.created_at).days
        task.priority += age * 2
return task.priority
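To sanity-check the keyword heuristic before wiring it in:
agent = PrioritizedBabyAGI(objective="demo")
print(agent.extract_priority_indicators("Fix critical login bug"))    # critical
print(agent.extract_priority_indicators("Clean up code formatting"))  # low
print(agent.extract_priority_indicators("Summarize user feedback"))   # normal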
Source: GitHub issue #456 - custom prioritization was merged into main
Production deployment patterns
For running BabyAGI in production:
# production_agent.py
import logging
import signal

from babyagi import BabyAGI
from babyagi_config import OBJECTIVE  # reuse the objective defined earlier
class ProductionBabyAGI(BabyAGI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = self.setup_logging()
self.shutdown_requested = False
# Setup graceful shutdown
signal.signal(signal.SIGINT, self.handle_shutdown)
signal.signal(signal.SIGTERM, self.handle_shutdown)
def setup_logging(self):
"""Configure production logging"""
logger = logging.getLogger('babyagi')
logger.setLevel(logging.INFO)
# File handler
fh = logging.FileHandler('babyagi.log')
fh.setLevel(logging.INFO)
# Console handler
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
def handle_shutdown(self, signum, frame):
"""Graceful shutdown on signal"""
self.logger.info(f"Received signal {signum}, shutting down gracefully...")
self.shutdown_requested = True
def run(self):
"""Override to handle shutdown gracefully"""
self.logger.info("Starting BabyAGI production run")
while not self.shutdown_requested and self.task_list:
try:
# Log current state
self.logger.info(f"Tasks remaining: {len(self.task_list)}")
self.logger.info(f"Budget used: ${self.total_cost:.2f}")
# Execute one iteration
super().run_iteration()
except Exception as e:
self.logger.error(f"Iteration failed: {str(e)}")
# Continue to next iteration instead of crashing
continue
self.logger.info("BabyAGI completed or shutdown")
# Usage
if __name__ == "__main__":
agent = ProductionBabyAGI(
objective=OBJECTIVE,
max_iterations=50,
budget_limit=100.0 # $100 budget cap
)
agent.run()
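Even with graceful shutdown handled, the process itself will eventually die. A minimal supervisor sketch (systemd or your orchestrator of choice does this better) restarts it with a backoff:
# run_forever.py - restart the agent on crashes
import subprocess
import time

while True:
    exit_code = subprocess.call(["python", "production_agent.py"])
    if exit_code == 0:
        break  # clean completion or graceful shutdown
    print(f"Agent exited with code {exit_code}, restarting in 30s...")
    time.sleep(30)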
Monitoring and observability
Track what your agent is actually doing:
# monitoring.py
import logging
import time

from prometheus_client import Counter, Histogram, Gauge, start_http_server

from babyagi import BabyAGI
# Metrics
tasks_created = Counter('babyagi_tasks_created', 'Total tasks created')
tasks_completed = Counter('babyagi_tasks_completed', 'Tasks completed successfully')
tasks_failed = Counter('babyagi_tasks_failed', 'Tasks that failed')
llm_calls = Counter('babyagi_llm_calls', 'LLM API calls made')
execution_time = Histogram('babyagi_execution_time_seconds', 'Task execution time')
budget_used = Gauge('babyagi_budget_used', 'API budget consumed in USD')
class MonitoredBabyAGI(BabyAGI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger('babyagi')
        start_http_server(8000)  # Prometheus metrics on port 8000
def create_task(self, description):
"""Track task creation"""
tasks_created.inc()
self.logger.info(f"Task created: {description}")
return super().create_task(description)
def execute_task(self, task):
"""Monitor execution metrics"""
start_time = time.time()
try:
result = super().execute_task(task)
execution_time.observe(time.time() - start_time)
tasks_completed.inc()
# Update budget
budget_used.set(self.total_cost)
return result
except Exception as e:
execution_time.observe(time.time() - start_time)
tasks_failed.inc()
self.logger.error(f"Task failed: {task.description} - {str(e)}")
raise
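With the exporter running, you can eyeball the counters directly before pointing Prometheus at them:
# Quick check that metrics are exported (the agent must be running)
import requests

metrics = requests.get("http://localhost:8000/metrics", timeout=5).text
for line in metrics.splitlines():
    if line.startswith("babyagi_"):
        print(line)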
Cost optimization strategies
BabyAGI can get expensive fast. Here's what I learned:
# cost_optimizer.py
import logging

from babyagi import BabyAGI

class BudgetConstrainedBabyAGI(BabyAGI):
    def __init__(self, *args, budget_limit=50.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.budget_limit = budget_limit
        self.total_cost = 0.0
        self.cost_per_1k_tokens = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002
        }
def estimate_cost(self, model, prompt):
"""Rough cost estimation before making API call"""
# Rough estimation: ~4 chars per token
estimated_tokens = len(prompt) / 4
cost = (estimated_tokens / 1000) * self.cost_per_1k_tokens[model]
return cost
    def should_execute_task(self, task):
        """Check if we have budget for this task"""
        model = self.select_model_for_task(task.description)  # see CostOptimizedBabyAGI above
        estimated_cost = self.estimate_cost(model, task.description)
        if self.total_cost + estimated_cost > self.budget_limit:
            logging.warning(
                f"Budget limit would be exceeded: ${self.total_cost + estimated_cost:.2f}"
            )
            return False
        return True
    def optimize_llm_calls(self):
        """Batch similar queries to reduce API calls.

        group_similar_tasks, process_batch, and distribute_results are
        placeholders - implement them for your task representation.
        """
        task_groups = self.group_similar_tasks()
        for group in task_groups:
            if len(group) > 1:
                # One LLM call covers the whole group
                combined_result = self.process_batch(group)
                self.distribute_results(group, combined_result)
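A quick sanity check of the estimator: at roughly 4 characters per token, a 2,000-character prompt is about 500 tokens, so around $0.015 at GPT-4 input pricing:
agent = BudgetConstrainedBabyAGI(objective="demo", budget_limit=10.0)
prompt = "x" * 2000  # ~500 tokens
print(agent.estimate_cost("gpt-4", prompt))          # 0.015
print(agent.estimate_cost("gpt-3.5-turbo", prompt))  # 0.001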
Custom tools that work well
Beyond the basic tools:
# custom_tools.py
import subprocess

import requests
import sqlalchemy

from babyagi.tools import BaseTool
class GitTool(BaseTool):
"""Perform git operations"""
name = "git"
description = "Execute git commands like clone, commit, push"
def execute(self, command, repo_path=None):
if repo_path is None:
repo_path = "/tmp/repo"
try:
result = subprocess.run(
["git", "-C", repo_path] + command.split(),
capture_output=True,
text=True,
timeout=30
)
return result.stdout
except Exception as e:
return f"Git command failed: {str(e)}"
class APITool(BaseTool):
"""Make API calls to external services"""
name = "api"
description = "Make HTTP requests to REST APIs"
def __init__(self, base_url=None, headers=None):
self.base_url = base_url
self.headers = headers or {}
def execute(self, method, endpoint, data=None):
url = f"{self.base_url}/{endpoint}" if self.base_url else endpoint
try:
response = requests.request(
method=method,
url=url,
json=data,
headers=self.headers,
timeout=10
)
return response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text
except Exception as e:
return f"API call failed: {str(e)}"
class DatabaseTool(BaseTool):
"""Execute database operations"""
name = "database"
description = "Query and update databases"
    def __init__(self, connection_string):
        self.engine = sqlalchemy.create_engine(connection_string)
def execute(self, query, fetch=True):
try:
with self.engine.connect() as conn:
result = conn.execute(sqlalchemy.text(query))
                if fetch:
                    # row._mapping gives a dict view in SQLAlchemy 1.4+
                    return [dict(row._mapping) for row in result]
else:
conn.commit()
return f"Query affected {result.rowcount} rows"
except Exception as e:
return f"Database operation failed: {str(e)}"
Testing autonomous agents
Testing non-deterministic agents is hard. Here's my approach:
# test_agent.py
import os
import unittest
from unittest.mock import Mock, patch

from babyagi import BabyAGI
from babyagi.tools import CodeExecutionTool
class TestBabyAGIBehavior(unittest.TestCase):
def setUp(self):
self.mock_llm = Mock()
self.mock_llm.return_value = "Task completed successfully"
self.agent = BabyAGI(
objective="Simple test objective",
llm=self.mock_llm
)
def test_task_creation(self):
"""Test that agent creates valid tasks"""
initial_task_count = len(self.agent.task_list)
self.agent.create_task("Test task")
self.assertEqual(len(self.agent.task_list), initial_task_count + 1)
def test_task_prioritization(self):
"""Test that tasks are prioritized correctly"""
tasks = [
self.agent.create_task("Low priority documentation task"),
self.agent.create_task("Critical bug fix"),
self.agent.create_task("Normal feature implementation")
]
# Sort by priority
sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
# Critical bug should be first
self.assertIn("bug", sorted_tasks[0].description.lower())
    def test_cost_tracking(self):
        """Test that costs are tracked across an iteration"""
        initial_cost = self.agent.total_cost
        # Pin the cost estimate so the assertion is deterministic
        with patch.object(self.agent, 'estimate_cost', return_value=5.0):
            self.agent.create_task("Implement feature")
            self.agent.run_iteration()
        # Cost should have increased
        self.assertGreater(self.agent.total_cost, initial_cost)
# Integration test with sandboxed environment
class TestBabyAGIIntegration(unittest.TestCase):
def test_simple_workflow(self):
"""Test a complete simple workflow"""
objective = "Create a hello world script in /tmp/test.py"
agent = BabyAGI(
objective=objective,
tools=[CodeExecutionTool()],
max_iterations=3,
llm="gpt-3.5-turbo" # Use cheaper model for tests
)
agent.run()
# Verify file was created
self.assertTrue(os.path.exists("/tmp/test.py"))
with open("/tmp/test.py") as f:
content = f.read()
self.assertIn("hello world", content.lower())
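For behavior tests that go beyond a single mocked return value, a scripted fake LLM keeps runs deterministic (this assumes, as in setUp above, that BabyAGI accepts a callable for llm):
class ScriptedLLM:
    """Replays canned responses in order - deterministic by construction"""
    def __init__(self, responses):
        self.responses = iter(responses)

    def __call__(self, prompt, **kwargs):
        return next(self.responses)

agent = BabyAGI(objective="Simple test objective", llm=ScriptedLLM([
    "1. Write the script",
    "Task completed successfully",
]))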
Common failure patterns
- Overly broad objectives: "Build a startup" → "Create a landing page for X product that does Y"
- No success criteria: Agent can't tell when it's done (see the completion-check sketch after this list)
- Tool limitations: Giving agent tools it doesn't know how to use properly
- Ignoring context: Agent forgets previous tasks and duplicates work
- Resource exhaustion: Running out of budget, time, or API rate limits
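The most reliable fix for the success-criteria problem is a machine-checkable definition of done that the run loop consults - a sketch for the scraper objective above (the file names are hypothetical):
import os

def objective_met() -> bool:
    """Explicit 'done' check for the scraper objective"""
    return os.path.exists("scraper.py") and os.path.exists("results.json")

# In the run loop: stop as soon as the criteria pass
# while not objective_met() and agent.task_list:
#     agent.run_iteration()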
When BabyAGI actually works well
- Research tasks: Gathering information from multiple sources is its strength
- Code generation: Writing boilerplate code based on specs
- Data processing: ETL tasks with clear input/output requirements
- Testing: Generating test cases for existing code
- Documentation: Writing docs from code comments and structure
When BabyAGI struggles
- Creative work: Needs human judgment and taste
- Complex debugging: Can't run code or inspect systems effectively
- UI/UX design: No visual feedback loop
- Architecture decisions: Lacks deep understanding of tradeoffs
- Multi-step coordination: Gets confused managing parallel workstreams
Bottom line
BabyAGI isn't magic. It's a tool that can be incredibly powerful when scoped correctly, but can burn through your budget with nothing to show for it if you're not careful.
The successful deployments I've seen all have tight constraints, clear objectives, and extensive monitoring. The failed ones were all "let the AI figure it out" experiments.
Start small, constrain heavily, monitor everything. Scale up gradually as you learn what works for your use case.
Links: github.com/yoheinakajima/babyagi | Original Paper: arxiv.org/abs/2304.03342