Back to Home
Financial Agents

Building an AI Trading Bot with LangChain: A Step-by-Step Tutorial

Alex Chen

AI engineer and open-source contributor. Writes about agent architectures and LLM tooling.

March 6, 202617 min read

Most tutorials on "AI trading bots" are either vaporware or dangerously naive. They show you a LangChain agent calling a single indicator function, then skip straight to "now deploy it and make money....

Building a Quantitative Trading Bot with LangChain Agents

A Realistic Architecture for LLM-Augmented Trading Systems

Most tutorials on "AI trading bots" are either vaporware or dangerously naive. They show you a LangChain agent calling a single indicator function, then skip straight to "now deploy it and make money." This isn't that tutorial.

We're going to build something architecturally honest: a trading system where LangChain agents handle the parts LLMs are actually good at (unstructured data analysis, reasoning over multiple information sources, qualitative assessment) while traditional quantitative methods handle numerical signal generation. The LLM isn't the strategy — it's one component in a pipeline that also includes hard risk controls that no agent can override.

The full system will cover market data ingestion, a hybrid signal generation pipeline, layered risk management, backtesting with realistic assumptions, and paper trading through Alpaca.


Why LLMs in Trading (and Why Not)

Before writing a line of code, let's be clear about what LLMs bring to trading and where they fail:

LLMs are useful for:

  • Parsing and synthesizing news, earnings calls, SEC filings
  • Interpreting macroeconomic context
  • Reasoning across multiple qualitative signals
  • Explaining and justifying trade decisions (auditability)

LLMs are terrible for:

  • Pure numerical pattern recognition (traditional ML is better)
  • High-frequency anything (latency is measured in seconds, not microseconds)
  • Guaranteed consistency (same input can yield different outputs)
  • Mathematical precision

The architecture below reflects this reality.


Project Structure

trading-bot/
├── config.py
├── data/
│   ├── ingestion.py
│   └── store.py
├── agents/
│   ├── market_analyst.py
│   ├── risk_assessor.py
│   └── orchestrator.py
├── signals/
│   ├── technical.py
│   ├── sentiment.py
│   └── composite.py
├── risk/
│   ├── position_sizing.py
│   ├── limits.py
│   └── stops.py
├── execution/
│   ├── paper_trader.py
│   └── order_manager.py
├── backtest/
│   ├── engine.py
│   └── metrics.py
├── main.py
└── requirements.txt

1. Market Data Ingestion

We need two types of data: structured (OHLCV prices, fundamentals) and unstructured (news, filings). We'll use yfinance for price data and a news API for the unstructured side.

# config.py
from dataclasses import dataclass, field
from typing import List

@dataclass
class TradingConfig:
    symbols: List[str] = field(default_factory=lambda: [
        "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA",
        "META", "TSLA", "JPM", "V", "UNH"
    ])
    lookback_days: int = 252  # 1 year of trading days
    initial_capital: float = 100_000.0
    max_position_pct: float = 0.10       # 10% max per position
    max_portfolio_risk: float = 0.02     # 2% max daily portfolio risk
    max_drawdown_pct: float = 0.15       # 15% max drawdown before halt
    stop_loss_pct: float = 0.05          # 5% trailing stop
    paper_trading: bool = True
    alpaca_api_key: str = ""
    alpaca_secret_key: str = ""
    openai_api_key: str = ""
    news_api_key: str = ""
# data/ingestion.py
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Optional
import requests
import logging

logger = logging.getLogger(__name__)


class MarketDataFetcher:
    """Fetches and caches market data from multiple sources."""

    def __init__(self, config):
        self.config = config
        self._cache: Dict[str, pd.DataFrame] = {}

    def get_price_data(
        self, symbol: str, period_days: Optional[int] = None
    ) -> pd.DataFrame:
        """Fetch OHLCV data. Returns DataFrame with standard columns."""
        if symbol in self._cache:
            return self._cache[symbol]

        days = period_days or self.config.lookback_days
        end = datetime.now()
        start = end - timedelta(days=int(days * 1.5))  # buffer for weekends

        try:
            ticker = yf.Ticker(symbol)
            df = ticker.history(start=start, end=end, auto_adjust=True)

            if df.empty:
                logger.warning(f"No data returned for {symbol}")
                return pd.DataFrame()

            # Standardize columns
            df = df.rename(columns={
                "Open": "open", "High": "high", "Low": "low",
                "Close": "close", "Volume": "volume"
            })
            df = df[["open", "high", "low", "close", "volume"]]
            df.index = pd.to_datetime(df.index)
            df.index = df.index.tz_localize(None)  # Remove timezone
            df = df.tail(days)

            self._cache[symbol] = df
            return df

        except Exception as e:
            logger.error(f"Failed to fetch data for {symbol}: {e}")
            return pd.DataFrame()

    def get_fundamentals(self, symbol: str) -> dict:
        """Fetch basic fundamental data."""
        try:
            ticker = yf.Ticker(symbol)
            info = ticker.info
            return {
                "pe_ratio": info.get("trailingPE"),
                "forward_pe": info.get("forwardPE"),
                "peg_ratio": info.get("pegRatio"),
                "market_cap": info.get("marketCap"),
                "profit_margin": info.get("profitMargins"),
                "revenue_growth": info.get("revenueGrowth"),
                "debt_to_equity": info.get("debtToEquity"),
                "free_cash_flow": info.get("freeCashflow"),
                "sector": info.get("sector"),
                "industry": info.get("industry"),
            }
        except Exception as e:
            logger.error(f"Failed to fetch fundamentals for {symbol}: {e}")
            return {}

    def get_news(self, symbol: str, days: int = 7) -> list:
        """Fetch recent news articles for a symbol."""
        if not self.config.news_api_key:
            return self._get_yfinance_news(symbol)

        # NewsAPI approach
        end = datetime.now()
        start = end - timedelta(days=days)
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": symbol,
            "from": start.strftime("%Y-%m-%d"),
            "to": end.strftime("%Y-%m-%d"),
            "sortBy": "relevancy",
            "language": "en",
            "pageSize": 10,
            "apiKey": self.config.news_api_key,
        }

        try:
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()
            articles = resp.json().get("articles", [])
            return [
                {
                    "title": a["title"],
                    "description": a.get("description", ""),
                    "source": a["source"]["name"],
                    "published": a["publishedAt"],
                    "content": a.get("content", "")[:500],
                }
                for a in articles
                if a.get("title") and "[Removed]" not in a["title"]
            ]
        except Exception as e:
            logger.error(f"News API error for {symbol}: {e}")
            return []

    def _get_yfinance_news(self, symbol: str) -> list:
        """Fallback: use yfinance's built-in news."""
        try:
            ticker = yf.Ticker(symbol)
            news = ticker.news or []
            return [
                {
                    "title": n.get("title", ""),
                    "description": n.get("summary", ""),
                    "source": n.get("publisher", "Unknown"),
                    "published": datetime.fromtimestamp(
                        n.get("providerPublishTime", 0)
                    ).isoformat(),
                    "content": n.get("summary", "")[:500],
                }
                for n in news[:10]
            ]
        except Exception:
            return []

    def clear_cache(self):
        self._cache.clear()

Key design decisions here: the cache prevents redundant API calls during a single analysis cycle, yfinance is rate-limited but free, and the news fallback means the system degrades gracefully without an API key.


2. Technical Signal Generation

This is where most LLM trading tutorials go wrong. They ask the LLM to calculate RSI. Don't do that. Use pandas-ta for technical indicators and have the LLM interpret the results.

# signals/technical.py
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Optional

try:
    import pandas_ta as ta
except ImportError:
    ta = None


@dataclass
class TechnicalSignal:
    symbol: str
    signal_type: str       # "BUY", "SELL", "HOLD"
    strength: float        # -1.0 to 1.0
    indicators: dict       # Raw indicator values
    reasoning: str         # Human-readable explanation


class TechnicalAnalyzer:
    """Generates trading signals from price data using technical indicators."""

    def __init__(self):
        self.indicator_weights = {
            "trend": 0.35,
            "momentum": 0.25,
            "volatility": 0.20,
            "volume": 0.20,
        }

    def analyze(self, symbol: str, df: pd.DataFrame) -> TechnicalSignal:
        """Run full technical analysis on price data."""
        if df.empty or len(df) < 50:
            return TechnicalSignal(
                symbol=symbol, signal_type="HOLD",
                strength=0.0, indicators={}, reasoning="Insufficient data"
            )

        indicators = {}

        # --- Trend Indicators ---
        indicators.update(self._trend_indicators(df))

        # --- Momentum Indicators ---
        indicators.update(self._momentum_indicators(df))

        # --- Volatility Indicators ---
        indicators.update(self._volatility_indicators(df))

        # --- Volume Indicators ---
        indicators.update(self._volume_indicators(df))

        # --- Composite Score ---
        score = self._compute_composite_score(indicators)

        signal_type = "BUY" if score > 0.2 else "SELL" if score < -0.2 else "HOLD"

        return TechnicalSignal(
            symbol=symbol,
            signal_type=signal_type,
            strength=round(score, 3),
            indicators=indicators,
            reasoning=self._generate_reasoning(indicators, signal_type, score),
        )

    def _trend_indicators(self, df: pd.DataFrame) -> dict:
        close = df["close"]
        result = {}

        # Moving Averages
        result["sma_20"] = close.rolling(20).mean().iloc[-1]
        result["sma_50"] = close.rolling(50).mean().iloc[-1]
        result["ema_12"] = close.ewm(span=12).mean().iloc[-1]
        result["ema_26"] = close.ewm(span=26).mean().iloc[-1]

        # MACD
        macd_line = result["ema_12"] - result["ema_26"]
        signal_line = (close.ewm(span=12).mean() - close.ewm(span=26).mean()).ewm(span=9).mean().iloc[-1]
        result["macd"] = macd_line
        result["macd_signal"] = signal_line
        result["macd_histogram"] = macd_line - signal_line

        # Price relative to MAs
        current_price = close.iloc[-1]
        result["price_vs_sma20"] = (current_price - result["sma_20"]) / result["sma_20"]
        result["price_vs_sma50"] = (current_price - result["sma_50"]) / result["sma_50"]

        # Trend direction (slope of 20-day SMA)
        sma20 = close.rolling(20).mean()
        result["trend_slope"] = (sma20.iloc[-1] - sma20.iloc[-5]) / sma20.iloc[-5]

        return result

    def _momentum_indicators(self, df: pd.DataFrame) -> dict:
        close = df["close"]
        high = df["high"]
        low = df["low"]
        result = {}

        # RSI (14-period)
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss.replace(0, np.nan)
        result["rsi_14"] = (100 - (100 / (1 + rs))).iloc[-1]

        # Stochastic Oscillator
        lowest_low = low.rolling(14).min()
        highest_high = high.rolling(14).max()
        result["stoch_k"] = (
            100 * (close - lowest_low) / (highest_high - lowest_low)
        ).iloc[-1]
        result["stoch_d"] = (
            100 * (close - lowest_low) / (highest_high - lowest_low)
        ).rolling(3).mean().iloc[-1]

        # Rate of Change
        result["roc_10"] = ((close.iloc[-1] / close.iloc[-10]) - 1) * 100

        return result

    def _volatility_indicators(self, df: pd.DataFrame) -> dict:
        close = df["close"]
        high = df["high"]
        low = df["low"]
        result = {}

        # Bollinger Bands
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        result["bb_upper"] = (sma20 + 2 * std20).iloc[-1]
        result["bb_lower"] = (sma20 - 2 * std20).iloc[-1]
        result["bb_width"] = (
            (result["bb_upper"] - result["bb_lower"]) / sma20.iloc[-1]
        )
        result["bb_position"] = (
            (close.iloc[-1] - result["bb_lower"])
            / (result["bb_upper"] - result["bb_lower"])
        )

        # ATR (Average True Range)
        tr = pd.DataFrame({
            "hl": high - low,
            "hc": abs(high - close.shift(1)),
            "lc": abs(low - close.shift(1)),
        }).max(axis=1)
        result["atr_14"] = tr.rolling(14).mean().iloc[-1]
        result["atr_pct"] = result["atr_14"] / close.iloc[-1]

        # Historical Volatility (20-day)
        returns = close.pct_change().dropna()
        result["hist_vol_20"] = returns.tail(20).std() * np.sqrt(252)

        return result

    def _volume_indicators(self, df: pd.DataFrame) -> dict:
        close = df["close"]
        volume = df["volume"]
        result = {}

        # Volume Moving Average
        result["vol_sma_20"] = volume.rolling(20).mean().iloc[-1]
        result["vol_ratio"] = volume.iloc[-1] / result["vol_sma_20"]

        # On-Balance Volume
        obv = (np.sign(close.diff()) * volume).cumsum()
        obv_sma = obv.rolling(20).mean()
        result["obv_trend"] = 1 if obv.iloc[-1] > obv_sma.iloc[-1] else -1

        # Volume-Price Trend
        vpt = (close.pct_change() * volume).cumsum()
        result["vpt_slope"] = (vpt.iloc[-1] - vpt.iloc[-10]) / abs(vpt.iloc[-10]) if vpt.iloc[-10] != 0 else 0

        return result

    def _compute_composite_score(self, indicators: dict) -> float:
        """Weighted composite score from all indicator categories."""
        scores = {}

        # Trend score
        trend_signals = []
        if indicators.get("macd_histogram", 0) > 0:
            trend_signals.append(0.5)
        else:
            trend_signals.append(-0.5)
        if indicators.get("price_vs_sma20", 0) > 0:
            trend_signals.append(min(indicators["price_vs_sma20"] * 5, 1.0))
        else:
            trend_signals.append(max(indicators["price_vs_sma20"] * 5, -1.0))
        trend_signals.append(np.clip(indicators.get("trend_slope", 0) * 20, -1, 1))
        scores["trend"] = np.mean(trend_signals)

        # Momentum score
        rsi = indicators.get("rsi_14", 50)
        if rsi > 70:
            mom_signals = [-0.7]  # Overbought
        elif rsi < 30:
            mom_signals = [0.7]   # Oversold
        else:
            mom_signals = [(rsi - 50) / 50 * -0.3]  # Mild mean reversion

        roc = indicators.get("roc_10", 0)
        mom_signals.append(np.clip(roc / 10, -1, 1))
        scores["momentum"] = np.mean(mom_signals)

        # Volatility score (higher vol = more caution)
        bb_pos = indicators.get("bb_position", 0.5)
        if bb_pos > 0.9:
            scores["volatility"] = -0.5  # Near upper band, caution
        elif bb_pos < 0.1:
            scores["volatility"] = 0.5   # Near lower band, opportunity
        else:
            scores["volatility"] = (0.5 - bb_pos) * 0.5

        # Volume score
        vol_ratio = indicators.get("vol_ratio", 1.0)
        obv_trend = indicators.get("obv_trend", 0)
        if vol_ratio > 1.5 and obv_trend > 0:
            scores["volume"] = 0.6
        elif vol_ratio > 1.5 and obv_trend < 0:
            scores["volume"] = -0.6
        else:
            scores["volume"] = obv_trend * 0.2

        # Weighted composite
        composite = sum(
            scores[cat] * weight
            for cat, weight in self.indicator_weights.items()
        )

        return np.clip(composite, -1.0, 1.0)

    def _generate_reasoning(
        self, indicators: dict, signal: str, score: float
    ) -> str:
        """Generate human-readable reasoning for the signal."""
        parts = []

        rsi = indicators.get("rsi_14", 50)
        if rsi > 70:
            parts.append(f"RSI at {rsi:.1f} indicates overbought conditions")
        elif rsi < 30:
            parts.append(f"RSI at {rsi:.1f} indicates oversold conditions")
        else:
            parts.append(f"RSI at {rsi:.1f} is in neutral range")

        macd_hist = indicators.get("macd_histogram", 0)
        if macd_hist > 0:
            parts.append("MACD histogram is positive (bullish momentum)")
        else:
            parts.append("MACD histogram is negative (bearish momentum)")

        atr_pct = indicators.get("atr_pct", 0)
        parts.append(f"ATR is {atr_pct*100:.1f}% of price")

        vol_ratio = indicators.get("vol_ratio", 1)
        if vol_ratio > 1.5:
            parts.append(f"Volume is {vol_ratio:.1f}x the 20-day average (elevated)")
        elif vol_ratio < 0.5:
            parts.append(f"Volume is {vol_ratio:.1f}x average (below normal)")

        return ". ".join(parts) + f". Composite score: {score:.3f}."

This is deterministic, fast, and numerically precise. Now let's add the LLM layer that makes this system more than just a traditional quant bot.


3. LangChain Agent for Qualitative Analysis

This is where LangChain earns its place. The agent synthesizes news, fundamentals, and technical signals into a holistic assessment.

# agents/market_analyst.py
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import SystemMessage
from pydantic import BaseModel, Field
from typing import List, Optional
import json
import logging

logger = logging.getLogger(__name__)


class MarketAssessment(BaseModel):
    """Structured output from the market analyst agent."""
    symbol: str
    overall_bias: str = Field(description="BULLISH, BEARISH, or NEUTRAL")
    confidence: float = Field(description="0.0 to 1.0", ge=0, le=1)
    time_horizon: str = Field(description="SHORT_TERM, MEDIUM_TERM, LONG_TERM")
    key_factors: List[str] = Field(description="Top 3-5 factors driving the assessment")
    risk_factors: List[str] = Field(description="Key risks to the thesis")
    sentiment_score: float = Field(description="-1.0 (very bearish) to 1.0 (very bullish)")
    recommendation: str = Field(description="BUY, SELL, HOLD with sizing suggestion")
    reasoning: str = Field(description="Detailed reasoning")


def create_market_analyst_agent(config, data_fetcher, technical_analyzer):
    """Create a LangChain agent for market analysis."""

    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0.1,  # Low temperature for consistency
        api_key=config.openai_api_key,
    )

    # Define tools the agent can use
    @tool
    def get_price_analysis(symbol: str) -> str:
        """Get technical analysis results for a stock symbol.
        Returns indicators, signals, and composite scores."""
        df = data_fetcher.get_price_data(symbol)
        signal = technical_analyzer.analyze(symbol, df)
        return json.dumps({
            "symbol": signal.symbol,
            "signal": signal.signal_type,
            "strength": signal.strength,
            "reasoning": signal.reasoning,
            "key_indicators": {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in signal.indicators.items()
            }
        }, indent=2)

    @tool
    def get_recent_news(symbol: str) -> str:
        """Get recent news articles for a stock symbol.
        Returns titles, descriptions, and sources."""
        news = data_fetcher.get_news(symbol)
        if not news:
            return "No recent news found."
        return json.dumps(news[:7], indent=2)

    @tool
    def get_fundamentals(symbol: str) -> str:
        """Get fundamental data for a stock including valuation
        metrics, profitability, and growth."""
        fundamentals = data_fetcher.get_fundamentals(symbol)
        if not fundamentals:
            return "No fundamental data available."
        return json.dumps({
            k: v for k, v in fundamentals.items() if v is not None
        }, indent=2)

    @tool
    def get_price_summary(symbol: str) -> str:
        """Get a summary of recent price action including
        returns over multiple periods and current price levels."""
        df = data_fetcher.get_price_data(symbol)
        if df.empty:
            return "No price data available."

        close = df["close"]
        current = close.iloc[-1]

        def pct_return(n):
            if len(close) >= n:
                return round((current / close.iloc[-n] - 1) * 100, 2)
            return None

        return json.dumps({
            "current_price": round(current, 2),
            "5d_return_pct": pct_return(5),
            "20d_return_pct": pct_return(20),
            "60d_return_pct": pct_return(60),
            "252d_return_pct": pct_return(252),
            "52w_high": round(close.max(), 2),
            "52w_low": round(close.min(), 2),
            "pct_from_52w_high": round((current / close.max() - 1) * 100, 2),
            "avg_daily_volume": int(df["volume"].tail(20).mean()),
        }, indent=2)

    tools = [get_price_analysis, get_recent_news, get_fundamentals, get_price_summary]

    system_prompt = """You are a senior quantitative market analyst. Your job is to
synthesize technical analysis, news sentiment, and fundamental data into a clear
trading assessment for a given stock.

IMPORTANT GUIDELINES:
- Be specific and data-driven. Cite actual numbers from the tools.
- Distinguish between short-term trading signals and longer-term fundamentals.
- Always identify the strongest counter-arguments to your assessment.
- Be honest about uncertainty. Low confidence is acceptable.
- Consider macro context: is this a stock-specific move or sector-wide?
- News sentiment can be misleading. Cross-reference with price action.
- Never fabricate data. If you don't have information, say so.

Your output must be a structured MarketAssessment with:
- overall_bias: BULLISH, BEARISH, or NEUTRAL
- confidence: 0.0 to 1.0 (be conservative; most assessments should be 0.3-0.7)
- key_factors: the 3-5 most important drivers
- risk_factors: what could go wrong
- sentiment_score: -1.0 to 1.0 based on news/qualitative factors
- recommendation: BUY, SELL, or HOLD with position sizing guidance
- reasoning: detailed explanation"""

    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system_prompt),
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, tools, prompt)

    executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=False,
        max_iterations=8,
        handle_parsing_errors=True,
        return_intermediate_steps=True,
    )

    return executor


class MarketAnalyst:
    """High-level wrapper that runs the agent and parses output."""

    def __init__(self, config, data_fetcher, technical_analyzer):
        self.agent_executor = create_market_analyst_agent(
            config, data_fetcher, technical_analyzer
        )
        self.config = config

    def analyze(self, symbol: str) -> MarketAssessment:
        """Run full analysis on a symbol and return structured assessment."""
        try:
            result = self.agent_executor.invoke({
                "input": (
                    f"Provide a comprehensive trading analysis for {symbol}. "
                    f"Use all available tools to gather data, then synthesize "
                    f"your findings into a structured assessment."
                )
            })

            # Parse the agent's output into a MarketAssessment
            output = result.get("output", "")

            # The agent's final output should contain the structured assessment
            # We'll use the LLM to extract structured data
            return self._parse_assessment(symbol, output)

        except Exception as e:
            logger.error(f"Analysis failed for {symbol}: {e}")
            return MarketAssessment(
                symbol=symbol,
                overall_bias="NEUTRAL",
                confidence=0.0,
                time_horizon="SHORT_TERM",
                key_factors=[f"Analysis failed: {str(e)}"],
                risk_factors=["System error - no analysis available"],
                sentiment_score=0.0,
                recommendation="HOLD",
                reasoning=f"Could not complete analysis: {e}",
            )

    def _parse_assessment(self, symbol: str, output: str) -> MarketAssessment:
        """Parse agent output into structured assessment."""
        # Use a separate LLM call with structured output for reliability
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(
            model="gpt-4o-mini",  # Cheaper model for parsing
            temperature=0,
            api_key=self.config.openai_api_key,
        )

        structured_llm = llm.with_structured_output(MarketAssessment)

        try:
            assessment = structured_llm.invoke(
                f"Parse this market analysis into a structured assessment "
                f"for {symbol}:\n\n{output}"
            )
            return assessment
        except Exception as e:
            logger.error(f"Failed to parse assessment for {symbol}: {e}")
            return MarketAssessment(
                symbol=symbol,
                overall_bias="NEUTRAL",
                confidence=0.1,
                time_horizon="SHORT_TERM",
                key_factors=["Parse failure"],
                risk_factors=["Assessment could not be structured"],
                sentiment_score=0.0,
                recommendation="HOLD",
                reasoning=output[:500],
            )

The two-step pattern (agent generates → structured parser extracts) is more reliable than asking the agent to output JSON directly. The agent gets to reason freely, then a deterministic extraction step ensures we get valid structured data.


4. Composite Signal Generation

Now we combine technical and qualitative signals:

# signals/composite.py
from dataclasses import dataclass
from typing import List
import numpy as np


@dataclass
class CompositeSignal:
    symbol: str
    action: str          # "BUY", "SELL", "HOLD"
    strength: float      # -1.0 to 1.0
    technical_score: float
    sentiment_score: float
    confidence: float
    position_size_hint: float  # 0.0 to 1.0 (fraction of max position)
    reasoning: str


class CompositeSignalGenerator:
    """Combines technical and LLM-based signals into actionable recommendations."""

    def __init__(self, technical_weight: float = 0.6, sentiment_weight: float = 0.4):
        self.technical_weight = technical_weight
        self.sentiment_weight = sentiment_weight

        # Thresholds
        self.buy_threshold = 0.25
        self.sell_threshold = -0.25
        self.min_confidence = 0.3

    def generate(
        self,
        technical_signal,
        market_assessment,
    ) -> CompositeSignal:
        """Combine technical and qualitative signals."""

        t_score = technical_signal.strength
        s_score = market_assessment.sentiment_score

        # Weighted composite
        composite = (
            self.technical_weight * t_score
            + self.sentiment_weight * s_score
        )

        # Confidence adjustment: if signals disagree, reduce confidence
        agreement = 1.0 - abs(t_score - s_score) / 2.0
        confidence = market_assessment.confidence * agreement

        # Signal agreement bonus/penalty
        if (t_score > 0 and s_score > 0) or (t_score < 0 and s_score < 0):
            # Signals agree - boost composite slightly
            composite *= 1.15
        else:
            # Signals disagree - dampen
            composite *= 0.7

        composite = np.clip(composite, -1.0, 1.0)

        # Determine action
        if confidence < self.min_confidence:
            action = "HOLD"
        elif composite > self.buy_threshold:
            action = "BUY"
        elif composite < self.sell_threshold:
            action = "SELL"
        else:
            action = "HOLD"

        # Position sizing hint (scaled by confidence and strength)
        position_hint = abs(composite) * confidence if action != "HOLD" else 0.0

        reasoning = (
            f"Technical: {technical_signal.signal_type} ({t_score:.3f}), "
            f"Sentiment: {market_assessment.overall_bias} ({s_score:.3f}), "
            f"Composite: {composite:.3f}, Confidence: {confidence:.3f}. "
            f"Technical: {technical_signal.reasoning} "
            f"Qualitative: {'; '.join(market_assessment.key_factors[:3])}"
        )

        return CompositeSignal(
            symbol=technical_signal.symbol,
            action=action,
            strength=round(composite, 3),
            technical_score=round(t_score, 3),
            sentiment_score=round(s_score, 3),
            confidence=round(confidence, 3),
            position_size_hint=round(position_hint, 3),
            reasoning=reasoning,
        )

5. Risk Management (The Non-Negotiable Layer)

This is the most important part of the system. Risk management is a hard constraint layer — it sits between signal generation and execution, and nothing bypasses it.

# risk/limits.py
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np

logger = logging.getLogger(__name__)


@dataclass
class Position:
    symbol: str
    shares: int
    entry_price: float
    current_price: float
    stop_loss: float
    highest_price: float  # For trailing stop
    entry_date: str

    @property
    def market_value(self):
        return self.shares * self.current_price

    @property
    def pnl(self):
        return (self.current_price - self.entry_price) * self.shares

    @property
    def pnl_pct(self):
        if self.entry_price == 0:
            return 0
        return (self.current_price / self.entry_price - 1) * 100


@dataclass
class Portfolio:
    cash: float
    positions: Dict[str, Position] = field(default_factory=dict)
    peak_value: float = 0.0
    trade_history: List[dict] = field(default_factory=list)
    halted: bool = False

    @property
    def total_value(self):
        pos_value = sum(p.market_value for p in self.positions.values())
        return self.cash + pos_value

    @property
    def total_pnl(self):
        return sum(p.pnl for p in self.positions.values())

    @property
    def drawdown(self):
        if self.peak_value == 0:
            return 0
        return (self.peak_value - self.total_value) / self.peak_value


class RiskManager:
    """Enforces risk constraints on all trades. Cannot be overridden by agents."""

    def __init__(self, config):
        self.config = config
        self.max_position_pct = config.max_position_pct
        self.max_portfolio_risk = config.max_portfolio_risk
        self.max_drawdown_pct = config.max_drawdown_pct
        self.stop_loss_pct = config.stop_loss_pct

    def check_trade(
        self,
        symbol: str,
        action: str,
        shares: int,
        price: float,
        portfolio: Portfolio,
        signal_confidence: float,
    ) -> tuple[bool, int, str]:
        """
        Validate a trade against all risk constraints.
        Returns (approved, adjusted_shares, reason).
        """
        if portfolio.halted:
            return False, 0, "PORTFOLIO HALTED: Max drawdown exceeded"

        if action == "HOLD":
            return True, 0, "No trade needed"

        trade_value = shares * price

        # --- Constraint 1: Cash available ---
        if action == "BUY":
            if trade_value > portfolio.cash:
                max_affordable = int(portfolio.cash / price)
                if max_affordable ==

Keywords

AI agentfinancial-agents