How to Build an AI Email Assistant That Actually Works
Diego Herrera
Creative technologist writing about AI agents in design and content.
**Your inbox is a decision engine disguised as a list.** Every email demands a micro-judgment: respond now, respond later, delegate, archive, or ignore. An AI email agent doesn't eliminate those decisions — it batches and accelerates them.
Building an AI Email Management Agent: A Practical Guide
Your inbox is a decision engine disguised as a list. Every email demands a micro-judgment: respond now, respond later, delegate, archive, or ignore. An AI email agent doesn't eliminate those decisions — it batches and accelerates them.
This tutorial walks through building a production-grade email management agent that triages incoming mail, generates contextual drafts, tracks follow-ups, and integrates with both Gmail and Microsoft Graph APIs. We'll use Python, LangChain for orchestration, and real OAuth2 flows — not toy examples with hardcoded data.
Architecture Overview
Before writing code, understand what we're building:
┌─────────────────────────────────────────────────────────┐
│ Email Agent System │
│ │
│ ┌──────────┐ ┌──────────┐ ┌───────────────────┐ │
│ │ Gmail │──▶│ │ │ Draft Generator │ │
│ │ Watcher │ │ Triage │──▶│ (LLM-powered) │ │
│ └──────────┘ │ Engine │ └───────────────────┘ │
│ ┌──────────┐ │ │ ┌───────────────────┐ │
│ │ Outlook │──▶│ │──▶│ Follow-up Tracker │ │
│ │ Poller │ └──────────┘ │ (SQLite + Cron) │ │
│ └──────────┘ └───────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Privacy & Storage Layer │ │
│ └──────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
The agent has four core modules. Each runs independently, communicates through a shared state database, and can be deployed as separate services or a single process.
Prerequisites
pip install langchain langchain-openai google-api-python-client \
google-auth-oauthlib msal sqlalchemy pydantic python-dateutil \
ollama # optional, for local LLM inference
You'll also need:
- A Google Cloud project with Gmail API enabled and OAuth2 credentials
- An Azure AD app registration with `Mail.ReadWrite` and `Mail.Send` permissions
- An OpenAI API key (or a local Ollama instance running `llama3.1` or `mistral`)
Part 1: Email Provider Integration
Gmail Integration
Gmail's API is powerful but verbose. We'll wrap it in a clean interface.
import base64
import email
from email.mime.text import MIMEText
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
import os
import pickle
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
]
@dataclass
class EmailMessage:
    """Provider-agnostic representation of a single email.

    Normalizes Gmail and Microsoft Graph payloads into one shape so the
    triage, drafting, and follow-up layers never need to know which
    provider a message came from.
    """
    id: str            # provider-native message id
    thread_id: str     # Gmail threadId or Graph conversationId
    subject: str
    sender: str        # raw "From" value
    to: str            # comma-separated recipient addresses
    date: datetime
    body: str          # plain-text body (HTML stripped by the client)
    snippet: str       # short provider-generated preview
    labels: list[str] = field(default_factory=list)  # Gmail labels; empty for Outlook
    is_read: bool = True
    provider: str = "gmail"  # "gmail" or "outlook"
class GmailClient:
def __init__(self, credentials_path: str, token_path: str = "token.pickle"):
self.credentials_path = credentials_path
self.token_path = token_path
self.service = self._authenticate()
def _authenticate(self):
creds = None
if os.path.exists(self.token_path):
with open(self.token_path, 'rb') as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.credentials_path, SCOPES
)
creds = flow.run_local_server(port=0)
with open(self.token_path, 'wb') as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
def fetch_recent_emails(self, max_results: int = 20,
query: str = "is:unread") -> list[EmailMessage]:
results = self.service.users().messages().list(
userId='me', maxResults=max_results, q=query
).execute()
messages = results.get('messages', [])
emails = []
for msg in messages:
full_msg = self.service.users().messages().get(
userId='me', id=msg['id'], format='full'
).execute()
emails.append(self._parse_message(full_msg))
return emails
def _parse_message(self, raw_message: dict) -> EmailMessage:
headers = {h['name']: h['value']
for h in raw_message['payload']['headers']}
body = self._extract_body(raw_message['payload'])
date_str = headers.get('Date', '')
return EmailMessage(
id=raw_message['id'],
thread_id=raw_message['threadId'],
subject=headers.get('Subject', '(no subject)'),
sender=headers.get('From', ''),
to=headers.get('To', ''),
date=self._parse_date(date_str),
body=body,
snippet=raw_message.get('snippet', ''),
labels=raw_message.get('labelIds', []),
is_read='UNREAD' not in raw_message.get('labelIds', []),
provider="gmail"
)
def _extract_body(self, payload: dict) -> str:
"""Recursively extract text body from MIME parts."""
if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'):
return base64.urlsafe_b64decode(
payload['body']['data']
).decode('utf-8', errors='replace')
if 'parts' in payload:
for part in payload['parts']:
result = self._extract_body(part)
if result:
return result
# Fallback: try text/html
if payload.get('mimeType') == 'text/html' and payload.get('body', {}).get('data'):
from html.parser import HTMLParser
class TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.text = []
def handle_data(self, data):
self.text.append(data)
html = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='replace')
extractor = TextExtractor()
extractor.feed(html)
return ' '.join(extractor.text)
return ""
def send_reply(self, thread_id: str, to: str, subject: str, body: str):
message = MIMEText(body)
message['to'] = to
message['subject'] = subject
raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
self.service.users().messages().send(
userId='me',
body={'raw': raw, 'threadId': thread_id}
).execute()
def add_label(self, message_id: str, label_name: str):
# Get or create label
labels = self.service.users().labels().list(userId='me').execute()
label_id = None
for label in labels.get('labels', []):
if label['name'] == label_name:
label_id = label['id']
break
if not label_id:
label = self.service.users().labels().create(
userId='me',
body={'name': label_name, 'labelListVisibility': 'labelShow'}
).execute()
label_id = label['id']
self.service.users().messages().modify(
userId='me', id=message_id,
body={'addLabelIds': [label_id]}
).execute()
def mark_as_read(self, message_id: str):
self.service.users().messages().modify(
userId='me', id=message_id,
body={'removeLabelIds': ['UNREAD']}
).execute()
@staticmethod
def _parse_date(date_str: str) -> datetime:
from email.utils import parsedate_to_datetime
try:
return parsedate_to_datetime(date_str)
except (ValueError, TypeError):
return datetime.now()
Outlook/Microsoft Graph Integration
Microsoft Graph uses a different auth model (MSAL) and REST patterns:
import msal
import requests
from datetime import datetime, timedelta, timezone
class OutlookClient:
    """Microsoft Graph mail client using the MSAL client-credentials flow.

    NOTE(review): the client-credentials (app-only) flow requires the
    resource-wide scope "<resource>/.default"; per-permission scopes such
    as 'Mail.Read' belong to delegated flows and are rejected by
    acquire_token_for_client. App-only tokens also cannot call /me
    endpoints — confirm whether this deployment actually uses delegated
    auth, in which case /me is valid.
    """

    GRAPH_ENDPOINT = "https://graph.microsoft.com/v1.0"
    REQUEST_TIMEOUT = 30          # seconds; never let a Graph call hang forever
    TOKEN_REFRESH_MARGIN = 300    # re-acquire tokens 5 minutes before expiry

    def __init__(self, client_id: str, tenant_id: str,
                 client_secret: str, scopes: list[str] = None):
        self.client_id = client_id
        self.tenant_id = tenant_id
        self.client_secret = client_secret
        # Default to ".default" as required by the client-credentials flow.
        self.scopes = scopes or ['https://graph.microsoft.com/.default']
        self._app = msal.ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret,
        )
        self._token_expires_at = datetime.min.replace(tzinfo=timezone.utc)
        self.access_token = self._acquire_token()

    def _acquire_token(self) -> str:
        """Acquire (or silently renew) an access token and track its expiry.

        Raises RuntimeError with the Graph error description on failure.
        """
        result = self._app.acquire_token_silent(self.scopes, account=None)
        if not result:
            result = self._app.acquire_token_for_client(scopes=self.scopes)
        if "access_token" not in result:
            raise RuntimeError(
                f"Token acquisition failed: {result.get('error_description')}"
            )
        # Tokens expire (typically ~1h). A long-running daemon must
        # re-acquire rather than 401 forever on a stale token.
        lifetime = int(result.get("expires_in", 3600))
        self._token_expires_at = datetime.now(timezone.utc) + timedelta(
            seconds=max(lifetime - self.TOKEN_REFRESH_MARGIN, 60)
        )
        self.access_token = result["access_token"]
        return self.access_token

    def _headers(self, json_content: bool = False) -> dict:
        """Build auth headers, transparently refreshing an expiring token."""
        if datetime.now(timezone.utc) >= self._token_expires_at:
            self._acquire_token()
        headers = {'Authorization': f'Bearer {self.access_token}'}
        if json_content:
            headers['Content-Type'] = 'application/json'
        return headers

    def fetch_recent_emails(self, max_results: int = 20,
                            filter_unread: bool = True) -> list[EmailMessage]:
        """Fetch the newest messages (optionally unread-only) from the mailbox."""
        params = {
            '$top': max_results,
            '$orderby': 'receivedDateTime desc',
            '$select': 'id,conversationId,subject,from,toRecipients,'
                       'receivedDateTime,body,bodyPreview,isRead',
        }
        if filter_unread:
            params['$filter'] = 'isRead eq false'
        response = requests.get(
            f"{self.GRAPH_ENDPOINT}/me/messages",
            headers=self._headers(), params=params,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return [self._parse_message(msg)
                for msg in response.json().get('value', [])]

    def _parse_message(self, msg: dict) -> EmailMessage:
        """Normalize a Graph message resource into an EmailMessage."""
        sender = msg.get('from', {}).get('emailAddress', {})
        to_list = msg.get('toRecipients', [])
        to_str = ', '.join(
            r.get('emailAddress', {}).get('address', '')
            for r in to_list
        )
        body_content = msg.get('body', {}).get('content', '')
        # Graph returns HTML by default; strip tags for plain text.
        if msg.get('body', {}).get('contentType') == 'html':
            from html.parser import HTMLParser

            class S(HTMLParser):
                def __init__(self):
                    super().__init__()
                    self.t = []

                def handle_data(self, d):
                    self.t.append(d)

            s = S()
            s.feed(body_content)
            body_content = ' '.join(s.t)
        return EmailMessage(
            id=msg['id'],
            thread_id=msg.get('conversationId', ''),
            subject=msg.get('subject', '(no subject)'),
            sender=sender.get('address', ''),
            to=to_str,
            date=datetime.fromisoformat(
                msg['receivedDateTime'].replace('Z', '+00:00')
            ),
            body=body_content,
            snippet=msg.get('bodyPreview', ''),
            is_read=msg.get('isRead', True),
            provider="outlook"
        )

    def send_reply(self, message_id: str, body: str):
        """Reply in-thread to a message via Graph's /reply action."""
        payload = {
            "comment": body
        }
        response = requests.post(
            f"{self.GRAPH_ENDPOINT}/me/messages/{message_id}/reply",
            headers=self._headers(json_content=True), json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()

    def mark_as_read(self, message_id: str):
        """Set the message's isRead flag to true."""
        response = requests.patch(
            f"{self.GRAPH_ENDPOINT}/me/messages/{message_id}",
            headers=self._headers(json_content=True),
            json={"isRead": True},
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
Key difference: Gmail uses label-based organization; Outlook uses folders and categories. The EmailMessage dataclass normalizes this.
Part 2: The Triage Engine
Triage is where the agent earns its keep. We classify each email into categories and assign priority using an LLM, but with structured output to keep results parseable.
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from enum import Enum
class EmailCategory(str, Enum):
    """Triage buckets an incoming email can be sorted into."""

    ACTION_REQUIRED = "action_required"    # sender explicitly needs something
    FYI = "fyi"                            # informational, no reply expected
    MEETING = "meeting"                    # calendar invite or scheduling
    NEWSLETTER = "newsletter"
    PROMOTIONAL = "promotional"
    SPAM = "spam"
    FOLLOW_UP_NEEDED = "follow_up_needed"  # reply to something the user asked
    PERSONAL = "personal"
class Priority(str, Enum):
    """How quickly a response is expected, from urgent down to none."""

    URGENT = "urgent"    # respond within hours
    HIGH = "high"        # respond today
    MEDIUM = "medium"    # respond this week
    LOW = "low"          # when convenient
    NONE = "none"        # no response needed
class TriageResult(BaseModel):
    """Structured output of LLM triage for one email (parsed via Pydantic)."""
    category: EmailCategory = Field(description="Email category")
    priority: Priority = Field(description="Priority level")
    reasoning: str = Field(description="Brief explanation of classification")
    suggested_action: str = Field(description="What the user should do")
    estimated_response_time: str = Field(
        description="Estimated time to draft a response, e.g., '2 minutes' or '15 minutes'"
    )
    requires_context: bool = Field(
        description="Whether responding requires information the agent doesn't have"
    )
class TriageEngine:
    """Classifies a single email into category/priority via an LLM.

    Output is constrained to the TriageResult schema through a
    PydanticOutputParser so results stay machine-parseable.
    """

    def __init__(self, model_name: str = "gpt-4o-mini",
                 user_context: str = ""):
        # temperature=0 keeps classifications deterministic.
        self.llm = ChatOpenAI(model=model_name, temperature=0)
        self.parser = PydanticOutputParser(pydantic_object=TriageResult)
        # Free-form user description injected into the system prompt.
        self.user_context = user_context
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an email triage assistant. Classify emails
accurately and conservatively. When in doubt, mark as higher priority.
User context: {user_context}
{format_instructions}
Rules:
- "action_required" means the sender explicitly needs something from the user
- "follow_up_needed" means the user previously asked something and this is a reply
- Mark emails from known contacts higher priority than unknowns
- Meeting invites should include the time sensitivity in suggested_action
- If an email looks like it requires domain expertise you lack, set requires_context=true"""),
            ("human", """Subject: {subject}
From: {sender}
To: {to}
Date: {date}
Body:
{body}""")
        ])

    def triage(self, email_msg: EmailMessage) -> TriageResult:
        """Run the prompt -> LLM -> parser chain for one email.

        Raises whatever the LLM call or output parsing raises; callers
        that need resilience should use BatchTriageEngine.
        """
        chain = self.prompt | self.llm | self.parser
        result = chain.invoke({
            "subject": email_msg.subject,
            "sender": email_msg.sender,
            "to": email_msg.to,
            "date": email_msg.date.isoformat(),
            "body": email_msg.body[:3000],  # truncate for token limits
            "user_context": self.user_context,
            "format_instructions": self.parser.get_format_instructions(),
        })
        return result
Batch Triage with Cost Control
Calling the LLM per email gets expensive. Here's a batching strategy:
import asyncio
from collections import defaultdict
class BatchTriageEngine(TriageEngine):
    """TriageEngine variant that processes emails in rate-limited batches."""

    async def triage_batch(self, emails: list[EmailMessage],
                           batch_size: int = 5) -> dict[str, TriageResult]:
        """Process emails in batches to control rate limits and costs.

        Returns a map of email id -> TriageResult. A failed classification
        degrades to a conservative FYI/medium placeholder rather than
        aborting the whole batch.
        """
        results = {}
        for i in range(0, len(emails), batch_size):
            batch = emails[i:i + batch_size]
            tasks = [self._triage_async(e) for e in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for email_msg, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    # Default to medium priority on failure so the email is
                    # surfaced for manual review instead of silently dropped.
                    results[email_msg.id] = TriageResult(
                        category=EmailCategory.FYI,
                        priority=Priority.MEDIUM,
                        reasoning=f"Triage failed: {result}",
                        suggested_action="Review manually",
                        estimated_response_time="unknown",
                        requires_context=True,
                    )
                else:
                    results[email_msg.id] = result
            # Rate limit courtesy pause between batches (not after the last).
            if i + batch_size < len(emails):
                await asyncio.sleep(1)
        return results

    async def _triage_async(self, email_msg: EmailMessage) -> TriageResult:
        """Run the synchronous triage() call in the default thread pool."""
        # get_running_loop() is the supported API inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.triage, email_msg)
Real-world cost note: At GPT-4o-mini pricing (~$0.15/1M input tokens), triaging 100 emails costs roughly $0.02-0.05. Using gpt-4o for the same batch runs $0.50-2.00. For most users, gpt-4o-mini is more than sufficient for classification. Reserve larger models for draft generation.
Part 3: Draft Generation
Draft generation is the most nuanced part. A bad auto-draft is worse than no draft at all. The key is giving the LLM enough context about the user's communication style and the email thread.
class DraftGenerator:
    """Generates reply drafts in the user's voice via an LLM."""

    def __init__(self, model_name: str = "gpt-4o",
                 user_name: str = "",
                 user_role: str = "",
                 communication_style: str = "professional but friendly"):
        # Slight temperature so drafts don't read identically every time.
        self.llm = ChatOpenAI(model=model_name, temperature=0.3)
        self.user_name = user_name
        self.user_role = user_role
        self.communication_style = communication_style
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You draft email replies. Match the user's communication style.
User profile:
- Name: {user_name}
- Role: {user_role}
- Style: {communication_style}
Rules:
- Keep replies concise. Most emails should get 2-5 sentence replies.
- Match the formality level of the sender.
- If the email requires information you don't have, insert [NEEDS INFO: describe what's needed] placeholders.
- Don't commit to deadlines, meetings, or promises unless explicitly instructed.
- If the email is a meeting invite, draft an acceptance or decline based on context.
- Never fabricate facts, dates, or commitments.
- Include a greeting and sign-off appropriate to the relationship."""),
            ("human", """Draft a reply to this email:
From: {sender}
Subject: {subject}
Date: {date}
Their email:
{body}
{additional_context}""")
        ])

    def generate_draft(self, email_msg: EmailMessage,
                       triage: TriageResult,
                       thread_history: list[str] = None) -> str:
        """Draft a reply to email_msg, steered by its triage result.

        thread_history: optional prior messages (plain text); only the last
        three are included in the prompt to bound token usage.
        """
        # Category-specific steering hints appended to the prompt.
        additional_parts = []
        if triage.category == EmailCategory.MEETING:
            additional_parts.append(
                "This is a meeting invite. Draft a brief acceptance or "
                "request to reschedule if timing seems tight."
            )
        elif triage.requires_context:
            additional_parts.append(
                "This email likely requires domain knowledge. Draft a "
                "holding response acknowledging receipt and asking for "
                "time to look into it."
            )
        elif triage.category == EmailCategory.FOLLOW_UP_NEEDED:
            additional_parts.append(
                "This is a follow-up to a previous conversation. "
                "The sender is likely waiting for an update."
            )
        if thread_history:
            additional_parts.append(
                "Previous messages in this thread:\n" +
                "\n---\n".join(thread_history[-3:])  # last 3 messages
            )
        chain = self.prompt | self.llm
        result = chain.invoke({
            "user_name": self.user_name,
            "user_role": self.user_role,
            "communication_style": self.communication_style,
            "sender": email_msg.sender,
            "subject": email_msg.subject,
            "date": email_msg.date.isoformat(),
            "body": email_msg.body[:4000],  # truncate for token limits
            "additional_context": "\n".join(additional_parts),
        })
        return result.content
Thread Context Retrieval
For replies, thread history matters enormously. Here's how to fetch it:
class ThreadManager:
    """Fetches full conversation history from whichever provider owns a thread."""

    def __init__(self, gmail_client: GmailClient = None,
                 outlook_client: OutlookClient = None):
        self.gmail = gmail_client
        self.outlook = outlook_client

    def get_gmail_thread(self, thread_id: str) -> list[EmailMessage]:
        """Return every message in a Gmail thread as EmailMessage objects."""
        if not self.gmail:
            return []
        payload = self.gmail.service.users().threads().get(
            userId='me', id=thread_id
        ).execute()
        parsed = []
        for raw in payload.get('messages', []):
            parsed.append(self.gmail._parse_message(raw))
        return parsed

    def get_outlook_thread(self, conversation_id: str) -> list[EmailMessage]:
        """Return every message in an Outlook conversation, oldest first."""
        if not self.outlook:
            return []
        auth = {'Authorization': f'Bearer {self.outlook.access_token}'}
        query = {
            '$filter': f"conversationId eq '{conversation_id}'",
            '$orderby': 'receivedDateTime asc',
            '$select': 'id,conversationId,subject,from,toRecipients,'
                       'receivedDateTime,body,bodyPreview,isRead',
        }
        reply = requests.get(
            f"{self.outlook.GRAPH_ENDPOINT}/me/messages",
            headers=auth,
            params=query,
        )
        reply.raise_for_status()
        parsed = []
        for raw in reply.json().get('value', []):
            parsed.append(self.outlook._parse_message(raw))
        return parsed
Part 4: Follow-Up Tracking
Follow-ups are where most email systems fail. The agent needs a persistent store to track what's pending, who owes whom, and when to remind.
from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Text, Enum as SAEnum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timedelta
Base = declarative_base()
class FollowUp(Base):
    """Persistent record of an email awaiting action or a reply."""
    __tablename__ = "follow_ups"
    id = Column(String, primary_key=True)           # "fu_<email_id>_<timestamp>"
    email_id = Column(String, nullable=False)       # provider-native message id
    thread_id = Column(String)                      # thread / conversation id
    provider = Column(String, default="gmail")      # "gmail" or "outlook"
    subject = Column(String)
    contact_email = Column(String, nullable=False)  # counterparty address
    direction = Column(String)  # "inbound" or "outbound"
    status = Column(String, default="pending")  # pending, reminded, resolved, snoozed
    created_at = Column(DateTime, default=datetime.utcnow)
    remind_at = Column(DateTime, nullable=False)    # when the reminder becomes due
    last_reminded_at = Column(DateTime, nullable=True)
    notes = Column(Text, default="")                # free-form; triage summary here
    draft_sent = Column(Boolean, default=False)     # whether a drafted reply went out
class FollowUpTracker:
    """SQLite-backed store for pending/resolved follow-up records."""

    def __init__(self, db_path: str = "follow_ups.db"):
        self.engine = create_engine(f"sqlite:///{db_path}")
        Base.metadata.create_all(self.engine)
        # expire_on_commit=False: these methods hand ORM objects back to
        # callers after the session is closed. With the default (True) the
        # commit expires all attributes, so reading e.g. reminder.subject
        # later raises DetachedInstanceError.
        self.Session = sessionmaker(bind=self.engine, expire_on_commit=False)

    def add_follow_up(self, email_msg: EmailMessage,
                      triage: TriageResult,
                      remind_in_days: int = 3) -> FollowUp:
        """Record an inbound email that should be answered within remind_in_days."""
        session = self.Session()
        try:
            follow_up = FollowUp(
                id=f"fu_{email_msg.id}_{int(datetime.utcnow().timestamp())}",
                email_id=email_msg.id,
                thread_id=email_msg.thread_id,
                provider=email_msg.provider,
                subject=email_msg.subject,
                contact_email=email_msg.sender,
                direction="inbound",
                status="pending",
                remind_at=datetime.utcnow() + timedelta(days=remind_in_days),
                notes=f"Category: {triage.category.value}, "
                      f"Priority: {triage.priority.value}",
            )
            session.add(follow_up)
            session.commit()
            return follow_up
        finally:
            session.close()

    def add_outbound_follow_up(self, email_msg: EmailMessage,
                               expected_response_days: int = 5) -> FollowUp:
        """Track emails we sent that we're waiting on a reply for."""
        session = self.Session()
        try:
            follow_up = FollowUp(
                id=f"fu_out_{email_msg.id}_{int(datetime.utcnow().timestamp())}",
                email_id=email_msg.id,
                thread_id=email_msg.thread_id,
                provider=email_msg.provider,
                subject=email_msg.subject,
                # The first "To" recipient is assumed to be the counterparty.
                contact_email=email_msg.to.split(',')[0].strip(),
                direction="outbound",
                status="pending",
                remind_at=datetime.utcnow() + timedelta(days=expected_response_days),
            )
            session.add(follow_up)
            session.commit()
            return follow_up
        finally:
            session.close()

    def resolve_if_replied(self, incoming_emails: list[EmailMessage]):
        """Check if any pending outbound follow-ups have been answered.

        A reply matches when the tracked address appears in the sender and
        the thread ids are equal.
        """
        session = self.Session()
        try:
            pending = session.query(FollowUp).filter(
                FollowUp.direction == "outbound",
                FollowUp.status == "pending"
            ).all()
            for fu in pending:
                for email_msg in incoming_emails:
                    if (fu.contact_email.lower() in email_msg.sender.lower() and
                            fu.thread_id == email_msg.thread_id):
                        fu.status = "resolved"
                        break
            session.commit()
        finally:
            session.close()

    def get_due_reminders(self) -> list[FollowUp]:
        """Return pending/snoozed follow-ups whose remind_at has passed."""
        session = self.Session()
        try:
            return session.query(FollowUp).filter(
                FollowUp.status.in_(["pending", "snoozed"]),
                FollowUp.remind_at <= datetime.utcnow()
            ).all()
        finally:
            session.close()

    def snooze(self, follow_up_id: str, days: int = 3):
        """Push a follow-up's reminder out by `days` days."""
        session = self.Session()
        try:
            # Query.get() is the SQLAlchemy 1.x API, consistent with the
            # declarative_base import above; 2.x replaces it with Session.get().
            fu = session.query(FollowUp).get(follow_up_id)
            if fu:
                fu.status = "snoozed"
                fu.remind_at = datetime.utcnow() + timedelta(days=days)
                session.commit()
        finally:
            session.close()

    def mark_reminded(self, follow_up_id: str):
        """Record that the user has been reminded about this follow-up."""
        session = self.Session()
        try:
            fu = session.query(FollowUp).get(follow_up_id)
            if fu:
                fu.last_reminded_at = datetime.utcnow()
                fu.status = "reminded"
                session.commit()
        finally:
            session.close()
Part 5: The Agent Loop
Now we tie everything together into the main agent:
import logging
from dataclasses import dataclass
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("email_agent")
@dataclass
class AgentConfig:
    """Static configuration for EmailAgent.

    Leave a provider's fields empty to disable that provider entirely.
    """
    gmail_credentials: str = "credentials.json"  # OAuth client secrets path
    outlook_client_id: str = ""
    outlook_tenant_id: str = ""
    outlook_client_secret: str = ""
    llm_model: str = "gpt-4o-mini"               # cheap model for triage
    draft_model: str = "gpt-4o"                  # stronger model for drafts
    user_name: str = ""
    user_role: str = ""
    user_context: str = ""                       # injected into the triage prompt
    check_interval_minutes: int = 5
    # None = never auto-send; otherwise drafts whose triage priority equals
    # this value may be sent without human review. (Annotation fixed to
    # Optional[Priority] — the default is None, not a Priority.)
    auto_send_threshold: Optional[Priority] = None
    db_path: str = "email_agent.db"
class EmailAgent:
    """Top-level orchestrator: fetch -> triage -> draft -> track follow-ups."""

    def __init__(self, config: AgentConfig):
        self.config = config
        self.tracker = FollowUpTracker(config.db_path)
        # Initialize whichever providers are configured.
        self.gmail = None
        self.outlook = None
        if config.gmail_credentials:
            self.gmail = GmailClient(config.gmail_credentials)
        if config.outlook_client_id:
            self.outlook = OutlookClient(
                config.outlook_client_id,
                config.outlook_tenant_id,
                config.outlook_client_secret,
            )
        self.thread_mgr = ThreadManager(self.gmail, self.outlook)
        self.triage = BatchTriageEngine(
            model_name=config.llm_model,
            user_context=config.user_context,
        )
        self.drafter = DraftGenerator(
            model_name=config.draft_model,
            user_name=config.user_name,
            user_role=config.user_role,
        )

    def _fetch_all_emails(self) -> list[EmailMessage]:
        """Pull recent unread mail from every configured provider."""
        all_emails = []
        if self.gmail:
            gmail_emails = self.gmail.fetch_recent_emails(max_results=20)
            all_emails.extend(gmail_emails)
            logger.info(f"Fetched {len(gmail_emails)} emails from Gmail")
        if self.outlook:
            outlook_emails = self.outlook.fetch_recent_emails(max_results=20)
            all_emails.extend(outlook_emails)
            logger.info(f"Fetched {len(outlook_emails)} emails from Outlook")
        return all_emails

    def _thread_history_for(self, email_msg: EmailMessage) -> list[str]:
        """Summarize prior messages in this email's thread (excluding itself).

        BUG FIX: dispatch on the message's provider — previously the Gmail
        thread endpoint was called even for Outlook conversation ids.
        """
        if not email_msg.thread_id:
            return []
        if email_msg.provider == "outlook":
            thread = self.thread_mgr.get_outlook_thread(email_msg.thread_id)
        else:
            thread = self.thread_mgr.get_gmail_thread(email_msg.thread_id)
        return [f"From: {m.sender}\n{m.body[:500]}" for m in thread[:-1]]

    async def process_inbox(self) -> list[dict]:
        """Main processing loop. Returns a summary of actions taken."""
        all_emails = self._fetch_all_emails()
        results_summary = []
        if not all_emails:
            logger.info("No new emails to process")
            return []
        # Check if any pending outbound follow-ups got resolved.
        self.tracker.resolve_if_replied(all_emails)
        # Triage all emails in one rate-limited batch pass.
        triage_results = await self.triage.triage_batch(all_emails)
        for email_msg in all_emails:
            triage = triage_results.get(email_msg.id)
            if not triage:
                continue
            action = {
                "email_id": email_msg.id,
                "subject": email_msg.subject,
                "sender": email_msg.sender,
                "category": triage.category.value,
                "priority": triage.priority.value,
                "draft": None,
                "follow_up_created": False,
                "auto_sent": False,
            }
            # Generate a draft only for categories that expect a response.
            if triage.category in (
                EmailCategory.ACTION_REQUIRED,
                EmailCategory.FOLLOW_UP_NEEDED,
                EmailCategory.MEETING,
            ):
                thread_history = self._thread_history_for(email_msg)
                draft = self.drafter.generate_draft(
                    email_msg, triage, thread_history
                )
                action["draft"] = draft
                # Auto-send logic (use with extreme caution): only complete
                # drafts at exactly the configured priority go out unreviewed.
                if (self.config.auto_send_threshold and
                        triage.priority == self.config.auto_send_threshold and
                        not triage.requires_context and
                        '[NEEDS INFO' not in draft):
                    self._send_reply(email_msg, draft)
                    action["auto_sent"] = True
            # Track urgent/high-priority emails so they can't slip through.
            if triage.priority in (Priority.URGENT, Priority.HIGH):
                remind_days = 1 if triage.priority == Priority.URGENT else 3
                self.tracker.add_follow_up(email_msg, triage, remind_days)
                action["follow_up_created"] = True
            results_summary.append(action)
        # Surface any follow-up reminders that are now due.
        for reminder in self.tracker.get_due_reminders():
            logger.warning(
                f"FOLLOW-UP DUE: {reminder.subject} | "
                f"Contact: {reminder.contact_email} | "
                f"Due: {reminder.remind_at}"
            )
        return results_summary

    def _send_reply(self, email_msg: EmailMessage, body: str):
        """Send a reply through the provider that owns the message."""
        if email_msg.provider == "gmail" and self.gmail:
            self.gmail.send_reply(email_msg.thread_id, email_msg.sender,
                                  f"Re: {email_msg.subject}", body)
            self.gmail.mark_as_read(email_msg.id)
        elif email_msg.provider == "outlook" and self.outlook:
            self.outlook.send_reply(email_msg.id, body)
            self.outlook.mark_as_read(email_msg.id)

    def run_daemon(self):
        """Run as a continuous service, processing the inbox on an interval."""
        # Imports hoisted out of the loop; they were re-executed per cycle.
        import asyncio
        import time
        interval = self.config.check_interval_minutes * 60
        logger.info(f"Email agent started. Checking every {self.config.check_interval_minutes} minutes.")
        while True:
            try:
                results = asyncio.run(self.process_inbox())
                if results:
                    logger.info(f"Processed {len(results)} emails")
                    for r in results:
                        logger.info(
                            f"  [{r['priority'].upper()}] {r['subject']} "
                            f"-> {r['category']}"
                            f"{' (draft ready)' if r['draft'] else ''}"
                        )
            except Exception as e:
                # Never let one bad cycle kill the daemon; log and retry.
                logger.error(f"Processing failed: {e}", exc_info=True)
            time.sleep(interval)
Running the Agent
if __name__ == "__main__":
    # Example configuration; user_context is injected verbatim into the
    # triage system prompt so the classifier knows who matters.
    config = AgentConfig(
        gmail_credentials="credentials.json",
        user_name="Alex Chen",
        user_role="Engineering Manager at Acme Corp",
        user_context="I manage a team of 8 engineers. Key stakeholders: "
                     "Sarah (VP Eng), Mike (PM). We're launching v2.1 on March 15.",
        llm_model="gpt-4o-mini",
        draft_model="gpt-4o",
        check_interval_minutes=5,
        # Uncomment below to auto-send low-risk replies:
        # auto_send_threshold=Priority.LOW,
    )
    agent = EmailAgent(config)
    # Blocks forever, polling the inbox on the configured interval.
    agent.run_daemon()
Part 6: Privacy Considerations
This is the section most tutorials skip. Don't.
Data Flow Audit
Every email your agent processes passes through these stages:
| Stage | Data Location | Risk Level | Mitigation |
|---|---|---|---|
| Fetch from provider | In-memory | Medium | Don't log raw email bodies |
| LLM classification | OpenAI API / local | High | Use local models for sensitive orgs |
| Draft generation | OpenAI API / local | High | Same as above |
| SQLite storage | Local disk | Medium | Encrypt the database file |
| OAuth tokens | Local disk (pickle) | Critical | Use OS keychain instead |
Concrete Privacy Recommendations
1. Use local models for sensitive workloads.
from langchain_ollama import ChatOllama

# Replace OpenAI with a local model so email text never leaves the machine.
local_llm = ChatOllama(model="llama3.1:8b", temperature=0)

# Triage engine with local model; the model_name argument becomes irrelevant
# once the llm attribute is overridden on the next line.
triage_engine = TriageEngine(model_name="local")
triage_engine.llm = local_llm  # override the LLM instance
Llama 3.1 8B handles classification well. Draft quality drops noticeably — you'll want at least a 70B model or fine-tuned 13B for professional drafts.
2. Encrypt the SQLite database.
# Use sqlcipher instead of plain sqlite
# pip install pysqlcipher3
from sqlalchemy import create_engine
def create_encrypted_engine(db_path: str, passphrase: str):
    """Create a SQLAlchemy engine backed by a SQLCipher-encrypted database.

    Requires the sqlcipher system library and `pip install pysqlcipher3`.
    """
    from urllib.parse import quote
    # BUG FIX: SQLAlchemy's `module` parameter expects the DBAPI module
    # (pysqlcipher3.dbapi2); __import__('pysqlcipher3') returned the bare
    # package, which has no connect().
    from pysqlcipher3 import dbapi2

    # URL-quote the passphrase so characters like '@' or '/' can't break
    # or be injected into the connection URL.
    engine = create_engine(
        f"sqlite+pysqlcipher://:{quote(passphrase, safe='')}@/{db_path}",
        module=dbapi2,
    )
    return engine
3. Never log email bodies.
# BAD
logger.info(f"Processing email: {email_msg.body}")
# GOOD
logger.info(f"Processing email {email_msg.id} from {email_msg.sender}")
4. Implement data retention policies.
class DataRetention:
    """Purges resolved follow-up records older than a retention window."""

    def __init__(self, tracker: FollowUpTracker, retention_days: int = 30):
        self.tracker = tracker
        self.retention_days = retention_days

    def purge_old_records(self):
        """Delete resolved follow-ups created before the retention cutoff."""
        from datetime import datetime, timedelta
        cutoff = datetime.utcnow() - timedelta(days=self.retention_days)
        db = self.tracker.Session()
        try:
            removed = (
                db.query(FollowUp)
                .filter(
                    FollowUp.status == "resolved",
                    FollowUp.created_at < cutoff,
                )
                .delete()
            )
            db.commit()
            logger.info(f"Purged {removed} old follow-up records")
        finally:
            db.close()
5. Scope OAuth permissions minimally. Don't request gmail.modify if you only need read access. The Gmail scopes in our code include send and modify — strip what you don't need.
6. Handle PII in drafts. The LLM might reproduce sensitive information from the original email in the draft. Add a post-processing filter:
import re
def redact_sensitive_info(text: str) -> str:
"""Redact common PII patterns from generated drafts."""
# SSN patterns
text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN REDACTED]', text)
# Credit card numbers
text = re.sub(r'\b\d{4}