Building a Social Media Agent That Creates, Schedules, and Analyzes Content
Mei-Lin Zhang
ML researcher focused on autonomous agents and multi-agent systems.
You've probably seen a dozen "AI social media manager" SaaS products launch this year. Most are thin wrappers around OpenAI's API with a calendar widget bolted on. If you want something that actually fits your workflow — or you want to build a product that's more than vaporware — you need to understand the moving parts.
This tutorial walks through building a real social media management agent. Not a toy demo. We'll cover content generation, image creation, scheduling, engagement analysis, and multi-platform orchestration. I'll use Python throughout, and I'll be honest about where things get messy.
Architecture Overview
Before writing a single line of code, let's map the system. A social media agent has five core responsibilities:
┌─────────────────────────────────────────────────┐
│              Agent Core (Orchestrator)          │
├──────────┬──────────┬───────────┬───────────────┤
│ Content  │  Image   │ Scheduler │  Engagement   │
│ Engine   │  Engine  │  System   │   Analyzer    │
├──────────┴──────────┴───────────┴───────────────┤
│           Platform Abstraction Layer            │
├────────┬────────┬─────────┬─────────┬───────────┤
│Twitter │LinkedIn│Instagram│ Facebook│  TikTok   │
│  /X    │        │         │         │           │
└────────┴────────┴─────────┴─────────┴───────────┘
The Platform Abstraction Layer is the most important piece. Every platform has different APIs, content formats, rate limits, and authentication schemes. If you don't abstract this properly, your agent becomes a tangled mess of platform-specific conditionals.
We'll use these key libraries:
| Purpose | Library | Why |
|---|---|---|
| LLM orchestration | LangChain | Chain composition, tool calling, memory |
| Content generation | OpenAI GPT-4o / Claude 3.5 | Best instruction-following for copy |
| Image generation | DALL-E 3 / Replicate (SDXL) | API access, quality |
| Scheduling | APScheduler + platform APIs | Reliable cron-like scheduling |
| Analytics | Platform APIs + pandas | Metric collection and analysis |
| Async operations | asyncio + httpx | Non-blocking API calls |
Part 1: The Platform Abstraction Layer
This is where most tutorials skip ahead. Don't. Getting this right saves you weeks of refactoring.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import httpx
class Platform(Enum):
TWITTER = "twitter"
LINKEDIN = "linkedin"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
@dataclass
class PostContent:
"""Normalized content representation across platforms."""
text: str
image_urls: list[str] = field(default_factory=list)
video_url: Optional[str] = None
link: Optional[str] = None
hashtags: list[str] = field(default_factory=list)
mentions: list[str] = field(default_factory=list)
def for_platform(self, platform: Platform) -> "PostContent":
"""Return platform-adapted content."""
limits = {
Platform.TWITTER: 280,
Platform.LINKEDIN: 3000,
Platform.INSTAGRAM: 2200,
Platform.FACEBOOK: 63206,
}
limit = limits.get(platform, 2000)
adapted_text = self.text
if len(adapted_text) > limit:
adapted_text = adapted_text[:limit - 3] + "..."
# Instagram doesn't support clickable links in posts
if platform == Platform.INSTAGRAM and self.link:
            adapted_text += "\n\n🔗 Link in bio"
return PostContent(
text=adapted_text,
image_urls=self.image_urls[:self._max_images(platform)],
video_url=self.video_url,
link=self.link if platform != Platform.INSTAGRAM else None,
hashtags=self.hashtags,
mentions=self.mentions,
)
def _max_images(self, platform: Platform) -> int:
return {
Platform.TWITTER: 4,
Platform.LINKEDIN: 9,
Platform.INSTAGRAM: 10,
Platform.FACEBOOK: 10,
}.get(platform, 4)
@dataclass
class PostResult:
platform: Platform
post_id: str
url: Optional[str] = None
published_at: Optional[datetime] = None
error: Optional[str] = None
@dataclass
class EngagementMetrics:
platform: Platform
post_id: str
impressions: int = 0
likes: int = 0
comments: int = 0
shares: int = 0
clicks: int = 0
engagement_rate: float = 0.0
fetched_at: datetime = field(default_factory=datetime.utcnow)
class PlatformAdapter(ABC):
"""Base class for platform-specific implementations."""
def __init__(self, credentials: dict):
self.credentials = credentials
self.client = httpx.AsyncClient(timeout=30.0)
@abstractmethod
async def publish(self, content: PostContent) -> PostResult:
pass
@abstractmethod
async def get_metrics(self, post_id: str) -> EngagementMetrics:
pass
@abstractmethod
async def get_recent_engagement(self, limit: int = 20) -> list[EngagementMetrics]:
pass
@abstractmethod
async def reply(self, post_id: str, text: str) -> PostResult:
pass
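Before wiring up real adapters, it's worth sanity-checking the normalization rule in isolation. This standalone sketch re-declares a pared-down `PostContent` (two fields instead of six) so it runs on its own; the truncation logic mirrors `for_platform` above:

```python
from dataclasses import dataclass, field
from enum import Enum

class Platform(Enum):
    TWITTER = "twitter"
    LINKEDIN = "linkedin"

# Per-platform character limits, mirroring the table in for_platform()
CHAR_LIMITS = {Platform.TWITTER: 280, Platform.LINKEDIN: 3000}

@dataclass
class PostContent:
    text: str
    hashtags: list[str] = field(default_factory=list)

    def for_platform(self, platform: Platform) -> "PostContent":
        limit = CHAR_LIMITS.get(platform, 2000)
        text = self.text
        if len(text) > limit:
            # Reserve three characters for the ellipsis
            text = text[: limit - 3] + "..."
        return PostContent(text=text, hashtags=self.hashtags)

long_post = PostContent(text="x" * 500)
tweet = long_post.for_platform(Platform.TWITTER)
print(len(tweet.text))  # 280: truncated to the limit, ellipsis included
```

The same 500-character draft passes through LinkedIn untouched, which is exactly the asymmetry the abstraction layer exists to hide.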
Now let's implement a concrete adapter. Twitter/X is the most commonly requested, so we'll start there:
# OAuth 1.0a signing dependencies (consumed by the header-signing helper,
# which is elided in this listing)
import hashlib
import hmac
import urllib.parse
from base64 import b64encode
class TwitterAdapter(PlatformAdapter):
"""Twitter/X API v2 adapter using OAuth 1.0a."""
BASE_URL = "https://api.twitter.com/2"
async def publish(self, content: PostContent) -> PostResult:
adapted = content.for_platform(Platform.TWITTER)
# Upload media first if present
media_ids = []
for url in adapted.image_urls:
media_id = await self._upload_media(url)
if media_id:
media_ids.append(media_id)
payload = {"text": self._build_tweet_text(adapted)}
if media_ids:
payload["media"] = {"media_ids": media_ids}
response = await self._authenticated_request(
"POST", "/tweets", json=payload
)
if response.status_code == 201:
data = response.json()["data"]
return PostResult(
platform=Platform.TWITTER,
post_id=data["id"],
url=f"https://x.com/i/status/{data['id']}",
published_at=datetime.utcnow(),
)
else:
return PostResult(
platform=Platform.TWITTER,
post_id="",
error=f"HTTP {response.status_code}: {response.text}",
)
async def get_metrics(self, post_id: str) -> EngagementMetrics:
response = await self._authenticated_request(
"GET",
f"/tweets/{post_id}",
params={
"tweet.fields": "public_metrics,created_at",
},
)
if response.status_code == 200:
metrics = response.json()["data"]["public_metrics"]
impressions = metrics.get("impression_count", 0)
total_engagement = (
metrics["like_count"]
+ metrics["reply_count"]
+ metrics["retweet_count"]
)
return EngagementMetrics(
platform=Platform.TWITTER,
post_id=post_id,
impressions=impressions,
likes=metrics["like_count"],
comments=metrics["reply_count"],
shares=metrics["retweet_count"],
engagement_rate=(
total_engagement / impressions if impressions > 0 else 0.0
),
)
raise ValueError(f"Failed to fetch metrics: {response.text}")
def _build_tweet_text(self, content: PostContent) -> str:
parts = [content.text]
if content.hashtags:
tags = " ".join(f"#{tag.lstrip('#')}" for tag in content.hashtags)
parts.append(tags)
return "\n\n".join(parts)
async def _upload_media(self, image_url: str) -> Optional[str]:
# Twitter media upload requires OAuth 1.0a
# Implementation depends on whether you're uploading bytes or a URL
# This is a simplified version
async with self.client.stream("GET", image_url) as resp:
image_bytes = b""
async for chunk in resp.aiter_bytes():
image_bytes += chunk
upload_url = "https://upload.twitter.com/1.1/media/upload.json"
# ... OAuth signing and multipart upload
# Returns media_id_string on success
return None # Placeholder
    async def _authenticated_request(self, method, path, **kwargs):
        # _get_oauth_headers (not shown) must produce signed OAuth 1.0a
        # headers. In production, use a library like 'requests-oauthlib'
        # or 'tweepy' rather than hand-rolling the signature.
        url = f"{self.BASE_URL}{path}"
        headers = self._get_oauth_headers(method, url, kwargs.get("params"))
        return await self.client.request(
            method, url, headers=headers, **kwargs
        )
Honest caveat: Twitter's API situation is a mess right now. The free tier allows write-only operations. The Basic tier ($100/month) gives you limited read access. The Pro tier ($5,000/month) gives you reasonable rate limits. If you're building for a business, budget for at least Basic. For LinkedIn and Instagram, you'll need Meta/Facebook App Review, which takes weeks and has unpredictable approval criteria.
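Whatever tier you pay for, you will still hit HTTP 429 responses, and one throttled call shouldn't fail a whole publish run. A retry helper with exponential backoff is the minimum viable defense. This is an illustrative sketch (the `Retry-After` header is a real HTTP convention; the function and its `send` callable are hypothetical names):

```python
import asyncio
import random

async def request_with_backoff(send, max_attempts: int = 4):
    """Call an async `send()` returning an object with .status_code and
    .headers, retrying on 429 with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        response = await send()
        if response.status_code != 429:
            return response
        # Honor Retry-After when the platform provides it; otherwise
        # back off exponentially (1s, 2s, 4s...) with jitter so many
        # workers don't retry in lockstep.
        retry_after = response.headers.get("Retry-After")
        delay = float(retry_after) if retry_after else (2 ** attempt + random.random())
        await asyncio.sleep(delay)
    return response  # caller inspects the final 429
```

You would wrap `_authenticated_request` calls in this rather than letting a single rate-limit response bubble up as a failed post.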
Part 2: Content Generation Engine
The content engine is the brain. It needs to understand brand voice, platform conventions, and content strategy. Here's where LangChain earns its keep.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
class ContentPlan(BaseModel):
"""Structured output for content planning."""
topic: str = Field(description="The core topic or angle")
hook: str = Field(description="Opening line that grabs attention")
body: str = Field(description="Main content body")
cta: str = Field(description="Call to action")
hashtags: list[str] = Field(description="Relevant hashtags")
tone_notes: str = Field(description="Notes about tone adjustments")
class ContentEngine:
def __init__(self, brand_config: dict):
self.llm = ChatOpenAI(
model="gpt-4o",
temperature=0.7, # Some creativity, not too wild
)
self.brand = brand_config
self.parser = PydanticOutputParser(pydantic_object=ContentPlan)
self.system_prompt = self._build_system_prompt()
    def _build_system_prompt(self) -> str:
        # Escape braces in the parser's format instructions so that
        # ChatPromptTemplate doesn't later mistake the JSON schema's
        # literal {braces} for template variables.
        format_instructions = (
            self.parser.get_format_instructions()
            .replace("{", "{{")
            .replace("}", "}}")
        )
        return f"""You are a social media content strategist for {self.brand['name']}.

BRAND VOICE:
- Tone: {self.brand.get('tone', 'professional but approachable')}
- Industry: {self.brand.get('industry', 'technology')}
- Audience: {self.brand.get('audience', 'developers and tech professionals')}
- Values: {', '.join(self.brand.get('values', ['innovation', 'transparency']))}
- Words to avoid: {', '.join(self.brand.get('banned_words', ['synergy', 'leverage', 'disrupt']))}

CONTENT RULES:
1. Never use clickbait or misleading claims
2. Every post must provide genuine value (insight, entertainment, or utility)
3. Hashtags: use 2-5 for Twitter, 5-10 for Instagram, 3-5 for LinkedIn
4. LinkedIn posts should open with a bold statement or question
5. Twitter posts should be scannable — use line breaks strategically
6. Include a clear CTA when appropriate (not every post needs one)
7. Reference current trends only when genuinely relevant

{format_instructions}"""
async def generate_post(
self,
topic: str,
platform: Platform,
content_type: str = "educational", # educational, promotional, engagement, news
additional_context: str = "",
) -> ContentPlan:
prompt = ChatPromptTemplate.from_messages([
("system", self.system_prompt),
("human", """Create a {platform} post about: {topic}
Content type: {content_type}
Additional context: {context}
Make it specific and substantive. Avoid generic statements.
If citing statistics or claims, note if they need verification."""),
])
chain = prompt | self.llm | self.parser
result = await chain.ainvoke({
"platform": platform.value,
"topic": topic,
"content_type": content_type,
"context": additional_context or "None provided",
})
return result
async def generate_thread(
self,
topic: str,
num_tweets: int = 5,
) -> list[ContentPlan]:
"""Generate a Twitter/X thread."""
prompt = ChatPromptTemplate.from_messages([
("system", self.system_prompt),
("human", """Create a Twitter thread with {num_tweets} tweets about: {topic}
Return a JSON array of {num_tweets} content plans. Each tweet should:
- Stand alone but connect to the narrative
- End with a hook that makes people want to read the next tweet
- The final tweet should have a strong CTA
Thread structure:
1. Hook tweet (bold claim or surprising fact)
2-3. Supporting points with evidence
4. Practical takeaway or framework
5. CTA + summary
Return as a JSON array of ContentPlan objects."""),
])
# For arrays, we need a slightly different approach
from langchain_core.output_parsers import JsonOutputParser
chain = prompt | self.llm | JsonOutputParser()
result = await chain.ainvoke({
"num_tweets": num_tweets,
"topic": topic,
})
return [ContentPlan(**item) for item in result]
async def adapt_for_platform(
self,
content: ContentPlan,
target_platform: Platform,
) -> ContentPlan:
"""Reformat existing content for a different platform."""
prompt = ChatPromptTemplate.from_messages([
("system", self.system_prompt),
("human", """Adapt this content for {platform}:
Original post:
{original}
Maintain the core message but adjust:
- Length and formatting for the platform
- Hashtag strategy
- Tone (LinkedIn is more formal, Twitter is punchier)
- CTA style"""),
])
chain = prompt | self.llm | self.parser
return await chain.ainvoke({
"platform": target_platform.value,
"original": content.model_dump_json(),
})
Here's how you'd use it:
brand_config = {
"name": "DriftSeas",
"tone": "authoritative but accessible, like a senior engineer explaining to a peer",
"industry": "AI/ML and developer tools",
"audience": "senior developers, ML engineers, CTOs",
"values": ["technical depth", "practical utility", "honest assessment"],
"banned_words": ["game-changing", "revolutionary", "cutting-edge", "synergy"],
}
engine = ContentEngine(brand_config)
# Generate a post
plan = await engine.generate_post(
topic="Why most RAG implementations fail in production",
platform=Platform.LINKEDIN,
content_type="educational",
additional_context="Based on our experience deploying 50+ RAG systems. "
"Common failures: bad chunking, no re-ranking, ignoring latency.",
)
What actually matters here: The system prompt is doing most of the work. Spend time crafting your brand voice rules. The difference between generic AI content and good AI content is entirely in the constraints you define. I've found that including specific "words to avoid" and explicit structural rules produces dramatically better output than vague instructions like "write engaging content."
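Models also occasionally ignore negative instructions, so don't trust the "words to avoid" list to the prompt alone; a cheap post-generation lint pass catches slips before anything is scheduled. A minimal sketch (the `lint_copy` name is mine, not part of any library):

```python
import re

def lint_copy(text: str, banned_words: list[str]) -> list[str]:
    """Return the banned words/phrases that actually appear in the copy,
    matched case-insensitively on word boundaries."""
    hits = []
    for phrase in banned_words:
        if re.search(rf"\b{re.escape(phrase)}\b", text, flags=re.IGNORECASE):
            hits.append(phrase)
    return hits

violations = lint_copy(
    "Our game-changing platform embraces synergy.",
    ["game-changing", "synergy", "disrupt"],
)
print(violations)  # ['game-changing', 'synergy']
```

When the list comes back non-empty, regenerate with the violations named explicitly in the retry prompt; in practice one retry almost always clears it.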
Part 3: Image Generation Integration
Images are non-negotiable for Instagram and significantly boost engagement on other platforms. We'll support both DALL-E 3 and Stable Diffusion via Replicate, because DALL-E is better for text-heavy graphics while SDXL excels at photorealistic and artistic content.
import openai
import replicate
import base64
from pathlib import Path
class ImageEngine:
def __init__(self, config: dict):
self.openai_client = openai.AsyncOpenAI(
api_key=config["openai_api_key"]
)
self.output_dir = Path(config.get("output_dir", "./generated_images"))
self.output_dir.mkdir(exist_ok=True)
self.brand_style = config.get("brand_style", {})
async def generate(
self,
prompt: str,
platform: Platform,
style: str = "professional", # professional, artistic, minimal, infographic
engine: str = "dall-e", # dall-e, sdxl
) -> str:
"""Generate an image and return the local file path."""
enhanced_prompt = self._enhance_prompt(prompt, platform, style)
if engine == "dall-e":
return await self._generate_dalle(enhanced_prompt, platform)
elif engine == "sdxl":
return await self._generate_sdxl(enhanced_prompt, platform)
else:
raise ValueError(f"Unknown engine: {engine}")
def _enhance_prompt(
self, prompt: str, platform: Platform, style: str
) -> str:
"""Add style and branding context to the prompt."""
style_map = {
"professional": (
"Clean, modern, professional design. Muted color palette with "
"one accent color. Plenty of white space. Sans-serif typography."
),
"artistic": (
"Creative, eye-catching, slightly abstract. Bold colors and "
"dynamic composition. Suitable for social media scroll-stopping."
),
"minimal": (
"Extremely minimal design. Single focal point. Lots of negative "
"space. Monochromatic or two-tone color scheme."
),
}
size_context = {
Platform.TWITTER: "landscape 16:9 aspect ratio",
Platform.LINKEDIN: "landscape 1.91:1 aspect ratio",
Platform.INSTAGRAM: "square 1:1 aspect ratio",
Platform.FACEBOOK: "landscape 1.91:1 aspect ratio",
}
brand_colors = self.brand_style.get("colors", "blue and white")
return (
f"{prompt}. Style: {style_map.get(style, style_map['professional'])} "
f"Brand colors: {brand_colors}. "
f"Format: {size_context[platform]}. "
f"No text in the image unless specifically requested. "
f"High quality, suitable for professional social media."
)
async def _generate_dalle(
self, prompt: str, platform: Platform
) -> str:
size_map = {
Platform.TWITTER: "1792x1024",
Platform.LINKEDIN: "1792x1024",
Platform.INSTAGRAM: "1024x1024",
Platform.FACEBOOK: "1792x1024",
}
response = await self.openai_client.images.generate(
model="dall-e-3",
prompt=prompt,
size=size_map[platform],
quality="standard", # "hd" for important posts
n=1,
)
image_url = response.data[0].url
# Download and save locally
async with httpx.AsyncClient() as client:
img_response = await client.get(image_url)
filename = f"{platform.value}_{datetime.now():%Y%m%d_%H%M%S}.png"
filepath = self.output_dir / filename
filepath.write_bytes(img_response.content)
return str(filepath)
async def _generate_sdxl(
self, prompt: str, platform: Platform
) -> str:
"""Generate using SDXL via Replicate API."""
aspect_ratios = {
Platform.TWITTER: (1344, 768),
Platform.LINKEDIN: (1344, 768),
Platform.INSTAGRAM: (1024, 1024),
Platform.FACEBOOK: (1344, 768),
}
width, height = aspect_ratios[platform]
        output = await replicate.async_run(
            # Pin an explicit version hash from replicate.com; the API
            # expects "owner/name:version-hash", not a "latest" tag
            "stability-ai/sdxl:<version-hash>",
input={
"prompt": prompt,
"negative_prompt": (
"blurry, low quality, text, watermark, "
"oversaturated, cartoon, childish"
),
"width": width,
"height": height,
"num_inference_steps": 30,
"guidance_scale": 7.5,
},
)
        # Replicate returns a list of output URLs for SDXL
        image_url = output[0] if isinstance(output, list) else output
        async with httpx.AsyncClient() as client:
            img_response = await client.get(image_url)
filename = f"sdxl_{platform.value}_{datetime.now():%Y%m%d_%H%M%S}.png"
filepath = self.output_dir / filename
filepath.write_bytes(img_response.content)
return str(filepath)
async def generate_for_post(
self, content_plan: ContentPlan, platform: Platform
) -> str:
"""Generate a contextually appropriate image for a post."""
# Use the LLM to create an image prompt from the content
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.8)
image_prompt = await llm.ainvoke(
f"""Create a DALL-E image prompt for a social media post.
Post topic: {content_plan.topic}
Post hook: {content_plan.hook}
Platform: {platform.value}
Rules:
- Describe a scene or concept, NOT text or typography
- Be specific about composition, lighting, and mood
- Make it scroll-stopping but professional
- Avoid clichés (no lightbulbs, no handshakes, no globe with connections)
Return ONLY the image prompt, nothing else."""
)
return await self.generate(
prompt=image_prompt.content,
platform=platform,
style="professional",
)
Reality check on image generation: DALL-E 3 is remarkably good at following instructions, but it still struggles with text in images. If you need text overlays (quotes, statistics, etc.), generate a clean background image and composite the text programmatically using Pillow or a service like Bannerbear. SDXL via Replicate costs about $0.004 per image versus DALL-E 3's $0.04-0.08, so use SDXL for drafts and experiments.
Part 4: Scheduling System
Scheduling sounds simple until you deal with time zones, rate limits, retry logic, and the fact that Instagram's API requires a Facebook Page connection.
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from dataclasses import dataclass
from enum import Enum
import json
import aiosqlite
class ScheduleStatus(Enum):
PENDING = "pending"
PUBLISHED = "published"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class ScheduledPost:
id: str
content: PostContent
platforms: list[Platform]
scheduled_time: datetime
status: ScheduleStatus = ScheduleStatus.PENDING
    results: Optional[list[PostResult]] = None
retry_count: int = 0
max_retries: int = 3
def __post_init__(self):
if self.results is None:
self.results = []
class SchedulingEngine:
def __init__(
self,
adapters: dict[Platform, PlatformAdapter],
db_path: str = "social_scheduler.db",
):
self.adapters = adapters
self.scheduler = AsyncIOScheduler()
self.db_path = db_path
self._initialized = False
async def initialize(self):
"""Set up database and start scheduler."""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
CREATE TABLE IF NOT EXISTS scheduled_posts (
id TEXT PRIMARY KEY,
content_json TEXT NOT NULL,
platforms_json TEXT NOT NULL,
scheduled_time TEXT NOT NULL,
status TEXT DEFAULT 'pending',
results_json TEXT DEFAULT '[]',
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
await db.commit()
self.scheduler.start()
self._initialized = True
# Reload pending posts from database
await self._reload_pending_posts()
async def schedule_post(
self,
content: PostContent,
platforms: list[Platform],
scheduled_time: datetime,
        post_id: Optional[str] = None,
) -> ScheduledPost:
"""Schedule a post for future publication."""
if not self._initialized:
await self.initialize()
post_id = post_id or f"post_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
scheduled = ScheduledPost(
id=post_id,
content=content,
platforms=platforms,
scheduled_time=scheduled_time,
)
# Persist to database
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""INSERT INTO scheduled_posts
(id, content_json, platforms_json, scheduled_time, status)
VALUES (?, ?, ?, ?, ?)""",
(
post_id,
json.dumps(content.__dict__, default=str),
json.dumps([p.value for p in platforms]),
scheduled_time.isoformat(),
ScheduleStatus.PENDING.value,
),
)
await db.commit()
# Add to scheduler
self.scheduler.add_job(
self._execute_post,
trigger=DateTrigger(run_date=scheduled_time),
id=post_id,
args=[post_id],
replace_existing=True,
misfire_grace_time=300, # 5-minute grace period
)
return scheduled
async def schedule_recurring(
self,
content_generator, # async callable that returns PostContent
platforms: list[Platform],
cron_expression: str,
job_id: str,
):
"""Schedule recurring content using a cron expression."""
trigger = CronTrigger.from_crontab(cron_expression)
self.scheduler.add_job(
self._execute_generated_post,
trigger=trigger,
id=job_id,
args=[content_generator, platforms],
replace_existing=True,
)
async def _execute_post(self, post_id: str):
"""Publish a scheduled post to all target platforms."""
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT * FROM scheduled_posts WHERE id = ?", (post_id,)
)
row = await cursor.fetchone()
if not row:
return
content_data = json.loads(row[1])
platforms = [Platform(p) for p in json.loads(row[2])]
content = PostContent(**content_data)
results = []
# Publish to each platform with staggered timing
for i, platform in enumerate(platforms):
if platform not in self.adapters:
results.append(PostResult(
platform=platform,
post_id="",
error=f"No adapter configured for {platform.value}",
))
continue
# Stagger posts by 30 seconds to avoid rate limits
if i > 0:
await asyncio.sleep(30)
try:
adapted_content = content.for_platform(platform)
result = await self.adapters[platform].publish(adapted_content)
results.append(result)
except Exception as e:
results.append(PostResult(
platform=platform,
post_id="",
error=str(e),
))
# Check if we need to retry
all_failed = all(r.error for r in results)
retry_count = row[6]
max_retries = row[7]
new_status = ScheduleStatus.PUBLISHED
if all_failed and retry_count < max_retries:
new_status = ScheduleStatus.PENDING
retry_count += 1
# Retry in 5 minutes
self.scheduler.add_job(
self._execute_post,
trigger=DateTrigger(
run_date=datetime.now() + timedelta(minutes=5)
),
id=f"{post_id}_retry_{retry_count}",
args=[post_id],
)
elif all_failed:
new_status = ScheduleStatus.FAILED
# Update database
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""UPDATE scheduled_posts
SET status = ?, results_json = ?, retry_count = ?
WHERE id = ?""",
(
new_status.value,
json.dumps(
[{"platform": r.platform.value, "post_id": r.post_id,
"error": r.error} for r in results],
),
retry_count,
post_id,
),
)
await db.commit()
async def get_analytics_summary(
self, days: int = 7
) -> dict:
"""Get a summary of scheduled and published posts."""
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"""SELECT status, COUNT(*) FROM scheduled_posts
WHERE created_at > datetime('now', ?)
GROUP BY status""",
(f"-{days} days",),
)
rows = await cursor.fetchall()
return {row[0]: row[1] for row in rows}
The scheduling gotcha nobody mentions: Platform APIs have rate limits that aren't just "X requests per minute." Twitter, for example, has a 300 tweets per 3-hour rolling window for posting. If you're managing multiple accounts or doing high-volume posting, you need a token bucket or leaky bucket rate limiter per platform. Don't rely on asyncio.sleep(30) alone.
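The token-bucket idea is small enough to sketch inline: each platform gets a bucket sized to its window, and `acquire()` blocks until a token is free. The capacity numbers in the usage line are illustrative, not official limits:

```python
import asyncio
import time

class TokenBucket:
    """Async token bucket: holds up to `capacity` tokens, refilled at
    `refill_rate` tokens per second. acquire() waits for a free token."""

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()  # serializes concurrent acquirers

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Credit tokens accrued since the last check, capped at capacity
                self.tokens = min(
                    self.capacity,
                    self.tokens + (now - self.last_refill) * self.refill_rate,
                )
                self.last_refill = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep just long enough for one token to accrue
                await asyncio.sleep((1 - self.tokens) / self.refill_rate)

# e.g. a 300-posts-per-3-hour window ~= 300 tokens refilled at 300/10800 per second
twitter_bucket = TokenBucket(capacity=300, refill_rate=300 / 10800)
```

Call `await twitter_bucket.acquire()` immediately before each `publish()` and the scheduler can never outrun the window, no matter how many posts land at the same minute.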
Part 5: Engagement Analysis
Raw metrics are useless without context. The analysis engine needs to track trends, identify what works, and feed insights back into content generation.
import pandas as pd
from collections import defaultdict
class EngagementAnalyzer:
def __init__(
self,
adapters: dict[Platform, PlatformAdapter],
db_path: str = "social_scheduler.db",
):
self.adapters = adapters
self.db_path = db_path
async def collect_metrics(self, days_back: int = 7) -> pd.DataFrame:
"""Collect engagement metrics from all platforms."""
all_metrics = []
for platform, adapter in self.adapters.items():
try:
metrics = await adapter.get_recent_engagement(limit=50)
for m in metrics:
all_metrics.append({
"platform": m.platform.value,
"post_id": m.post_id,
"impressions": m.impressions,
"likes": m.likes,
"comments": m.comments,
"shares": m.shares,
"clicks": m.clicks,
"engagement_rate": m.engagement_rate,
"fetched_at": m.fetched_at,
})
except Exception as e:
print(f"Failed to collect metrics for {platform.value}: {e}")
df = pd.DataFrame(all_metrics)
if not df.empty:
df["fetched_at"] = pd.to_datetime(df["fetched_at"])
return df
def analyze_performance(
self, df: pd.DataFrame
) -> dict:
"""Generate performance insights from collected metrics."""
if df.empty:
return {"error": "No data to analyze"}
insights = {
"summary": {},
"by_platform": {},
"top_posts": [],
"recommendations": [],
}
# Overall summary
insights["summary"] = {
"total_posts": len(df),
"total_impressions": int(df["impressions"].sum()),
"avg_engagement_rate": float(df["engagement_rate"].mean()),
"total_engagement": int(
df["likes"].sum() + df["comments"].sum() + df["shares"].sum()
),
}
# Per-platform breakdown
for platform in df["platform"].unique():
plat_df = df[df["platform"] == platform]
insights["by_platform"][platform] = {
"posts": len(plat_df),
"avg_impressions": float(plat_df["impressions"].mean()),
"avg_engagement_rate": float(plat_df["engagement_rate"].mean()),
"best_post_id": plat_df.loc[
plat_df["engagement_rate"].idxmax(), "post_id"
],
}
# Top performing posts
top_5 = df.nlargest(5, "engagement_rate")
insights["top_posts"] = top_5.to_dict("records")
# Generate recommendations
insights["recommendations"] = self._generate_recommendations(df)
return insights
def _generate_recommendations(self, df: pd.DataFrame) -> list[str]:
"""Generate actionable recommendations based on data."""
recommendations = []
# Platform performance comparison
platform_avg = df.groupby("platform")["engagement_rate"].mean()
if len(platform_avg) > 1:
best_platform = platform_avg.idxmax()
worst_platform = platform_avg.idxmin()
ratio = platform_avg[best_platform] / max(platform_avg[worst_platform], 0.001)
if ratio > 2:
recommendations.append(
f"{best_platform} is outperforming {worst_platform} by "
f"{ratio:.1f}x in engagement rate. Consider reallocating "
f"content effort or adapting your {worst_platform} strategy."
)
# Engagement rate assessment
avg_er = df["engagement_rate"].mean()
if avg_er < 0.02:
recommendations.append(
"Average engagement rate is below 2%. Consider: "
"stronger hooks, more questions/polls, posting at different times, "
"or reducing promotional content ratio."
)
elif avg_er > 0.06:
recommendations.append(
"Engagement rate is strong (>6%). Focus on scaling what works "
"and experimenting with new content formats while maintaining quality."
)
# Comment-to-like ratio (indicates conversation-starting content)
total_likes = df["likes"].sum()
total_comments = df["comments"].sum()
if total_likes > 0:
comment_ratio = total_comments / total_likes
if comment_ratio < 0.05:
recommendations.append(
"Low comment-to-like ratio suggests content is being consumed "
"passively. Add more questions, controversial takes, or "
"fill-in-the-blank style posts to drive comments."
)
elif comment_ratio > 0.2:
recommendations.append(
"High comment ratio indicates strong conversation. "
"Double down on the content types driving discussion."
)
return recommendations
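The per-platform comparison inside `_generate_recommendations` boils down to one groupby. Here is a standalone miniature with clearly toy numbers, in the same row shape `collect_metrics()` produces:

```python
import pandas as pd

# Toy metrics, purely illustrative
df = pd.DataFrame([
    {"platform": "twitter",  "likes": 40, "comments": 2,  "shares": 5, "impressions": 2000},
    {"platform": "twitter",  "likes": 10, "comments": 1,  "shares": 0, "impressions": 1500},
    {"platform": "linkedin", "likes": 90, "comments": 12, "shares": 8, "impressions": 1800},
])
df["engagement_rate"] = (df["likes"] + df["comments"] + df["shares"]) / df["impressions"]

platform_avg = df.groupby("platform")["engagement_rate"].mean()
best = platform_avg.idxmax()
ratio = platform_avg.max() / platform_avg.min()
if ratio > 2:
    print(f"{best} is outperforming by {ratio:.1f}x; shift content effort there.")
```

Everything else in the analyzer is elaboration on this pattern: aggregate, compare against a threshold, and turn the gap into a sentence the content engine can act on.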
class SentimentAnalyzer:
"""Lightweight sentiment analysis on comments and replies."""
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
async def analyze_comments(
self, comments: list[str]
) -> dict:
"""Analyze sentiment and extract themes from comments."""
if not comments:
return {"sentiment": "neutral", "themes": [], "sample_size": 0}
# Batch analyze to save API calls
batch_size = 50
all_results = []
for i in range(0, len(comments), batch_size):
batch = comments[i:i + batch_size]
response = await self.llm.ainvoke(
f"""Analyze these social media comments. Return JSON