Conversational Agents

How to Build a Sales Qualification Agent with Multi-Turn Conversation

Sarah Kim

Quantitative researcher turned AI writer. Specializes in financial AI agents.

March 21, 2026 · 15 min read


Building a Multi-Turn Sales Qualification Agent: A Technical Deep Dive

The gap between "hello" and "meeting booked" is where most sales bots fail. They handle single queries well but crumble during real conversations where prospects raise objections, ask complex questions, and require nuanced qualification. In this tutorial, we'll build a production-grade sales qualification agent that actually navigates multi-turn conversations.

Architecture Overview

Our agent will use a stateful conversation manager with four core modules:

┌─────────────────┐    ┌─────────────────┐
│  Lead Scoring   │◄───┤  Conversation   │
│    Engine       │    │    Manager      │
└────────┬────────┘    └────────┬────────┘
         │                      │
         ▼                      ▼
┌─────────────────┐    ┌─────────────────┐
│  CRM Integration│    │  Objection      │
│    Layer        │◄───┤  Handler        │
└─────────────────┘    └─────────────────┘

We'll use LangChain for conversation orchestration, FastAPI for the backend, and integrate with HubSpot CRM (principles apply to Salesforce, Pipedrive, etc.).

Setting Up the Project

mkdir sales-agent && cd sales-agent
python -m venv venv
source venv/bin/activate
pip install langchain openai fastapi uvicorn hubspot-api-client pydantic python-dateutil

Create the project structure:

sales-agent/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── agent.py
│   ├── scoring.py
│   ├── objections.py
│   ├── crm.py
│   ├── booking.py
│   └── models.py
├── .env
└── requirements.txt

Core Conversation State Management

First, we need to track conversation state across turns. This isn't just message history—it's structured data about where we are in the qualification process.

# app/models.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum

class ConversationStage(str, Enum):
    GREETING = "greeting"
    DISCOVERY = "discovery"
    QUALIFICATION = "qualification"
    OBJECTION = "objection"
    BOOKING = "booking"
    CLOSED = "closed"

class LeadProfile(BaseModel):
    email: Optional[str] = None
    company: Optional[str] = None
    role: Optional[str] = None
    company_size: Optional[int] = None
    industry: Optional[str] = None
    pain_points: List[str] = Field(default_factory=list)
    budget_range: Optional[str] = None
    timeline: Optional[str] = None

class ConversationState(BaseModel):
    lead_id: str
    session_id: str
    stage: ConversationStage = ConversationStage.GREETING
    lead_profile: LeadProfile = Field(default_factory=LeadProfile)
    lead_score: float = 0.0
    objections_raised: List[str] = Field(default_factory=list)
    meeting_proposed: bool = False
    messages: List[Dict[str, Any]] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict)
    last_updated: datetime = Field(default_factory=datetime.now)

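The stage field drives everything downstream. As an illustration of the intended transitions (this helper is a sketch, not part of the app), the happy path moves linearly while a handled objection detours back to qualification:

```python
from enum import Enum

class ConversationStage(str, Enum):
    GREETING = "greeting"
    DISCOVERY = "discovery"
    QUALIFICATION = "qualification"
    OBJECTION = "objection"
    BOOKING = "booking"
    CLOSED = "closed"

# Happy-path ordering; OBJECTION sits outside it as a detour
HAPPY_PATH = [ConversationStage.GREETING, ConversationStage.DISCOVERY,
              ConversationStage.QUALIFICATION, ConversationStage.BOOKING,
              ConversationStage.CLOSED]

def next_stage(stage: ConversationStage) -> ConversationStage:
    """Advance one step; a handled objection returns to qualification."""
    if stage is ConversationStage.OBJECTION:
        return ConversationStage.QUALIFICATION
    i = HAPPY_PATH.index(stage)
    return HAPPY_PATH[min(i + 1, len(HAPPY_PATH) - 1)]
```
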
Lead Scoring Engine

Real lead scoring isn't just about company size—it's about fit, intent, and timing. We'll implement a weighted scoring system that updates as the conversation progresses.

# app/scoring.py
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple
import re

from app.models import LeadProfile

@dataclass
class ScoringRule:
    field: str
    weight: float
    scoring_function: Callable[..., float]

class LeadScoringEngine:
    def __init__(self):
        self.rules = self._initialize_rules()
        self.thresholds = {
            'hot': 80,
            'warm': 50,
            'cold': 0
        }
    
    def _initialize_rules(self) -> List[ScoringRule]:
        return [
            ScoringRule('company_size', 0.2, self._score_company_size),
            ScoringRule('role', 0.25, self._score_role),
            ScoringRule('pain_points', 0.3, self._score_pain_points),
            ScoringRule('budget_range', 0.15, self._score_budget),
            ScoringRule('timeline', 0.1, self._score_timeline),
        ]
    
    def calculate_score(self, profile: 'LeadProfile') -> Tuple[float, Dict]:
        """Calculate lead score with breakdown"""
        total_score = 0
        breakdown = {}
        
        for rule in self.rules:
            value = getattr(profile, rule.field, None)
            if value is not None:
                rule_score = rule.scoring_function(value)
                weighted_score = rule_score * rule.weight
                total_score += weighted_score
                breakdown[rule.field] = {
                    'raw': rule_score,
                    'weighted': weighted_score,
                    'weight': rule.weight
                }
        
        # Apply conversation engagement multiplier
        engagement_bonus = self._calculate_engagement(profile)
        total_score *= engagement_bonus
        
        return min(100, total_score), breakdown
    
    def _score_company_size(self, size: int) -> float:
        """Score based on ideal customer profile"""
        # Example: Target is 100-1000 employees
        if 100 <= size <= 1000:
            return 100
        elif 50 <= size < 100 or 1000 < size <= 5000:
            return 70
        elif size < 50:
            return 30  # Too small
        else:
            return 50  # Enterprise, longer sales cycle
    
    def _score_role(self, role: str) -> float:
        """Score based on decision-making authority"""
        role_lower = role.lower()
        if any(title in role_lower for title in ['ceo', 'cto', 'vp', 'director']):
            return 100
        elif 'manager' in role_lower:
            return 70
        elif 'lead' in role_lower:
            return 50
        else:
            return 30
    
    def _score_pain_points(self, pain_points: List[str]) -> float:
        """Score based on problem-product fit"""
        high_value_pains = ['scaling', 'efficiency', 'cost reduction', 'automation']
        matches = sum(1 for pain in pain_points 
                     if any(keyword in pain.lower() 
                           for keyword in high_value_pains))
        return min(100, matches * 25)
    
    def _score_budget(self, budget: str) -> float:
        """Parse and score budget range"""
        if not budget:
            return 0
        
        # Extract numbers, expanding "k" suffixes ("$50k" -> 50000)
        cleaned = budget.replace(',', '').lower()
        numbers = [int(num) * (1000 if suffix else 1)
                   for num, suffix in re.findall(r'(\d+)\s*(k?)', cleaned)]
        if len(numbers) >= 2:
            avg_budget = (numbers[0] + numbers[1]) / 2
            if avg_budget >= 50000:
                return 100
            elif avg_budget >= 20000:
                return 70
            else:
                return 30
        return 50  # Default if we can't parse a range
    
    def _score_timeline(self, timeline: str) -> float:
        """Score urgency"""
        timeline_lower = timeline.lower()
        if any(word in timeline_lower for word in ['asap', 'urgent', 'this quarter']):
            return 100
        elif any(word in timeline_lower for word in ['next quarter', '3 months']):
            return 70
        elif '6 months' in timeline_lower:
            return 40
        else:
            return 20
    
    def _calculate_engagement(self, profile: 'LeadProfile') -> float:
        """Multiplier based on conversation engagement"""
        # This would track response times, question quality, etc.
        return 1.0  # Placeholder

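To make the weighting concrete, here is the arithmetic for a hypothetical lead (a VP at a 200-person company with two matching pain points, a mid-range budget, and a next-quarter timeline), using the same weights as the rules above:

```python
# Same weights as LeadScoringEngine; raw scores come from the rule functions
weights = {'company_size': 0.20, 'role': 0.25, 'pain_points': 0.30,
           'budget_range': 0.15, 'timeline': 0.10}
raw = {'company_size': 100,  # 200 employees: inside the 100-1000 sweet spot
       'role': 100,          # VP title
       'pain_points': 50,    # two high-value pains * 25
       'budget_range': 70,   # average budget between $20k and $50k
       'timeline': 70}       # "next quarter"

score = min(100, sum(raw[f] * w for f, w in weights.items()))
print(score)  # 77.5 -> "warm" under the thresholds above
```

Note that any field still unknown simply contributes nothing, so scores climb as the conversation fills in the profile.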
Objection Handling System

Objections aren't obstacles—they're buying signals. Our handler uses pattern matching combined with contextual responses.

# app/objections.py
from typing import Dict, List, Optional
import re

class ObjectionHandler:
    def __init__(self):
        self.objection_patterns = self._load_patterns()
        self.response_templates = self._load_templates()
    
    def _load_patterns(self) -> Dict[str, List[str]]:
        """Define objection patterns with regex"""
        return {
            'price': [
                r'too expensive',
                r'budget.{0,20}tight',
                r'cost.{0,20}high',
                r'can.{0,10}afford',
                r'price.{0,20}concern'
            ],
            'timing': [
                r'not.{0,10}ready',
                r'bad time',
                r'later',
                r'next.{0,10}year',
                r'no.{0,10}urgency'
            ],
            'competitor': [
                r'using.{0,20}competitor',
                r'already.{0,10}have.{0,20}solution',
                r'happy with',
                r'current.{0,10}vendor'
            ],
            'authority': [
                r'need.{0,10}check',
                r'talk to.{0,10}team',
                r'not my decision',
                r'approval'
            ],
            'value': [
                r'not sure.{0,20}roi',
                r'don\'t see.{0,10}value',
                r'what.{0,10}difference',
                r'why.{0,10}better'
            ]
        }
    
    def detect_objection(self, message: str) -> Optional[str]:
        """Detect objection type from message"""
        message_lower = message.lower()
        
        for objection_type, patterns in self.objection_patterns.items():
            for pattern in patterns:
                if re.search(pattern, message_lower):
                    return objection_type
        return None
    
    def generate_response(self, objection_type: str, context: Dict) -> str:
        """Generate contextual response to objection"""
        templates = self.response_templates.get(objection_type, [])
        
        if not templates:
            return self._generic_response(objection_type)
        
        # Select template based on context
        template = self._select_template(templates, context)
        
        # Fill in context variables
        response = template.format(**context)
        return response
    
    def _load_templates(self) -> Dict[str, List[str]]:
        """Response templates with variables"""
        return {
            'price': [
                "I understand budget is important. Many of our clients like {company} initially had similar concerns, but found that {value_prop} delivered {roi_metric} within {timeframe}.",
                "Let's look at the ROI. Based on what you've shared about {pain_point}, our solution typically saves {percentage}% in {area}. Would it help to see a cost-benefit analysis?",
                "We have flexible pricing options. Could you share what budget range you're working with? I want to make sure we find the right fit."
            ],
            'timing': [
                "I hear you on timing. What would need to change for this to become a priority? Sometimes starting with a pilot can help build momentum.",
                "Many clients start planning now to be ready for {optimal_time}. Would it make sense to at least explore the options so you're prepared?",
                "What if we scheduled a brief call to discuss your roadmap? No commitment, just to understand your timeline better."
            ],
            'competitor': [
                "That's great you're already investing in this space. What's working well with {competitor}? What would you improve?",
                "Many of our clients switched from {competitor} because of {differentiator}. Would it be helpful to see a comparison?",
                "We integrate well with {competitor}. Some clients use us alongside their existing solution for {specific_use_case}."
            ],
            'authority': [
                "Absolutely, involving the team makes sense. Who else would need to be part of this decision? I can prepare materials for them.",
                "Would it help if I shared some case studies from similar {industry} companies you could share with your team?",
                "What information would help you make the case internally? I'm happy to provide ROI calculations or reference calls."
            ],
            'value': [
                "That's a fair question. Let me share how {similar_company} achieved {specific_result} in {timeframe}.",
                "What metrics matter most to your team? I can tailor our discussion around {key_metric} improvements.",
                "Would a demo focused on your specific use case with {pain_point} help illustrate the value?"
            ]
        }
    
    def _select_template(self, templates: List[str], context: Dict) -> str:
        """Select most appropriate template based on context"""
        # Simple selection - in production, use ML model
        return templates[0]
    
    def _generic_response(self, objection_type: str) -> str:
        return f"I understand your concern about {objection_type}. Could you tell me more about what specifically worries you?"

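A stripped-down version of the detector shows how these regex patterns behave in practice (two of the pattern groups only, for illustration):

```python
import re

# Two of the pattern groups from ObjectionHandler, for illustration
patterns = {
    'price': [r'too expensive', r'budget.{0,20}tight'],
    'timing': [r'not.{0,10}ready', r'bad time'],
}

def detect(message: str):
    """Return the first matching objection type, or None."""
    text = message.lower()
    for kind, pats in patterns.items():
        if any(re.search(p, text) for p in pats):
            return kind
    return None
```

The `.{0,N}` gaps are the key trick: they let "budget is a bit tight" match without enumerating every phrasing.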
Meeting Booking Integration

We'll integrate with Calendly's API for actual booking functionality. This handles timezone conversions and availability checks.

# app/booking.py
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from pydantic import BaseModel

class TimeSlot(BaseModel):
    start: datetime
    end: datetime
    available: bool

class MeetingBooker:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.calendly.com"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def get_available_slots(self, event_type_uri: str, 
                           start_time: datetime, 
                           days_ahead: int = 7) -> List[TimeSlot]:
        """Get available time slots from Calendly"""
        end_time = start_time + timedelta(days=days_ahead)
        
        params = {
            "event_type": event_type_uri,
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat()
        }
        
        # NOTE: verify this path against the current Calendly v2 API docs;
        # availability is exposed via event-type available-times endpoints
        response = requests.get(
            f"{self.base_url}/scheduled_events/availability",
            headers=self.headers,
            params=params,
            timeout=10
        )
        
        if response.status_code == 200:
            data = response.json()
            return self._parse_availability(data)
        else:
            # Fallback to default slots
            return self._generate_default_slots(start_time, days_ahead)
    
    def book_meeting(self, invitee_email: str, 
                    invitee_name: str,
                    event_type_uri: str,
                    start_time: datetime,
                    questions: Dict = None) -> Dict:
        """Book a meeting via Calendly"""
        payload = {
            "event_type": event_type_uri,
            "invitee": {
                "email": invitee_email,
                "name": invitee_name,
                "timezone": "America/New_York"  # Would detect from context
            },
            "start_time": start_time.isoformat(),
            "questions": questions or []
        }
        
        # NOTE: Calendly's public API primarily books invitees via scheduling
        # links; confirm this endpoint is available on your plan before relying on it
        response = requests.post(
            f"{self.base_url}/scheduled_events",
            headers=self.headers,
            json=payload,
            timeout=10
        )
        
        if response.status_code == 201:
            return {
                "success": True,
                "meeting_url": response.json()["resource"]["uri"],
                "confirmation_id": response.json()["resource"]["id"]
            }
        else:
            return {
                "success": False,
                "error": response.text
            }
    
    def _parse_availability(self, data: Dict) -> List[TimeSlot]:
        """Parse Calendly availability response"""
        slots = []
        for event in data.get("collection", []):
            if event.get("status") == "available":
                slots.append(TimeSlot(
                    start=datetime.fromisoformat(event["start_time"]),
                    end=datetime.fromisoformat(event["end_time"]),
                    available=True
                ))
        return slots
    
    def _generate_default_slots(self, start: datetime, days: int) -> List[TimeSlot]:
        """Generate default slots if API fails"""
        slots = []
        current = start.replace(hour=9, minute=0, second=0, microsecond=0)
        
        for day in range(days):
            for hour in [9, 11, 14, 16]:  # 9am, 11am, 2pm, 4pm
                slot_start = current + timedelta(days=day, hours=hour-9)
                slot_end = slot_start + timedelta(minutes=30)
                slots.append(TimeSlot(
                    start=slot_start,
                    end=slot_end,
                    available=True
                ))
        
        return slots

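The fallback generator reduces to a small comprehension. Under the same assumptions (four start hours, 30-minute slots), two days yield eight slots:

```python
from datetime import datetime, timedelta

start = datetime(2026, 3, 23, 8, 15)          # arbitrary example start time
base = start.replace(hour=9, minute=0, second=0, microsecond=0)

slots = [(base + timedelta(days=d, hours=h - 9),
          base + timedelta(days=d, hours=h - 9, minutes=30))
         for d in range(2)
         for h in (9, 11, 14, 16)]            # 9am, 11am, 2pm, 4pm
```

In production you would also skip weekends and convert to the prospect's timezone before presenting these.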
CRM Integration Layer

We'll implement a HubSpot integration that creates/updates contacts and logs conversation activities.

# app/crm.py
from datetime import datetime
from typing import Dict, Optional

from hubspot import HubSpot
from hubspot.crm.contacts import (
    SimplePublicObjectInput,
    SimplePublicObjectInputForCreate,
)

from app.models import LeadProfile

class CRMIntegration:
    def __init__(self, access_token: str):
        self.client = HubSpot(access_token=access_token)
    
    def create_or_update_contact(self, profile: 'LeadProfile') -> Dict:
        """Create or update contact in HubSpot"""
        if not profile.email:
            return {"error": "Email required"}
        
        # Search for existing contact
        existing = self._find_contact_by_email(profile.email)
        
        if existing:
            return self._update_contact(existing['id'], profile)
        else:
            return self._create_contact(profile)
    
    def log_conversation(self, contact_id: str, 
                        conversation_summary: str,
                        lead_score: float,
                        stage: str) -> Dict:
        """Log conversation activity in CRM as a note on the contact"""
        activity = {
            "properties": {
                "hs_note_body": conversation_summary,
                "hs_timestamp": str(int(datetime.now().timestamp() * 1000)),
                # Custom properties -- create these in HubSpot first
                "hs_lead_score": str(lead_score),
                "hs_conversation_stage": stage
            },
            "associations": [{
                "to": {"id": contact_id},
                "types": [{
                    "associationCategory": "HUBSPOT_DEFINED",
                    "associationTypeId": 202  # note-to-contact
                }]
            }]
        }
        
        try:
            response = self.client.crm.objects.notes.basic_api.create(
                simple_public_object_input_for_create=activity
            )
            return {"success": True, "note_id": response.id}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    def create_deal(self, contact_id: str, 
                   company: str,
                   amount: float = 0) -> Dict:
        """Create deal in HubSpot, associated with the contact"""
        deal = {
            "properties": {
                "dealname": f"Sales Qualified Lead - {company}",
                "amount": str(amount),
                "pipeline": "default",
                "dealstage": "qualifiedtobuy"
            },
            "associations": [{
                "to": {"id": contact_id},
                "types": [{
                    "associationCategory": "HUBSPOT_DEFINED",
                    "associationTypeId": 3  # deal-to-contact
                }]
            }]
        }
        
        try:
            response = self.client.crm.deals.basic_api.create(
                simple_public_object_input_for_create=deal
            )
            return {"success": True, "deal_id": response.id}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    def _find_contact_by_email(self, email: str) -> Optional[Dict]:
        """Search for contact by email"""
        try:
            response = self.client.crm.contacts.search_api.do_search(
                public_object_search_request={
                    "filterGroups": [{
                        "filters": [{
                            "propertyName": "email",
                            "operator": "EQ",
                            "value": email
                        }]
                    }]
                }
            )
            
            if response.total > 0:
                return response.results[0].to_dict()
            return None
        except Exception:
            return None
    
    def _create_contact(self, profile: 'LeadProfile') -> Dict:
        """Create new contact"""
        properties = {
            "email": profile.email,
            "company": profile.company,
            "jobtitle": profile.role,
            "hs_lead_status": "open",
            "numemployees": str(profile.company_size) if profile.company_size else None
        }
        contact_data = SimplePublicObjectInputForCreate(
            # HubSpot rejects null property values, so drop them
            properties={k: v for k, v in properties.items() if v is not None}
        )
        
        try:
            response = self.client.crm.contacts.basic_api.create(
                simple_public_object_input_for_create=contact_data
            )
            return {"success": True, "contact_id": response.id}
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    def _update_contact(self, contact_id: str, profile: 'LeadProfile') -> Dict:
        """Update existing contact"""
        properties = {
            "company": profile.company,
            "jobtitle": profile.role,
            "numemployees": str(profile.company_size) if profile.company_size else None,
            "hs_lead_status": "in_progress"
        }
        # Updates use SimplePublicObjectInput (not the ...ForCreate variant)
        update_data = SimplePublicObjectInput(
            properties={k: v for k, v in properties.items() if v is not None}
        )
        
        try:
            self.client.crm.contacts.basic_api.update(
                contact_id=contact_id,
                simple_public_object_input=update_data
            )
            return {"success": True, "contact_id": contact_id}
        except Exception as e:
            return {"success": False, "error": str(e)}

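The create-or-update branching is worth testing independently of HubSpot. A minimal stand-in (a hypothetical helper, not part of the integration) captures the decision logic:

```python
from typing import Optional, Set

def upsert_decision(known_emails: Set[str], email: Optional[str]) -> str:
    """Mirror create_or_update_contact's branching without any API calls."""
    if not email:
        return "error: email required"
    return "update" if email in known_emails else "create"
```
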
Main Conversation Agent

Now we'll build the LangChain agent that orchestrates everything.

# app/agent.py
import os
from datetime import datetime
from typing import Any, Dict, List

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import BaseTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

from app.booking import MeetingBooker
from app.crm import CRMIntegration
from app.models import ConversationStage, ConversationState
from app.objections import ObjectionHandler
from app.scoring import LeadScoringEngine

class SalesQualificationAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4-1106-preview", temperature=0.3)
        self.conversation_states: Dict[str, ConversationState] = {}
        self.scoring_engine = LeadScoringEngine()
        self.objection_handler = ObjectionHandler()
        self.crm = CRMIntegration(os.getenv("HUBSPOT_TOKEN"))
        self.booker = MeetingBooker(os.getenv("CALENDLY_TOKEN"))
        
        # Initialize tools
        self.tools = self._initialize_tools()
        
        # Create agent
        self.agent = self._create_agent()
    
    def _initialize_tools(self) -> List[BaseTool]:
        """Initialize LangChain tools"""
        return [
            self._create_update_profile_tool(),
            self._create_score_lead_tool(),
            self._create_handle_objection_tool(),
            self._create_book_meeting_tool(),
            self._create_update_crm_tool()
        ]
    
    def _create_agent(self) -> AgentExecutor:
        """Create the main conversation agent"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a professional sales qualification agent for DriftSeas AI solutions.
            
Your goal is to qualify leads through natural conversation. Follow these guidelines:
1. Start with greeting and discovery questions
2. Gradually collect qualification information
3. Address objections empathetically
4. Propose meeting when lead is qualified
5. Always maintain professional, consultative tone

Current conversation state: {conversation_state}
Lead profile: {lead_profile}
Lead score: {lead_score}
Objections raised: {objections}

Use tools when needed. Always explain your reasoning."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        agent = create_openai_tools_agent(self.llm, self.tools, prompt)
        # return_intermediate_steps=True so we can inspect tool calls afterward
        return AgentExecutor(agent=agent, tools=self.tools, verbose=True,
                             return_intermediate_steps=True)
    
    async def process_message(self, session_id: str, message: str) -> Dict[str, Any]:
        """Process incoming message and return response"""
        # Get or create conversation state
        if session_id not in self.conversation_states:
            self.conversation_states[session_id] = ConversationState(
                lead_id=f"lead_{session_id}",
                session_id=session_id
            )
        
        state = self.conversation_states[session_id]
        
        # Update message history
        state.messages.append({
            "role": "user",
            "content": message,
            "timestamp": datetime.now().isoformat()
        })
        
        # Detect objection if in appropriate stage
        if state.stage in [ConversationStage.DISCOVERY, ConversationStage.QUALIFICATION]:
            objection_type = self.objection_handler.detect_objection(message)
            if objection_type:
                state.objections_raised.append(objection_type)
                state.stage = ConversationStage.OBJECTION
        
        # Prepare agent input
        agent_input = {
            "input": message,
            "conversation_state": state.stage.value,
            "lead_profile": state.lead_profile.dict(),
            "lead_score": state.lead_score,
            "objections": state.objections_raised,
            "chat_history": state.messages[-10:]  # Last 10 messages for context
        }
        
        # Run agent
        try:
            response = await self.agent.ainvoke(agent_input)
            
            # Update state with agent's actions
            self._update_state_from_actions(state, response.get("intermediate_steps", []))
            
            # Add response to history
            state.messages.append({
                "role": "assistant",
                "content": response["output"],
                "timestamp": datetime.now().isoformat()
            })
            
            # Update last activity
            state.last_updated = datetime.now()
            
            return {
                "response": response["output"],
                "state": {
                    "stage": state.stage.value,
                    "lead_score": state.lead_score,
                    "meeting_proposed": state.meeting_proposed
                }
            }
            
        except Exception as e:
            return {
                "response": "I apologize, I encountered an error. Let me try again.",
                "error": str(e)
            }
    
    def _update_state_from_actions(self, state: ConversationState, 
                                  actions: List) -> None:
        """Update conversation state based on agent actions"""
        for action in actions:
            tool_name = action[0].tool
            tool_input = action[0].tool_input
            
            if tool_name == "update_lead_profile":
                # Update lead profile with new information
                for key, value in tool_input.items():
                    if hasattr(state.lead_profile, key):
                        setattr(state.lead_profile, key, value)
                
                # Recalculate score
                state.lead_score, _ = self.scoring_engine.calculate_score(
                    state.lead_profile
                )
                
            elif tool_name == "handle_objection":
                # Mark objection as addressed
                if state.stage == ConversationStage.OBJECTION:
                    state.stage = ConversationStage.QUALIFICATION
                    
            elif tool_name == "book_meeting":
                state.meeting_proposed = True
                state.stage = ConversationStage.BOOKING
                
            elif tool_name == "update_crm":
                # CRM updated, conversation can close
                state.stage = ConversationStage.CLOSED

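The `_create_*_tool` factories are referenced above but not shown. Stripped of LangChain specifics, each one binds a tool name to a function that mutates shared conversation state; a plain-Python sketch of the update-profile tool (the LangChain version would wrap the inner function with `StructuredTool.from_function`) looks like this:

```python
def make_update_profile_tool(profile: dict):
    """Return (name, fn); fn merges extracted fields into the lead profile."""
    KNOWN_FIELDS = {'email', 'company', 'role', 'company_size',
                    'industry', 'pain_points', 'budget_range', 'timeline'}

    def update_lead_profile(**fields):
        applied = {k: v for k, v in fields.items() if k in KNOWN_FIELDS}
        profile.update(applied)                 # shared mutable state
        return f"updated: {sorted(applied)}"

    return "update_lead_profile", update_lead_profile
```

Returning a short confirmation string matters: the agent sees tool output as an observation, so it can report back what it learned.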
FastAPI Backend

Finally, let's create the API endpoint.

# app/main.py
import uuid
from typing import Any, Dict, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from app.agent import SalesQualificationAgent

app = FastAPI(title="Sales Qualification Agent API")

# CORS for frontend -- lock allow_origins down in production
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize agent
agent = SalesQualificationAgent()

class MessageRequest(BaseModel):
    session_id: Optional[str] = None
    message: str
    metadata: Dict[str, Any] = {}

class MessageResponse(BaseModel):
    response: str
    session_id: str
    state: Dict[str, Any]

@app.post("/chat", response_model=MessageResponse)
async def chat_endpoint(request: MessageRequest):
    """Main chat endpoint"""
    session_id = request.session_id or str(uuid.uuid4())
    
    try:
        result = await agent.process_message(session_id, request.message)
        
        return MessageResponse(
            response=result["response"],
            session_id=session_id,
            state=result.get("state", {})
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Testing the Agent

Create a test script to simulate conversations:

# test_agent.py
import asyncio
import httpx

async def test_conversation():
    async with httpx.AsyncClient() as client:
        # Start conversation
        response = await client.post("http://localhost:8000/chat", json={
            "message": "Hi, I'm interested in your AI solutions."
        })
        
        session_id = response.json()["session_id"]
        print(f"Bot: {response.json()['response']}")
        
        # Simulate multi-turn conversation
        test_messages = [
            "We're a mid-sized tech company, about 200 employees.",
            "I'm the VP of Engineering, looking to improve our development workflow.",
            "Our main pain point is that our current tools don't scale well.",
            "Budget is around $50k-$100k for the right solution.",
            "We'd like to implement something within the next quarter.",
            "What makes you different from CompetitorX?",
            "That's interesting. How does pricing work?",
            "Let me check with my team first.",
            "Actually, can we schedule a demo next week?"
        ]
        
        for message in test_messages:
            print(f"\nUser: {message}")
            response = await client.post(
                "http://localhost:8000/chat",
                json={
                    "session_id": session_id,
                    "message": message
                }
            )
            data = response.json()
            print(f"Bot: {data['response']}")
            print(f"State: {data['state']}")

if __name__ == "__main__":
    asyncio.run(test_conversation())

Production Considerations

1. Conversation Memory Management

# Add to agent.py (inside SalesQualificationAgent; needs `from datetime import timedelta`)
def _cleanup_old_sessions(self, max_age_hours: int = 24):
    """Remove inactive sessions"""
    cutoff = datetime.now() - timedelta(hours=max_age_hours)
    inactive_sessions = [
        session_id for session_id, state in self.conversation_states.items()
        if state.last_updated < cutoff
    ]
    
    for session_id in inactive_sessions:
        del self.conversation_states[session_id]

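The cutoff filter is easy to sanity-check with fixed timestamps:

```python
from datetime import datetime, timedelta

now = datetime(2026, 3, 21, 12, 0)
last_updated = {
    "session_a": now - timedelta(hours=30),   # stale
    "session_b": now - timedelta(hours=2),    # still active
}
cutoff = now - timedelta(hours=24)
inactive = [sid for sid, ts in last_updated.items() if ts < cutoff]
```

For real deployments, move state out of process memory entirely (Redis with TTLs) so cleanup and horizontal scaling come for free.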
2. Error Handling and Fallbacks

# Add to main.py
from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={
            "response": "I apologize for the technical difficulty. Let me connect you with a human agent.",
            "error": str(exc),
            "fallback": True
        }
    )

3. Analytics and Monitoring

# Add analytics tracking
class ConversationAnalytics:
    def track_event(self, session_id: str, event_type: str, data: Dict):
        """Track conversation events for analysis"""
        # Send to your analytics platform
        # (Mixpanel, Amplitude, custom database)
        pass

Key Takeaways

  1. State Management is Critical: Track conversation stage, lead profile, and context separately
  2. Scoring Should Evolve: Update lead score as new information emerges
  3. Objections Are Opportunities: Pattern match and respond contextually
  4. Integration Points Matter: CRM updates should happen at natural conversation breaks
  5. Test Real Conversations: Multi-turn testing reveals edge cases single-turn testing misses

The complete codebase is available on GitHub at driftseas/sales-agent-tutorial. This implementation handles the messy reality of sales conversations—where prospects change their minds, raise unexpected objections, and require careful nurturing through the qualification process.

Remember: The goal isn't to replace sales reps, but to handle initial qualification so your team can focus on high-value conversations with well-qualified leads.

Keywords

AI agent, conversational agents