Back to Home
Creative Agents

Building a Social Media Agent That Creates, Schedules, and Analyzes Content

Mei-Lin Zhang

ML researcher focused on autonomous agents and multi-agent systems.

February 12, 202618 min read

You've probably seen a dozen "AI social media manager" SaaS products launch this year. Most are thin wrappers around OpenAI's API with a calendar widget bolted on. If you want something that actually ...

Building an AI Agent for Social Media Management: A Practical Guide

You've probably seen a dozen "AI social media manager" SaaS products launch this year. Most are thin wrappers around OpenAI's API with a calendar widget bolted on. If you want something that actually fits your workflow — or you want to build a product that's more than vaporware — you need to understand the moving parts.

This tutorial walks through building a real social media management agent. Not a toy demo. We'll cover content generation, image creation, scheduling, engagement analysis, and multi-platform orchestration. I'll use Python throughout, and I'll be honest about where things get messy.

Architecture Overview

Before writing a single line of code, let's map the system. A social media agent has five core responsibilities:

┌─────────────────────────────────────────────────┐
│                  Agent Core (Orchestrator)       │
├──────────┬──────────┬───────────┬───────────────┤
│ Content  │  Image   │ Scheduler │  Engagement   │
│ Engine   │  Engine  │  System   │  Analyzer     │
├──────────┴──────────┴───────────┴───────────────┤
│          Platform Abstraction Layer              │
├────────┬────────┬─────────┬─────────┬───────────┤
│Twitter │LinkedIn│Instagram│ Facebook│  TikTok   │
│  /X    │        │         │         │           │
└────────┴────────┴─────────┴─────────┴───────────┘

The Platform Abstraction Layer is the most important piece. Every platform has different APIs, content formats, rate limits, and authentication schemes. If you don't abstract this properly, your agent becomes a tangled mess of platform-specific conditionals.

We'll use these key libraries:

Purpose Library Why
LLM orchestration LangChain Chain composition, tool calling, memory
Content generation OpenAI GPT-4o / Claude 3.5 Best instruction-following for copy
Image generation DALL-E 3 / Replicate (SDXL) API access, quality
Scheduling APScheduler + platform APIs Reliable cron-like scheduling
Analytics Platform APIs + pandas Metric collection and analysis
Async operations asyncio + httpx Non-blocking API calls

Part 1: The Platform Abstraction Layer

This is where most tutorials skip ahead. Don't. Getting this right saves you weeks of refactoring.

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import httpx


class Platform(Enum):
    TWITTER = "twitter"
    LINKEDIN = "linkedin"
    INSTAGRAM = "instagram"
    FACEBOOK = "facebook"


@dataclass
class PostContent:
    """Normalized content representation across platforms."""
    text: str
    image_urls: list[str] = field(default_factory=list)
    video_url: Optional[str] = None
    link: Optional[str] = None
    hashtags: list[str] = field(default_factory=list)
    mentions: list[str] = field(default_factory=list)
    
    def for_platform(self, platform: Platform) -> "PostContent":
        """Return platform-adapted content."""
        limits = {
            Platform.TWITTER: 280,
            Platform.LINKEDIN: 3000,
            Platform.INSTAGRAM: 2200,
            Platform.FACEBOOK: 63206,
        }
        limit = limits.get(platform, 2000)
        
        adapted_text = self.text
        if len(adapted_text) > limit:
            adapted_text = adapted_text[:limit - 3] + "..."
        
        # Instagram doesn't support clickable links in posts
        if platform == Platform.INSTAGRAM and self.link:
            adapted_text += f"\n\n🔗 Link in bio"
        
        return PostContent(
            text=adapted_text,
            image_urls=self.image_urls[:self._max_images(platform)],
            video_url=self.video_url,
            link=self.link if platform != Platform.INSTAGRAM else None,
            hashtags=self.hashtags,
            mentions=self.mentions,
        )
    
    def _max_images(self, platform: Platform) -> int:
        return {
            Platform.TWITTER: 4,
            Platform.LINKEDIN: 9,
            Platform.INSTAGRAM: 10,
            Platform.FACEBOOK: 10,
        }.get(platform, 4)


@dataclass
class PostResult:
    platform: Platform
    post_id: str
    url: Optional[str] = None
    published_at: Optional[datetime] = None
    error: Optional[str] = None


@dataclass
class EngagementMetrics:
    platform: Platform
    post_id: str
    impressions: int = 0
    likes: int = 0
    comments: int = 0
    shares: int = 0
    clicks: int = 0
    engagement_rate: float = 0.0
    fetched_at: datetime = field(default_factory=datetime.utcnow)


class PlatformAdapter(ABC):
    """Base class for platform-specific implementations."""
    
    def __init__(self, credentials: dict):
        self.credentials = credentials
        self.client = httpx.AsyncClient(timeout=30.0)
    
    @abstractmethod
    async def publish(self, content: PostContent) -> PostResult:
        pass
    
    @abstractmethod
    async def get_metrics(self, post_id: str) -> EngagementMetrics:
        pass
    
    @abstractmethod
    async def get_recent_engagement(self, limit: int = 20) -> list[EngagementMetrics]:
        pass
    
    @abstractmethod
    async def reply(self, post_id: str, text: str) -> PostResult:
        pass

Now let's implement a concrete adapter. Twitter/X is the most commonly requested, so we'll start there:

import hashlib
import hmac
import urllib.parse
from base64 import b64encode


class TwitterAdapter(PlatformAdapter):
    """Twitter/X API v2 adapter using OAuth 1.0a."""
    
    BASE_URL = "https://api.twitter.com/2"
    
    async def publish(self, content: PostContent) -> PostResult:
        adapted = content.for_platform(Platform.TWITTER)
        
        # Upload media first if present
        media_ids = []
        for url in adapted.image_urls:
            media_id = await self._upload_media(url)
            if media_id:
                media_ids.append(media_id)
        
        payload = {"text": self._build_tweet_text(adapted)}
        if media_ids:
            payload["media"] = {"media_ids": media_ids}
        
        response = await self._authenticated_request(
            "POST", "/tweets", json=payload
        )
        
        if response.status_code == 201:
            data = response.json()["data"]
            return PostResult(
                platform=Platform.TWITTER,
                post_id=data["id"],
                url=f"https://x.com/i/status/{data['id']}",
                published_at=datetime.utcnow(),
            )
        else:
            return PostResult(
                platform=Platform.TWITTER,
                post_id="",
                error=f"HTTP {response.status_code}: {response.text}",
            )
    
    async def get_metrics(self, post_id: str) -> EngagementMetrics:
        response = await self._authenticated_request(
            "GET",
            f"/tweets/{post_id}",
            params={
                "tweet.fields": "public_metrics,created_at",
            },
        )
        
        if response.status_code == 200:
            metrics = response.json()["data"]["public_metrics"]
            impressions = metrics.get("impression_count", 0)
            total_engagement = (
                metrics["like_count"]
                + metrics["reply_count"]
                + metrics["retweet_count"]
            )
            
            return EngagementMetrics(
                platform=Platform.TWITTER,
                post_id=post_id,
                impressions=impressions,
                likes=metrics["like_count"],
                comments=metrics["reply_count"],
                shares=metrics["retweet_count"],
                engagement_rate=(
                    total_engagement / impressions if impressions > 0 else 0.0
                ),
            )
        
        raise ValueError(f"Failed to fetch metrics: {response.text}")
    
    def _build_tweet_text(self, content: PostContent) -> str:
        parts = [content.text]
        if content.hashtags:
            tags = " ".join(f"#{tag.lstrip('#')}" for tag in content.hashtags)
            parts.append(tags)
        return "\n\n".join(parts)
    
    async def _upload_media(self, image_url: str) -> Optional[str]:
        # Twitter media upload requires OAuth 1.0a
        # Implementation depends on whether you're uploading bytes or a URL
        # This is a simplified version
        async with self.client.stream("GET", image_url) as resp:
            image_bytes = b""
            async for chunk in resp.aiter_bytes():
                image_bytes += chunk
        
        upload_url = "https://upload.twitter.com/1.1/media/upload.json"
        # ... OAuth signing and multipart upload
        # Returns media_id_string on success
        return None  # Placeholder
    
    async def _authenticated_request(self, method, path, **kwargs):
        # In production, use a library like 'requests-oauthlib' or 
        # 'tweepy' for proper OAuth 1.0a signing
        url = f"{self.BASE_URL}{path}"
        headers = self._get_oauth_headers(method, url, kwargs.get("params"))
        return await self.client.request(
            method, url, headers=headers, **kwargs
        )

Honest caveat: Twitter's API situation is a mess right now. The free tier allows write-only operations. The Basic tier ($100/month) gives you limited read access. The Pro tier ($5,000/month) gives you reasonable rate limits. If you're building for a business, budget for at least Basic. For LinkedIn and Instagram, you'll need Meta/Facebook App Review, which takes weeks and has unpredictable approval criteria.

Part 2: Content Generation Engine

The content engine is the brain. It needs to understand brand voice, platform conventions, and content strategy. Here's where LangChain earns its keep.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field


class ContentPlan(BaseModel):
    """Structured output for content planning."""
    topic: str = Field(description="The core topic or angle")
    hook: str = Field(description="Opening line that grabs attention")
    body: str = Field(description="Main content body")
    cta: str = Field(description="Call to action")
    hashtags: list[str] = Field(description="Relevant hashtags")
    tone_notes: str = Field(description="Notes about tone adjustments")


class ContentEngine:
    def __init__(self, brand_config: dict):
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0.7,  # Some creativity, not too wild
        )
        self.brand = brand_config
        self.parser = PydanticOutputParser(pydantic_object=ContentPlan)
        
        self.system_prompt = self._build_system_prompt()
    
    def _build_system_prompt(self) -> str:
        return f"""You are a social media content strategist for {self.brand['name']}.

BRAND VOICE:
- Tone: {self.brand.get('tone', 'professional but approachable')}
- Industry: {self.brand.get('industry', 'technology')}
- Audience: {self.brand.get('audience', 'developers and tech professionals')}
- Values: {', '.join(self.brand.get('values', ['innovation', 'transparency']))}
- Words to avoid: {', '.join(self.brand.get('banned_words', ['synergy', 'leverage', 'disrupt']))}

CONTENT RULES:
1. Never use clickbait or misleading claims
2. Every post must provide genuine value (insight, entertainment, or utility)
3. Hashtags: use 2-5 for Twitter, 5-10 for Instagram, 3-5 for LinkedIn
4. LinkedIn posts should open with a bold statement or question
5. Twitter posts should be scannable — use line breaks strategically
6. Include a clear CTA when appropriate (not every post needs one)
7. Reference current trends only when genuinely relevant

{self.parser.get_format_instructions()}"""
    
    async def generate_post(
        self,
        topic: str,
        platform: Platform,
        content_type: str = "educational",  # educational, promotional, engagement, news
        additional_context: str = "",
    ) -> ContentPlan:
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            ("human", """Create a {platform} post about: {topic}

Content type: {content_type}
Additional context: {context}

Make it specific and substantive. Avoid generic statements. 
If citing statistics or claims, note if they need verification."""),
        ])
        
        chain = prompt | self.llm | self.parser
        
        result = await chain.ainvoke({
            "platform": platform.value,
            "topic": topic,
            "content_type": content_type,
            "context": additional_context or "None provided",
        })
        
        return result
    
    async def generate_thread(
        self,
        topic: str,
        num_tweets: int = 5,
    ) -> list[ContentPlan]:
        """Generate a Twitter/X thread."""
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            ("human", """Create a Twitter thread with {num_tweets} tweets about: {topic}

Return a JSON array of {num_tweets} content plans. Each tweet should:
- Stand alone but connect to the narrative
- End with a hook that makes people want to read the next tweet
- The final tweet should have a strong CTA

Thread structure:
1. Hook tweet (bold claim or surprising fact)
2-3. Supporting points with evidence
4. Practical takeaway or framework
5. CTA + summary

Return as a JSON array of ContentPlan objects."""),
        ])
        
        # For arrays, we need a slightly different approach
        from langchain_core.output_parsers import JsonOutputParser
        
        chain = prompt | self.llm | JsonOutputParser()
        
        result = await chain.ainvoke({
            "num_tweets": num_tweets,
            "topic": topic,
        })
        
        return [ContentPlan(**item) for item in result]
    
    async def adapt_for_platform(
        self,
        content: ContentPlan,
        target_platform: Platform,
    ) -> ContentPlan:
        """Reformat existing content for a different platform."""
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            ("human", """Adapt this content for {platform}:

Original post:
{original}

Maintain the core message but adjust:
- Length and formatting for the platform
- Hashtag strategy
- Tone (LinkedIn is more formal, Twitter is punchier)
- CTA style"""),
        ])
        
        chain = prompt | self.llm | self.parser
        
        return await chain.ainvoke({
            "platform": target_platform.value,
            "original": content.model_dump_json(),
        })

Here's how you'd use it:

brand_config = {
    "name": "DriftSeas",
    "tone": "authoritative but accessible, like a senior engineer explaining to a peer",
    "industry": "AI/ML and developer tools",
    "audience": "senior developers, ML engineers, CTOs",
    "values": ["technical depth", "practical utility", "honest assessment"],
    "banned_words": ["game-changing", "revolutionary", "cutting-edge", "synergy"],
}

engine = ContentEngine(brand_config)

# Generate a post
plan = await engine.generate_post(
    topic="Why most RAG implementations fail in production",
    platform=Platform.LINKEDIN,
    content_type="educational",
    additional_context="Based on our experience deploying 50+ RAG systems. "
                       "Common failures: bad chunking, no re-ranking, ignoring latency.",
)

What actually matters here: The system prompt is doing most of the work. Spend time crafting your brand voice rules. The difference between generic AI content and good AI content is entirely in the constraints you define. I've found that including specific "words to avoid" and explicit structural rules produces dramatically better output than vague instructions like "write engaging content."

Part 3: Image Generation Integration

Images are non-negotiable for Instagram and significantly boost engagement on other platforms. We'll support both DALL-E 3 and Stable Diffusion via Replicate, because DALL-E is better for text-heavy graphics while SDXL excels at photorealistic and artistic content.

import openai
import replicate
import base64
from pathlib import Path


class ImageEngine:
    def __init__(self, config: dict):
        self.openai_client = openai.AsyncOpenAI(
            api_key=config["openai_api_key"]
        )
        self.output_dir = Path(config.get("output_dir", "./generated_images"))
        self.output_dir.mkdir(exist_ok=True)
        self.brand_style = config.get("brand_style", {})
    
    async def generate(
        self,
        prompt: str,
        platform: Platform,
        style: str = "professional",  # professional, artistic, minimal, infographic
        engine: str = "dall-e",  # dall-e, sdxl
    ) -> str:
        """Generate an image and return the local file path."""
        
        enhanced_prompt = self._enhance_prompt(prompt, platform, style)
        
        if engine == "dall-e":
            return await self._generate_dalle(enhanced_prompt, platform)
        elif engine == "sdxl":
            return await self._generate_sdxl(enhanced_prompt, platform)
        else:
            raise ValueError(f"Unknown engine: {engine}")
    
    def _enhance_prompt(
        self, prompt: str, platform: Platform, style: str
    ) -> str:
        """Add style and branding context to the prompt."""
        
        style_map = {
            "professional": (
                "Clean, modern, professional design. Muted color palette with "
                "one accent color. Plenty of white space. Sans-serif typography."
            ),
            "artistic": (
                "Creative, eye-catching, slightly abstract. Bold colors and "
                "dynamic composition. Suitable for social media scroll-stopping."
            ),
            "minimal": (
                "Extremely minimal design. Single focal point. Lots of negative "
                "space. Monochromatic or two-tone color scheme."
            ),
        }
        
        size_context = {
            Platform.TWITTER: "landscape 16:9 aspect ratio",
            Platform.LINKEDIN: "landscape 1.91:1 aspect ratio",
            Platform.INSTAGRAM: "square 1:1 aspect ratio",
            Platform.FACEBOOK: "landscape 1.91:1 aspect ratio",
        }
        
        brand_colors = self.brand_style.get("colors", "blue and white")
        
        return (
            f"{prompt}. Style: {style_map.get(style, style_map['professional'])} "
            f"Brand colors: {brand_colors}. "
            f"Format: {size_context[platform]}. "
            f"No text in the image unless specifically requested. "
            f"High quality, suitable for professional social media."
        )
    
    async def _generate_dalle(
        self, prompt: str, platform: Platform
    ) -> str:
        size_map = {
            Platform.TWITTER: "1792x1024",
            Platform.LINKEDIN: "1792x1024",
            Platform.INSTAGRAM: "1024x1024",
            Platform.FACEBOOK: "1792x1024",
        }
        
        response = await self.openai_client.images.generate(
            model="dall-e-3",
            prompt=prompt,
            size=size_map[platform],
            quality="standard",  # "hd" for important posts
            n=1,
        )
        
        image_url = response.data[0].url
        
        # Download and save locally
        async with httpx.AsyncClient() as client:
            img_response = await client.get(image_url)
            
        filename = f"{platform.value}_{datetime.now():%Y%m%d_%H%M%S}.png"
        filepath = self.output_dir / filename
        filepath.write_bytes(img_response.content)
        
        return str(filepath)
    
    async def _generate_sdxl(
        self, prompt: str, platform: Platform
    ) -> str:
        """Generate using SDXL via Replicate API."""
        
        aspect_ratios = {
            Platform.TWITTER: (1344, 768),
            Platform.LINKEDIN: (1344, 768),
            Platform.INSTAGRAM: (1024, 1024),
            Platform.FACEBOOK: (1344, 768),
        }
        
        width, height = aspect_ratios[platform]
        
        output = await replicate.async_run(
            "stability-ai/sdxl:latest",
            input={
                "prompt": prompt,
                "negative_prompt": (
                    "blurry, low quality, text, watermark, "
                    "oversaturated, cartoon, childish"
                ),
                "width": width,
                "height": height,
                "num_inference_steps": 30,
                "guidance_scale": 7.5,
            },
        )
        
        # output is a URL from Replicate
        async with httpx.AsyncClient() as client:
            img_response = await client.get(output)
        
        filename = f"sdxl_{platform.value}_{datetime.now():%Y%m%d_%H%M%S}.png"
        filepath = self.output_dir / filename
        filepath.write_bytes(img_response.content)
        
        return str(filepath)
    
    async def generate_for_post(
        self, content_plan: ContentPlan, platform: Platform
    ) -> str:
        """Generate a contextually appropriate image for a post."""
        
        # Use the LLM to create an image prompt from the content
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.8)
        
        image_prompt = await llm.ainvoke(
            f"""Create a DALL-E image prompt for a social media post.

Post topic: {content_plan.topic}
Post hook: {content_plan.hook}
Platform: {platform.value}

Rules:
- Describe a scene or concept, NOT text or typography
- Be specific about composition, lighting, and mood
- Make it scroll-stopping but professional
- Avoid clichés (no lightbulbs, no handshakes, no globe with connections)

Return ONLY the image prompt, nothing else."""
        )
        
        return await self.generate(
            prompt=image_prompt.content,
            platform=platform,
            style="professional",
        )

Reality check on image generation: DALL-E 3 is remarkably good at following instructions, but it still struggles with text in images. If you need text overlays (quotes, statistics, etc.), generate a clean background image and composite the text programmatically using Pillow or a service like Bannerbear. SDXL via Replicate costs about $0.004 per image versus DALL-E 3's $0.04-0.08, so use SDXL for drafts and experiments.

Part 4: Scheduling System

Scheduling sounds simple until you deal with time zones, rate limits, retry logic, and the fact that Instagram's API requires a Facebook Page connection.

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from dataclasses import dataclass
from enum import Enum
import json
import aiosqlite


class ScheduleStatus(Enum):
    PENDING = "pending"
    PUBLISHED = "published"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class ScheduledPost:
    id: str
    content: PostContent
    platforms: list[Platform]
    scheduled_time: datetime
    status: ScheduleStatus = ScheduleStatus.PENDING
    results: list[PostResult] = None
    retry_count: int = 0
    max_retries: int = 3
    
    def __post_init__(self):
        if self.results is None:
            self.results = []


class SchedulingEngine:
    def __init__(
        self,
        adapters: dict[Platform, PlatformAdapter],
        db_path: str = "social_scheduler.db",
    ):
        self.adapters = adapters
        self.scheduler = AsyncIOScheduler()
        self.db_path = db_path
        self._initialized = False
    
    async def initialize(self):
        """Set up database and start scheduler."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute("""
                CREATE TABLE IF NOT EXISTS scheduled_posts (
                    id TEXT PRIMARY KEY,
                    content_json TEXT NOT NULL,
                    platforms_json TEXT NOT NULL,
                    scheduled_time TEXT NOT NULL,
                    status TEXT DEFAULT 'pending',
                    results_json TEXT DEFAULT '[]',
                    retry_count INTEGER DEFAULT 0,
                    max_retries INTEGER DEFAULT 3,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            """)
            await db.commit()
        
        self.scheduler.start()
        self._initialized = True
        
        # Reload pending posts from database
        await self._reload_pending_posts()
    
    async def schedule_post(
        self,
        content: PostContent,
        platforms: list[Platform],
        scheduled_time: datetime,
        post_id: str = None,
    ) -> ScheduledPost:
        """Schedule a post for future publication."""
        
        if not self._initialized:
            await self.initialize()
        
        post_id = post_id or f"post_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        
        scheduled = ScheduledPost(
            id=post_id,
            content=content,
            platforms=platforms,
            scheduled_time=scheduled_time,
        )
        
        # Persist to database
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """INSERT INTO scheduled_posts 
                   (id, content_json, platforms_json, scheduled_time, status)
                   VALUES (?, ?, ?, ?, ?)""",
                (
                    post_id,
                    json.dumps(content.__dict__, default=str),
                    json.dumps([p.value for p in platforms]),
                    scheduled_time.isoformat(),
                    ScheduleStatus.PENDING.value,
                ),
            )
            await db.commit()
        
        # Add to scheduler
        self.scheduler.add_job(
            self._execute_post,
            trigger=DateTrigger(run_date=scheduled_time),
            id=post_id,
            args=[post_id],
            replace_existing=True,
            misfire_grace_time=300,  # 5-minute grace period
        )
        
        return scheduled
    
    async def schedule_recurring(
        self,
        content_generator,  # async callable that returns PostContent
        platforms: list[Platform],
        cron_expression: str,
        job_id: str,
    ):
        """Schedule recurring content using a cron expression."""
        
        trigger = CronTrigger.from_crontab(cron_expression)
        
        self.scheduler.add_job(
            self._execute_generated_post,
            trigger=trigger,
            id=job_id,
            args=[content_generator, platforms],
            replace_existing=True,
        )
    
    async def _execute_post(self, post_id: str):
        """Publish a scheduled post to all target platforms."""
        
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "SELECT * FROM scheduled_posts WHERE id = ?", (post_id,)
            )
            row = await cursor.fetchone()
        
        if not row:
            return
        
        content_data = json.loads(row[1])
        platforms = [Platform(p) for p in json.loads(row[2])]
        content = PostContent(**content_data)
        
        results = []
        
        # Publish to each platform with staggered timing
        for i, platform in enumerate(platforms):
            if platform not in self.adapters:
                results.append(PostResult(
                    platform=platform,
                    post_id="",
                    error=f"No adapter configured for {platform.value}",
                ))
                continue
            
            # Stagger posts by 30 seconds to avoid rate limits
            if i > 0:
                await asyncio.sleep(30)
            
            try:
                adapted_content = content.for_platform(platform)
                result = await self.adapters[platform].publish(adapted_content)
                results.append(result)
            except Exception as e:
                results.append(PostResult(
                    platform=platform,
                    post_id="",
                    error=str(e),
                ))
        
        # Check if we need to retry
        all_failed = all(r.error for r in results)
        retry_count = row[6]
        max_retries = row[7]
        
        new_status = ScheduleStatus.PUBLISHED
        if all_failed and retry_count < max_retries:
            new_status = ScheduleStatus.PENDING
            retry_count += 1
            # Retry in 5 minutes
            self.scheduler.add_job(
                self._execute_post,
                trigger=DateTrigger(
                    run_date=datetime.now() + timedelta(minutes=5)
                ),
                id=f"{post_id}_retry_{retry_count}",
                args=[post_id],
            )
        elif all_failed:
            new_status = ScheduleStatus.FAILED
        
        # Update database
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """UPDATE scheduled_posts 
                   SET status = ?, results_json = ?, retry_count = ?
                   WHERE id = ?""",
                (
                    new_status.value,
                    json.dumps(
                        [{"platform": r.platform.value, "post_id": r.post_id, 
                          "error": r.error} for r in results],
                    ),
                    retry_count,
                    post_id,
                ),
            )
            await db.commit()
    
    async def get_analytics_summary(
        self, days: int = 7
    ) -> dict:
        """Get a summary of scheduled and published posts."""
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                """SELECT status, COUNT(*) FROM scheduled_posts 
                   WHERE created_at > datetime('now', ?)
                   GROUP BY status""",
                (f"-{days} days",),
            )
            rows = await cursor.fetchall()
        
        return {row[0]: row[1] for row in rows}

The scheduling gotcha nobody mentions: Platform APIs have rate limits that aren't just "X requests per minute." Twitter, for example, has a 300 tweets per 3-hour rolling window for posting. If you're managing multiple accounts or doing high-volume posting, you need a token bucket or leaky bucket rate limiter per platform. Don't rely on asyncio.sleep(30) alone.

Part 5: Engagement Analysis

Raw metrics are useless without context. The analysis engine needs to track trends, identify what works, and feed insights back into content generation.

import pandas as pd
from collections import defaultdict


class EngagementAnalyzer:
    def __init__(
        self,
        adapters: dict[Platform, PlatformAdapter],
        db_path: str = "social_scheduler.db",
    ):
        self.adapters = adapters
        self.db_path = db_path
    
    async def collect_metrics(self, days_back: int = 7) -> pd.DataFrame:
        """Collect engagement metrics from all platforms."""
        
        all_metrics = []
        
        for platform, adapter in self.adapters.items():
            try:
                metrics = await adapter.get_recent_engagement(limit=50)
                for m in metrics:
                    all_metrics.append({
                        "platform": m.platform.value,
                        "post_id": m.post_id,
                        "impressions": m.impressions,
                        "likes": m.likes,
                        "comments": m.comments,
                        "shares": m.shares,
                        "clicks": m.clicks,
                        "engagement_rate": m.engagement_rate,
                        "fetched_at": m.fetched_at,
                    })
            except Exception as e:
                print(f"Failed to collect metrics for {platform.value}: {e}")
        
        df = pd.DataFrame(all_metrics)
        if not df.empty:
            df["fetched_at"] = pd.to_datetime(df["fetched_at"])
        
        return df
    
    def analyze_performance(
        self, df: pd.DataFrame
    ) -> dict:
        """Generate performance insights from collected metrics."""
        
        if df.empty:
            return {"error": "No data to analyze"}
        
        insights = {
            "summary": {},
            "by_platform": {},
            "top_posts": [],
            "recommendations": [],
        }
        
        # Overall summary
        insights["summary"] = {
            "total_posts": len(df),
            "total_impressions": int(df["impressions"].sum()),
            "avg_engagement_rate": float(df["engagement_rate"].mean()),
            "total_engagement": int(
                df["likes"].sum() + df["comments"].sum() + df["shares"].sum()
            ),
        }
        
        # Per-platform breakdown
        for platform in df["platform"].unique():
            plat_df = df[df["platform"] == platform]
            insights["by_platform"][platform] = {
                "posts": len(plat_df),
                "avg_impressions": float(plat_df["impressions"].mean()),
                "avg_engagement_rate": float(plat_df["engagement_rate"].mean()),
                "best_post_id": plat_df.loc[
                    plat_df["engagement_rate"].idxmax(), "post_id"
                ],
            }
        
        # Top performing posts
        top_5 = df.nlargest(5, "engagement_rate")
        insights["top_posts"] = top_5.to_dict("records")
        
        # Generate recommendations
        insights["recommendations"] = self._generate_recommendations(df)
        
        return insights
    
    def _generate_recommendations(self, df: pd.DataFrame) -> list[str]:
        """Generate actionable recommendations based on data."""
        recommendations = []
        
        # Platform performance comparison
        platform_avg = df.groupby("platform")["engagement_rate"].mean()
        if len(platform_avg) > 1:
            best_platform = platform_avg.idxmax()
            worst_platform = platform_avg.idxmin()
            ratio = platform_avg[best_platform] / max(platform_avg[worst_platform], 0.001)
            
            if ratio > 2:
                recommendations.append(
                    f"{best_platform} is outperforming {worst_platform} by "
                    f"{ratio:.1f}x in engagement rate. Consider reallocating "
                    f"content effort or adapting your {worst_platform} strategy."
                )
        
        # Engagement rate assessment
        avg_er = df["engagement_rate"].mean()
        if avg_er < 0.02:
            recommendations.append(
                "Average engagement rate is below 2%. Consider: "
                "stronger hooks, more questions/polls, posting at different times, "
                "or reducing promotional content ratio."
            )
        elif avg_er > 0.06:
            recommendations.append(
                "Engagement rate is strong (>6%). Focus on scaling what works "
                "and experimenting with new content formats while maintaining quality."
            )
        
        # Comment-to-like ratio (indicates conversation-starting content)
        total_likes = df["likes"].sum()
        total_comments = df["comments"].sum()
        if total_likes > 0:
            comment_ratio = total_comments / total_likes
            if comment_ratio < 0.05:
                recommendations.append(
                    "Low comment-to-like ratio suggests content is being consumed "
                    "passively. Add more questions, controversial takes, or "
                    "fill-in-the-blank style posts to drive comments."
                )
            elif comment_ratio > 0.2:
                recommendations.append(
                    "High comment ratio indicates strong conversation. "
                    "Double down on the content types driving discussion."
                )
        
        return recommendations


class SentimentAnalyzer:
    """Lightweight sentiment analysis on comments and replies."""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    async def analyze_comments(
        self, comments: list[str]
    ) -> dict:
        """Analyze sentiment and extract themes from comments."""
        
        if not comments:
            return {"sentiment": "neutral", "themes": [], "sample_size": 0}
        
        # Batch analyze to save API calls
        batch_size = 50
        all_results = []
        
        for i in range(0, len(comments), batch_size):
            batch = comments[i:i + batch_size]
            
            response = await self.llm.ainvoke(
                f"""Analyze these social media comments. Return JSON

Keywords

AI agentcreative-agents