Automating Lead Generation with Python Agents and n8n Workflows
Abhishek Sharma
Software Developer
Automating Lead Generation with Python Agents and n8n Workflows
Most lead generation tools fall into two categories: expensive SaaS platforms with rigid workflows, or manual processes that eat 3 hours of your day. LeadsNeoForge sits in between -- a set of autonomous Python agents orchestrated by n8n, powered by Groq's free LLM tier, and monitored through a Next.js dashboard. It runs on a single VPS, generates 40-60 qualified leads per day, and costs almost nothing to operate. Here's the complete technical breakdown.
Architecture Overview
The system has five independent Python agents, each running as a daemon process. An n8n instance orchestrates multi-step workflows that coordinate these agents. A Next.js dashboard reads from flat files (CSV and JSON) to display metrics. The entire stack avoids databases intentionally -- at this volume, files are simpler, more portable, and easier to debug.
# System Architecture
┌─────────────────────────────────────────────────┐
│ n8n Workflows │
│ (scheduling, coordination, error handling) │
└──────┬──────┬──────┬──────┬──────┬──────────────┘
│ │ │ │ │
┌────▼──┐ ┌─▼───┐ ┌▼────┐ ┌▼───┐ ┌▼────────┐
│Reddit │ │Reddit│ │Cont.│ │Lin-│ │API Lead │
│Monitor│ │Poster│ │Sched│ │kedIn│ │Generator│
└───┬───┘ └──┬──┘ └──┬──┘ └─┬──┘ └────┬────┘
│ │ │ │ │
└────────┴───────┴──────┴──────────┘
│
┌────────▼────────┐
│ File Storage │
│ (JSON/CSV/MD) │
└────────┬────────┘
│
┌────────▼────────┐
│ Next.js Dash │
│ (read-only) │
└─────────────────┘
Python Agent Architecture
Each agent follows the same structural pattern: a main loop that runs on a configurable interval, a config loader that reads YAML, a Groq API client for content generation, and file-based output. Agents run as daemon processes managed by simple shell scripts.
#!/bin/bash
# start-agent.sh - Daemon launcher for any agent
AGENT_NAME=$1
LOG_DIR="./logs"
PID_DIR="./pids"
mkdir -p "$LOG_DIR" "$PID_DIR"
if [ -f "$PID_DIR/$AGENT_NAME.pid" ]; then
OLD_PID=$(cat "$PID_DIR/$AGENT_NAME.pid")
if kill -0 "$OLD_PID" 2>/dev/null; then
echo "$AGENT_NAME is already running (PID: $OLD_PID)"
exit 1
fi
fi
nohup python3 -u "agents/${AGENT_NAME}/main.py" \
>> "$LOG_DIR/$AGENT_NAME.log" 2>&1 &
echo $! > "$PID_DIR/$AGENT_NAME.pid"
echo "$AGENT_NAME started (PID: $!)"
The base agent class handles the lifecycle, error recovery, and metrics reporting:
# agents/base_agent.py
import time
import yaml
import csv
import os
import traceback
from datetime import datetime, date
from abc import ABC, abstractmethod
from pathlib import Path
class BaseAgent(ABC):
def __init__(self, agent_name: str):
self.agent_name = agent_name
self.config = self._load_config()
self.metrics_file = Path(f"data/metrics/{agent_name}-metrics.csv")
self.running = True
self._ensure_metrics_file()
def _load_config(self) -> dict:
config_path = Path(f"config/{self.agent_name}.yaml")
with open(config_path) as f:
return yaml.safe_load(f)
def _ensure_metrics_file(self):
if not self.metrics_file.exists():
self.metrics_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.metrics_file, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow([
"date", "runs", "successes", "failures",
"items_processed", "leads_generated"
])
def log_metric(self, successes: int, failures: int,
items: int, leads: int):
today = date.today().isoformat()
rows = []
# Read existing rows, update today's if it exists
if self.metrics_file.exists():
with open(self.metrics_file) as f:
reader = csv.DictReader(f)
for row in reader:
if row["date"] == today:
row["runs"] = str(int(row["runs"]) + 1)
row["successes"] = str(
int(row["successes"]) + successes
)
row["failures"] = str(
int(row["failures"]) + failures
)
row["items_processed"] = str(
int(row["items_processed"]) + items
)
row["leads_generated"] = str(
int(row["leads_generated"]) + leads
)
rows.append(row)
if not any(r["date"] == today for r in rows):
rows.append({
"date": today, "runs": "1",
"successes": str(successes),
"failures": str(failures),
"items_processed": str(items),
"leads_generated": str(leads),
})
with open(self.metrics_file, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=[
"date", "runs", "successes", "failures",
"items_processed", "leads_generated"
])
writer.writeheader()
writer.writerows(rows)
@abstractmethod
def execute(self) -> dict:
"""Run one cycle of the agent. Return metrics dict."""
pass
def run(self):
interval = self.config.get("interval_seconds", 3600)
print(f"[{self.agent_name}] Starting daemon "
f"(interval: {interval}s)")
while self.running:
try:
result = self.execute()
self.log_metric(
successes=result.get("successes", 0),
failures=result.get("failures", 0),
items=result.get("items_processed", 0),
leads=result.get("leads_generated", 0),
)
print(f"[{self.agent_name}] Cycle complete: "
f"{result}")
except Exception as e:
print(f"[{self.agent_name}] Error: {e}")
traceback.print_exc()
self.log_metric(0, 1, 0, 0)
time.sleep(interval)
Reddit Monitor Agent
The Reddit monitor watches specific subreddits for posts matching keyword patterns. When it finds a match, it generates a contextually relevant comment using Groq and queues it for posting (with delay to avoid detection). Reddit's API is accessed through PRAW (Python Reddit API Wrapper).
# agents/reddit_monitor/main.py
import praw
import json
import time
from pathlib import Path
from base_agent import BaseAgent
from groq_client import generate_response
class RedditMonitor(BaseAgent):
def __init__(self):
super().__init__("reddit_monitor")
self.reddit = praw.Reddit(
client_id=self.config["reddit"]["client_id"],
client_secret=self.config["reddit"]["client_secret"],
user_agent=self.config["reddit"]["user_agent"],
username=self.config["reddit"]["username"],
password=self.config["reddit"]["password"],
)
self.seen_file = Path("data/reddit/seen_posts.json")
self.queue_file = Path("data/reddit/comment_queue.json")
self.seen_ids = self._load_seen()
def _load_seen(self) -> set:
if self.seen_file.exists():
with open(self.seen_file) as f:
return set(json.load(f))
return set()
def _save_seen(self):
# Keep only last 5000 IDs to prevent unbounded growth
recent = list(self.seen_ids)[-5000:]
self.seen_ids = set(recent)
with open(self.seen_file, "w") as f:
json.dump(recent, f)
def execute(self) -> dict:
subreddits = self.config["subreddits"]
keywords = self.config["keywords"]
leads = 0
processed = 0
for sub_name in subreddits:
subreddit = self.reddit.subreddit(sub_name)
for post in subreddit.new(limit=25):
if post.id in self.seen_ids:
continue
self.seen_ids.add(post.id)
processed += 1
title_lower = post.title.lower()
body_lower = (post.selftext or "").lower()
# Check if post matches any keyword pattern
matched = any(
kw.lower() in title_lower
or kw.lower() in body_lower
for kw in keywords
)
if matched:
comment = self._generate_comment(
post.title, post.selftext, sub_name
)
if comment:
self._queue_comment(
post.id, post.permalink, comment
)
leads += 1
self._save_seen()
return {
"successes": leads,
"failures": 0,
"items_processed": processed,
"leads_generated": leads,
}
def _generate_comment(self, title: str, body: str,
subreddit: str) -> str | None:
prompt = f"""You are a helpful community member on
r/{subreddit}. Someone posted:
Title: {title}
Body: {body[:1000]}
Write a genuinely helpful reply (2-3 paragraphs) that:
1. Directly addresses their question or problem
2. Shares a relevant personal experience or insight
3. Naturally mentions our tool ONLY if genuinely relevant
4. Does NOT sound like an advertisement
5. Uses casual Reddit tone (no corporate speak)
If our tool is not relevant to this post, just write a
helpful reply without mentioning it. Being helpful builds
reputation. Return ONLY the comment text."""
return generate_response(prompt)
def _queue_comment(self, post_id: str, permalink: str,
comment: str):
queue = []
if self.queue_file.exists():
with open(self.queue_file) as f:
queue = json.load(f)
queue.append({
"post_id": post_id,
"permalink": permalink,
"comment": comment,
"queued_at": time.time(),
"status": "pending",
})
with open(self.queue_file, "w") as f:
json.dump(queue, f, indent=2)
if __name__ == "__main__":
RedditMonitor().run()
Groq LLM Integration
Groq offers the llama-3.3-70b-versatile model on a free tier with rate limits of 30 requests per minute and 6,000 tokens per minute. For lead generation content where you're generating short replies and posts, this is more than sufficient. The key is handling rate limits gracefully:
# groq_client.py
import os
import time
from groq import Groq
client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
# Simple rate limiter
_last_request_time = 0
_min_interval = 2.5 # seconds between requests (safe margin)
def generate_response(
prompt: str,
model: str = "llama-3.3-70b-versatile",
max_tokens: int = 1024,
temperature: float = 0.7,
retries: int = 3,
) -> str | None:
global _last_request_time
for attempt in range(retries):
# Enforce minimum interval between requests
elapsed = time.time() - _last_request_time
if elapsed < _min_interval:
time.sleep(_min_interval - elapsed)
try:
_last_request_time = time.time()
response = client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": (
"You are a knowledgeable professional. "
"Write naturally. Never use corporate "
"jargon or marketing language. Be "
"genuinely helpful."
),
},
{"role": "user", "content": prompt},
],
max_tokens=max_tokens,
temperature=temperature,
)
return response.choices[0].message.content
except Exception as e:
error_str = str(e)
if "rate_limit" in error_str.lower():
wait = (attempt + 1) * 30
print(f"Rate limited. Waiting {wait}s...")
time.sleep(wait)
elif "503" in error_str or "unavailable" in error_str:
print(f"Groq unavailable. Retry {attempt + 1}...")
time.sleep(10)
else:
print(f"Groq error: {e}")
return None
return None
The free tier limitation of 30 RPM sounds restrictive, but in practice each agent cycle makes 10-25 API calls spaced over several minutes. With the 2.5-second minimum interval, you stay well within limits. If you need burst capacity, Groq's paid tier at $0.59/M input tokens is still dramatically cheaper than OpenAI.
The Automation Config Structure
Each agent reads from a YAML config that controls its behavior without code changes. This lets you tune keywords, posting schedules, and target platforms through configuration alone:
# config/reddit_monitor.yaml
reddit:
client_id: "${REDDIT_CLIENT_ID}"
client_secret: "${REDDIT_CLIENT_SECRET}"
user_agent: "LeadsBot/1.0"
username: "${REDDIT_USERNAME}"
password: "${REDDIT_PASSWORD}"
subreddits:
- SaaS
- startups
- EntrepreneurRideAlong
- smallbusiness
- Entrepreneur
keywords:
- "lead generation"
- "finding customers"
- "cold outreach"
- "email automation"
- "prospecting tool"
# Run every 45 minutes
interval_seconds: 2700
# Max comments per cycle (rate limiting)
max_actions_per_cycle: 5
# Minimum delay before posting queued comments (seconds)
post_delay_min: 300
post_delay_max: 900
n8n Workflow Orchestration
n8n serves as the conductor. While each agent can run independently, n8n handles cross-agent workflows, scheduled triggers, and error notifications. A key workflow is the daily content pipeline:
// n8n workflow: Daily Content Pipeline
// (exported JSON structure, simplified)
{
"name": "Daily Lead Gen Pipeline",
"nodes": [
{
"type": "n8n-nodes-base.scheduleTrigger",
"parameters": {
"rule": {
"interval": [{ "field": "hours", "hoursInterval": 6 }]
}
},
"name": "Every 6 Hours"
},
{
"type": "n8n-nodes-base.httpRequest",
"parameters": {
"url": "http://localhost:5000/api/agents/status",
"method": "GET"
},
"name": "Check Agent Health"
},
{
"type": "n8n-nodes-base.if",
"parameters": {
"conditions": {
"boolean": [{
"value1": "={{ $json.reddit_monitor.running }}",
"value2": true
}]
}
},
"name": "Is Reddit Monitor Alive?"
},
{
"type": "n8n-nodes-base.executeCommand",
"parameters": {
"command": "cd /app && python3 scripts/process_queue.py"
},
"name": "Process Comment Queue"
},
{
"type": "n8n-nodes-base.executeCommand",
"parameters": {
"command": "cd /app && python3 scripts/aggregate_metrics.py"
},
"name": "Aggregate Daily Metrics"
},
{
"type": "n8n-nodes-base.telegram",
"parameters": {
"chatId": "${TELEGRAM_CHAT_ID}",
"text": "={{ 'Daily Report:\\n' + $json.summary }}"
},
"name": "Send Daily Report"
}
]
}
n8n's visual workflow builder makes it easy to add conditional logic -- if the Reddit agent is down, restart it and alert via Telegram. If the daily lead count drops below a threshold, trigger an extra content generation cycle. These operational concerns live in n8n rather than being hardcoded into the Python agents.
Making AI-Generated Content Sound Authentic
This is the hardest technical problem in the entire system. LLMs default to a recognizable pattern: formal tone, bullet points, hedging language ("It's worth noting that..."), and a tendency to answer every question comprehensively. Reddit users and LinkedIn connections can smell this instantly.
The prompt engineering that actually works involves several strategies. First, provide examples of real high-performing comments from the target subreddit as few-shot examples. Second, explicitly ban common LLM phrases in the system prompt. Third, set temperature to 0.7-0.8 to introduce natural variation. Fourth, add post-processing that randomly introduces minor imperfections:
# content_humanizer.py
import random
import re
# Phrases that instantly mark content as AI-generated
AI_TELLS = [
r"it's worth noting",
r"it's important to note",
r"in conclusion",
r"here are some",
r"there are several",
r"that being said",
r"at the end of the day",
r"I'd be happy to",
r"absolutely[!.]",
r"great question",
r"dive into",
r"delve into",
r"landscape",
r"leverage",
r"streamline",
]
def humanize_content(text: str) -> str:
# Remove known AI-tell phrases
for pattern in AI_TELLS:
text = re.sub(
pattern, "", text, flags=re.IGNORECASE
)
# Clean up double spaces from removals
text = re.sub(r" +", " ", text)
text = re.sub(r"\n\n\n+", "\n\n", text)
# Randomly lowercase the first letter of some sentences
# (people on Reddit don't always capitalize)
sentences = text.split(". ")
for i in range(len(sentences)):
if random.random() < 0.15 and len(sentences[i]) > 0:
sentences[i] = (
sentences[i][0].lower() + sentences[i][1:]
)
text = ". ".join(sentences)
# Occasionally use contractions more aggressively
replacements = [
("do not", "don't"), ("cannot", "can't"),
("will not", "won't"), ("would not", "wouldn't"),
("I have", "I've"), ("I would", "I'd"),
]
for formal, casual in replacements:
if random.random() < 0.8:
text = text.replace(formal, casual)
return text.strip()
The most effective technique, though, is the simplest: instruct the LLM to be genuinely helpful first. Content that actually solves someone's problem doesn't need to "sound authentic" -- it is authentic in the way that matters. The promotional mention, if included at all, should be a parenthetical afterthought, not the point of the comment.
Metrics Tracking with Flat Files
The daily metrics aggregate into a simple CSV that the Next.js dashboard reads:
# data/metrics/daily-metrics.csv
date,total_leads,reddit_leads,linkedin_leads,api_leads,posts_made,comments_made,content_generated
2026-03-01,42,18,12,12,4,14,6
2026-03-02,38,15,10,13,3,12,5
2026-03-03,51,22,14,15,5,17,7
The Next.js dashboard reads this file through a server action:
// app/actions/metrics.ts
"use server"
import { readFileSync } from "fs";
import { parse } from "csv-parse/sync";
import path from "path";
interface DailyMetric {
date: string;
total_leads: number;
reddit_leads: number;
linkedin_leads: number;
api_leads: number;
posts_made: number;
comments_made: number;
content_generated: number;
}
export async function getMetrics(
days: number = 30
): Promise<DailyMetric[]> {
const csvPath = path.join(
process.cwd(), "data", "metrics", "daily-metrics.csv"
);
try {
const content = readFileSync(csvPath, "utf-8");
const records = parse(content, {
columns: true,
cast: (value, context) => {
if (context.header) return value;
if (context.column === "date") return value;
return parseInt(value, 10);
},
});
return records.slice(-days);
} catch {
return [];
}
}
Why File-Based Storage Works at This Scale
When I first described this system to other developers, the immediate reaction was "why not PostgreSQL?" or "use Redis for the queues." Here's why flat files are the right choice at this scale:
- Volume: We process 50-100 items per day. That's 3,000 rows per month. A CSV handles this with zero infrastructure.
- Debugging: When something goes wrong, I open the JSON file in VS Code. No SQL client, no connection strings, no "is the database running?" troubleshooting.
- Portability: Moving the system to a new VPS means
rsync-ing a directory. No database dumps, no migration scripts. - Backups: The entire data directory is 12MB after months of operation. It backs up to S3 in under a second.
- No dependencies: No database server to keep running, no connection pools to manage, no ORM to configure.
The breakpoint where files stop working is around 100,000+ records with frequent random reads, or when you need transactional guarantees across multiple files. We're nowhere near that. Premature infrastructure is as real a problem as premature optimization.
Ethical Boundaries and Rate Limiting
Automation is a tool. The ethical boundary is clear: generate content that is genuinely helpful to the community, never misrepresent the source (don't pretend to be multiple people), respect platform rate limits, and always provide real value before any promotional mention. Our configuration enforces these boundaries technically:
- Max 5 comments per agent cycle with 5-15 minute random delays between posts
- Content quality gate: generated comments shorter than 50 characters or containing banned promotional phrases are discarded
- Subreddit rules check: the agent reads subreddit rules and includes them in the generation prompt
- Kill switch: a single config flag stops all posting across all agents immediately
- Daily cap: maximum 15 comments and 3 posts across all platforms per day
The Reddit poster agent has an additional safeguard: it checks the account's karma and recent post history. If karma drops (indicating community disapproval) or if posts are getting removed, it automatically reduces posting frequency and alerts via Telegram. The goal is to be a net-positive community member, not to extract value.
Key Takeaways
Building LeadsNeoForge taught me that the most effective automation systems are boring architecturally. Daemon processes with sleep loops, CSV files, YAML configs. No Kubernetes, no message queues, no microservices. The sophistication lives in the content generation prompts and the rate limiting logic, not in the infrastructure. Groq's free tier makes the economics viable for bootstrapped products. And the single most important design decision was making every agent independently restartable -- when one crashes at 3am, the others keep running, and n8n restarts the failed one on the next health check cycle.