
Building an ETL Pipeline Agent with LangChain and Pandas

Marcus Rivera

Full-stack developer and agent builder. Covers coding assistants and dev tools.

April 21, 2026 · 16 min read


Building an AI-Powered ETL Agent: From Data Sources to Production Pipelines

ETL (Extract, Transform, Load) pipelines have traditionally been rigid, code-heavy systems that break when data sources change. What if your pipeline could adapt to schema changes, intelligently handle unexpected data formats, and self-heal when errors occur? This tutorial walks you through building an AI agent that does exactly that—combining traditional ETL patterns with modern AI capabilities.

We'll build a production-ready agent using Python, LangChain, and Apache Airflow that connects to multiple data sources, applies intelligent transformations, handles errors gracefully, and runs on a schedule.

Architecture Overview

Our AI ETL agent consists of four core components:

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Data       │     │   AI Transform  │     │   Data       │
│   Sources    │────▶│   Engine        │────▶│   Warehouse  │
│   (Extract)  │     │   (Transform)   │     │   (Load)     │
└──────────────┘     └─────────────────┘     └──────────────┘
       │                      │                     │
       └──────────────────────┴─────────────────────┘
                              │
                     ┌────────┴────────┐
                     │   Orchestrator  │
                     │   (Airflow)     │
                     └─────────────────┘

The agent uses:

  • LangChain for AI reasoning and tool execution
  • SQLAlchemy for database connections
  • Pandas for data manipulation
  • Apache Airflow for scheduling and orchestration
  • Pydantic for data validation

Part 1: Data Source Connection Layer

First, let's build a flexible connection manager that handles multiple data sources:

from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import pandas as pd
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Engine
import boto3
from google.cloud import bigquery
import requests
from io import StringIO
import json

class DataSourceType(Enum):
    POSTGRESQL = "postgresql"
    MYSQL = "mysql"
    S3 = "s3"
    GCS = "gcs"
    REST_API = "rest_api"
    CSV_FILE = "csv_file"
    BIGQUERY = "bigquery"

@dataclass
class DataSourceConfig:
    """Configuration for a data source."""
    source_type: DataSourceType
    connection_params: Dict[str, Any]
    name: str
    description: str = ""
    retry_attempts: int = 3
    timeout: int = 30

class DataSourceConnector:
    """Handles connections to various data sources with retry logic."""
    
    def __init__(self):
        self._engines: Dict[str, Engine] = {}
        self._clients: Dict[str, Any] = {}
    
    def get_engine(self, config: DataSourceConfig) -> Engine:
        """Get or create a SQLAlchemy engine with connection pooling."""
        if config.name not in self._engines:
            if config.source_type == DataSourceType.POSTGRESQL:
                conn_str = (
                    f"postgresql://{config.connection_params['user']}:"
                    f"{config.connection_params['password']}@"
                    f"{config.connection_params['host']}:"
                    f"{config.connection_params['port']}/"
                    f"{config.connection_params['database']}"
                )
            elif config.source_type == DataSourceType.MYSQL:
                conn_str = (
                    f"mysql+pymysql://{config.connection_params['user']}:"
                    f"{config.connection_params['password']}@"
                    f"{config.connection_params['host']}:"
                    f"{config.connection_params['port']}/"
                    f"{config.connection_params['database']}"
                )
            else:
                raise ValueError(f"Unsupported SQL source: {config.source_type}")
            
            self._engines[config.name] = create_engine(
                conn_str,
                pool_size=5,
                max_overflow=10,
                pool_pre_ping=True  # Verify connections before use
            )
        
        return self._engines[config.name]
    
    def extract_sql_data(
        self, 
        config: DataSourceConfig, 
        query: str,
        params: Optional[Dict] = None
    ) -> pd.DataFrame:
        """Extract data from SQL databases with automatic retry."""
        import time
        
        engine = self.get_engine(config)
        
        for attempt in range(config.retry_attempts):
            try:
                with engine.connect() as conn:
                    df = pd.read_sql(
                        query, 
                        conn, 
                        params=params,
                        parse_dates=True
                    )
                    return df
            except Exception as e:
                if attempt == config.retry_attempts - 1:
                    raise
                print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
                time.sleep(2 ** attempt)  # Exponential backoff
    
    def extract_s3_data(
        self, 
        config: DataSourceConfig,
        file_key: str,
        file_format: str = "csv"
    ) -> pd.DataFrame:
        """Extract data from S3 with intelligent format detection."""
        s3_client = boto3.client(
            's3',
            aws_access_key_id=config.connection_params['aws_access_key_id'],
            aws_secret_access_key=config.connection_params['aws_secret_access_key'],
            region_name=config.connection_params.get('region', 'us-east-1')
        )
        
        response = s3_client.get_object(
            Bucket=config.connection_params['bucket'],
            Key=file_key
        )
        
        raw_bytes = response['Body'].read()
        
        if file_format == "csv":
            return pd.read_csv(StringIO(raw_bytes.decode('utf-8')))
        elif file_format == "json":
            return pd.read_json(StringIO(raw_bytes.decode('utf-8')))
        elif file_format == "parquet":
            # Parquet is binary, so parse the raw bytes directly instead of
            # round-tripping them through a UTF-8 string
            import pyarrow.parquet as pq
            import io
            return pq.read_table(io.BytesIO(raw_bytes)).to_pandas()
        else:
            # Fall back to heuristic format detection
            return self._infer_and_parse(raw_bytes.decode('utf-8'), file_key)
    
    def _infer_and_parse(self, content: str, filename: str) -> pd.DataFrame:
        """Use heuristics to determine file format."""
        if content.strip().startswith('{') or content.strip().startswith('['):
            return pd.read_json(StringIO(content))
        elif ',' in content.split('\n')[0]:
            return pd.read_csv(StringIO(content))
        else:
            # Try space-separated
            return pd.read_csv(StringIO(content), sep=r'\s+')
    
    def extract_api_data(
        self,
        config: DataSourceConfig,
        endpoint: str,
        method: str = "GET",
        headers: Optional[Dict] = None,
        params: Optional[Dict] = None,
        pagination_key: Optional[str] = None
    ) -> pd.DataFrame:
        """Extract data from REST APIs with pagination support."""
        all_data = []
        url = f"{config.connection_params['base_url']}{endpoint}"
        
        while url:
            response = requests.request(
                method=method,
                url=url,
                headers=headers or config.connection_params.get('headers', {}),
                params=params,
                timeout=config.timeout
            )
            response.raise_for_status()
            
            data = response.json()
            
            if isinstance(data, list):
                all_data.extend(data)
            elif isinstance(data, dict):
                all_data.append(data)
            
            # Handle pagination
            if pagination_key and pagination_key in data:
                url = data[pagination_key]
                params = {}  # Reset params for subsequent requests
            else:
                url = None
        
        return pd.DataFrame(all_data)
    
    def introspect_source(self, config: DataSourceConfig) -> Dict[str, Any]:
        """Use AI to understand data source structure."""
        if config.source_type in [DataSourceType.POSTGRESQL, DataSourceType.MYSQL]:
            engine = self.get_engine(config)
            inspector = inspect(engine)
            
            tables = inspector.get_table_names()
            schema_info = {}
            
            for table in tables[:10]:  # Limit to first 10 tables
                columns = inspector.get_columns(table)
                schema_info[table] = [
                    {
                        'name': col['name'],
                        'type': str(col['type']),
                        'nullable': col.get('nullable', True)
                    }
                    for col in columns
                ]
            
            return {
                'type': 'sql',
                'tables': tables,
                'schema': schema_info,
                'sample_query': f"SELECT * FROM {tables[0]} LIMIT 5" if tables else None
            }
        
        return {'type': 'unknown', 'message': 'Introspection not implemented for this source'}
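
Before wiring the connector into a pipeline, it helps to exercise it directly. The snippet below is a usage sketch only: the connection details and the orders table are illustrative placeholders, not part of any real deployment.

connector = DataSourceConnector()

orders_db = DataSourceConfig(
    source_type=DataSourceType.POSTGRESQL,
    connection_params={
        'host': 'localhost',
        'port': 5432,
        'database': 'shop',
        'user': 'etl_user',
        'password': 'change-me',
    },
    name='orders_db',
    description='Primary transactional database',
)

# Inspect the source before writing extraction queries
schema = connector.introspect_source(orders_db)
print(schema['tables'])

# Pull a bounded slice of data with the built-in retry logic
orders_df = connector.extract_sql_data(orders_db, query="SELECT * FROM orders LIMIT 1000")
print(orders_df.shape)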

Part 2: AI Transformation Engine

Now let's build the intelligent transformation layer that uses LLMs to understand and transform data:

from langchain.agents import AgentExecutor, create_structured_chat_agent
from langchain.tools import BaseTool, StructuredTool, tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
import pandas as pd
from typing import List, Dict, Any, Optional
import numpy as np

class DataTransformationAgent:
    """AI agent for intelligent data transformations."""
    
    def __init__(self, llm: ChatOpenAI = None):
        self.llm = llm or ChatOpenAI(
            model="gpt-4-turbo-preview",
            temperature=0.1  # Low temperature for consistent transformations
        )
        self.df: Optional[pd.DataFrame] = None
        self.transformation_history: List[Dict] = []
        
    def load_data(self, df: pd.DataFrame, sample_size: int = 1000):
        """Load data and create a sample for AI analysis."""
        self.df = df
        self.sample = df.head(sample_size)
        self._analyze_data()
    
    def _analyze_data(self):
        """Analyze data structure and quality."""
        analysis = {
            'shape': self.df.shape,
            'dtypes': self.df.dtypes.to_dict(),
            'missing_values': self.df.isnull().sum().to_dict(),
            'numeric_stats': {},
            'categorical_stats': {}
        }
        
        # Get statistics for numeric columns
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            analysis['numeric_stats'][col] = {
                'mean': self.df[col].mean(),
                'std': self.df[col].std(),
                'min': self.df[col].min(),
                'max': self.df[col].max()
            }
        
        # Get value counts for categorical columns
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
        for col in categorical_cols:
            if self.df[col].nunique() < 20:  # Only for low-cardinality columns
                analysis['categorical_stats'][col] = self.df[col].value_counts().head(10).to_dict()
        
        self.data_analysis = analysis
    
    def create_transformation_tools(self) -> List[BaseTool]:
        """Create tools for the AI agent to use."""
        
        @tool
        def rename_columns(mapping: Dict[str, str]) -> str:
            """Rename columns in the DataFrame.
            
            Args:
                mapping: Dictionary of {old_name: new_name}
            """
            self.df = self.df.rename(columns=mapping)
            self.transformation_history.append({
                'operation': 'rename_columns',
                'params': {'mapping': mapping}
            })
            return f"Renamed columns: {mapping}"
        
        @tool
        def convert_column_type(column: str, target_type: str) -> str:
            """Convert column data type.
            
            Args:
                column: Column name to convert
                target_type: Target type (str, int, float, datetime, category)
            """
            try:
                if target_type == 'datetime':
                    self.df[column] = pd.to_datetime(self.df[column])
                elif target_type == 'category':
                    self.df[column] = self.df[column].astype('category')
                else:
                    self.df[column] = self.df[column].astype(target_type)
                
                self.transformation_history.append({
                    'operation': 'convert_column_type',
                    'params': {'column': column, 'target_type': target_type}
                })
                return f"Converted {column} to {target_type}"
            except Exception as e:
                return f"Error converting {column}: {str(e)}"
        
        @tool
        def fill_missing_values(
            column: str, 
            method: str = "mean",
            fill_value: Optional[Any] = None
        ) -> str:
            """Fill missing values in a column.
            
            Args:
                column: Column name
                method: Method to use (mean, median, mode, forward, backward, value)
                fill_value: Value to use if method is 'value'
            """
            if method == "mean":
                self.df[column] = self.df[column].fillna(self.df[column].mean())
            elif method == "median":
                self.df[column] = self.df[column].fillna(self.df[column].median())
            elif method == "mode":
                self.df[column] = self.df[column].fillna(self.df[column].mode()[0])
            elif method == "forward":
                self.df[column] = self.df[column].fillna(method='ffill')
            elif method == "backward":
                self.df[column] = self.df[column].fillna(method='bfill')
            elif method == "value":
                self.df[column] = self.df[column].fillna(fill_value)
            
            self.transformation_history.append({
                'operation': 'fill_missing_values',
                'params': {'column': column, 'method': method, 'fill_value': fill_value}
            })
            return f"Filled missing values in {column} using {method}"
        
        @tool
        def create_derived_column(
            new_column: str,
            expression: str,
            description: str = ""
        ) -> str:
            """Create a new column based on an expression.
            
            Args:
                new_column: Name for the new column
                expression: Python expression using column names (use df['col'] syntax)
                description: Description of what the column represents
            """
            try:
                # Restrict eval: builtins are removed and only the DataFrame,
                # pandas, and numpy are exposed. This is not a full sandbox, so
                # only evaluate expressions from trusted sources.
                self.df[new_column] = eval(expression, {"__builtins__": {}}, {
                    'df': self.df,
                    'pd': pd,
                    'np': np
                })
                
                self.transformation_history.append({
                    'operation': 'create_derived_column',
                    'params': {
                        'new_column': new_column,
                        'expression': expression,
                        'description': description
                    }
                })
                return f"Created new column: {new_column}"
            except Exception as e:
                return f"Error creating column: {str(e)}"
        
        @tool
        def filter_rows(condition: str) -> str:
            """Filter rows based on a condition.
            
            Args:
                condition: Python condition using column names
            """
            try:
                initial_count = len(self.df)
                self.df = self.df.query(condition)
                removed = initial_count - len(self.df)
                
                self.transformation_history.append({
                    'operation': 'filter_rows',
                    'params': {'condition': condition}
                })
                return f"Filtered rows. Removed {removed} rows."
            except Exception as e:
                return f"Error filtering rows: {str(e)}"
        
        @tool
        def normalize_column(column: str, method: str = "minmax") -> str:
            """Normalize a numeric column.
            
            Args:
                column: Column to normalize
                method: Normalization method (minmax, zscore, log)
            """
            if method == "minmax":
                min_val = self.df[column].min()
                max_val = self.df[column].max()
                self.df[column] = (self.df[column] - min_val) / (max_val - min_val)
            elif method == "zscore":
                mean = self.df[column].mean()
                std = self.df[column].std()
                self.df[column] = (self.df[column] - mean) / std
            elif method == "log":
                self.df[column] = np.log1p(self.df[column])
            
            self.transformation_history.append({
                'operation': 'normalize_column',
                'params': {'column': column, 'method': method}
            })
            return f"Normalized {column} using {method}"
        
        return [
            rename_columns,
            convert_column_type,
            fill_missing_values,
            create_derived_column,
            filter_rows,
            normalize_column
        ]
    
    def create_transformation_agent(self) -> AgentExecutor:
        """Create the AI transformation agent."""
        tools = self.create_transformation_tools()
        
        system_prompt = """You are an expert data transformation agent. Your job is to clean, 
        transform, and prepare data for analysis. You have access to a pandas DataFrame.
        
        Current DataFrame Info:
        - Shape: {shape}
        - Columns: {columns}
        - Data Types: {dtypes}
        - Missing Values: {missing_values}
        
        Sample Data:
        {sample}
        
        When transforming data:
        1. First analyze the data structure and quality issues
        2. Suggest a transformation plan
        3. Execute transformations step by step
        4. Validate results after each step
        
        Always explain your reasoning and ask for confirmation before making destructive changes."""
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt.format(
                shape=self.df.shape,
                columns=self.df.columns.tolist(),
                dtypes=self.df.dtypes.to_dict(),
                missing_values=self.df.isnull().sum().to_dict(),
                sample=self.sample.head(5).to_string()
            )),
            MessagesPlaceholder(variable_name="chat_history", optional=True),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        agent = create_structured_chat_agent(self.llm, tools, prompt)
        
        return AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=15
        )
    
    def suggest_transformations(self) -> List[Dict[str, str]]:
        """Use AI to suggest transformations based on data analysis."""
        prompt = f"""Based on this data analysis, suggest transformations to clean and prepare the data:
        
        Data Shape: {self.data_analysis['shape']}
        Missing Values: {self.data_analysis['missing_values']}
        Numeric Stats: {self.data_analysis['numeric_stats']}
        Categorical Stats: {self.data_analysis['categorical_stats']}
        
        Provide a JSON array of transformation suggestions with:
        - operation: The transformation to apply
        - column: Column to transform (if applicable)
        - params: Parameters for the transformation
        - reason: Why this transformation is needed
        
        Focus on:
        1. Handling missing values
        2. Fixing data types
        3. Normalizing/standardizing data
        4. Creating useful derived columns
        5. Removing outliers if necessary"""
        
        response = self.llm.invoke(prompt)
        
        # Parse the JSON response
        try:
            import json
            # Extract JSON from response
            json_str = response.content
            if "```json" in json_str:
                json_str = json_str.split("```json")[1].split("```")[0]
            elif "```" in json_str:
                json_str = json_str.split("```")[1].split("```")[0]
            
            suggestions = json.loads(json_str)
            return suggestions
        except Exception:
            return [{'error': 'Could not parse AI response', 'response': response.content}]
    
    def apply_transformation_plan(self, plan: List[Dict]) -> pd.DataFrame:
        """Apply a transformation plan to the DataFrame."""
        results = []
        
        for step in plan:
            operation = step.get('operation')
            column = step.get('column')
            params = step.get('params', {})
            
            try:
                if operation == 'fill_missing':
                    method = params.get('method', 'mean')
                    fill_value = params.get('fill_value')
                    self.df[column] = self.df[column].fillna(
                        self.df[column].mean() if method == 'mean' else
                        self.df[column].median() if method == 'median' else
                        self.df[column].mode()[0] if method == 'mode' else
                        fill_value
                    )
                    results.append({'step': step, 'status': 'success'})
                    
                elif operation == 'convert_type':
                    target_type = params.get('target_type')
                    self.df[column] = self.df[column].astype(target_type)
                    results.append({'step': step, 'status': 'success'})
                    
                elif operation == 'normalize':
                    method = params.get('method', 'minmax')
                    if method == 'minmax':
                        min_val = self.df[column].min()
                        max_val = self.df[column].max()
                        self.df[column] = (self.df[column] - min_val) / (max_val - min_val)
                    results.append({'step': step, 'status': 'success'})
                    
            except Exception as e:
                results.append({'step': step, 'status': 'error', 'error': str(e)})
        
        return self.df
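
To see the transformation engine end to end, here is a small driver sketch. It assumes OPENAI_API_KEY is set in the environment and uses a tiny synthetic DataFrame; the Airflow task in Part 4 follows the same suggest-then-apply flow.

import pandas as pd

raw = pd.DataFrame({
    'order_id': [1, 2, 3, 4],
    'amount': ['10.5', '20.0', None, '15.25'],   # numeric values stored as strings, one missing
    'created_at': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04'],
})

agent = DataTransformationAgent()
agent.load_data(raw)

# Option 1: ask the LLM for a cleanup plan and apply it
plan = agent.suggest_transformations()
cleaned = agent.apply_transformation_plan(plan)

# Option 2: drive the tool-using agent interactively
executor = agent.create_transformation_agent()
result = executor.invoke({
    "input": "Convert amount to float, fill missing amounts with the median, "
             "and parse created_at as a datetime."
})
print(result["output"])
print(agent.transformation_history)

Note that apply_transformation_plan only recognizes the operations it implements (fill_missing, convert_type, normalize); any other suggestion in an AI-generated plan is skipped silently, so review the plan before relying on it.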

Part 3: Error Handling and Recovery

Robust error handling is critical for production ETL pipelines:

import json
import logging
import os
import traceback
from datetime import datetime
from functools import wraps
from typing import Any, Callable, Dict

import pandas as pd
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_agent.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('ETLAgent')

class ETLErrorHandler:
    """Comprehensive error handling for ETL operations."""
    
    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.error_log = []
        self.alert_callbacks = []
    
    def retry_with_backoff(self, func: Callable) -> Callable:
        """Decorator for automatic retry with exponential backoff."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(self.max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    wait_time = self.backoff_factor ** attempt
                    
                    error_info = {
                        'function': func.__name__,
                        'attempt': attempt + 1,
                        'error': str(e),
                        'traceback': traceback.format_exc(),
                        'timestamp': datetime.now().isoformat(),
                        'args': str(args),
                        'kwargs': str(kwargs)
                    }
                    self.error_log.append(error_info)
                    
                    logger.warning(
                        f"Attempt {attempt + 1}/{self.max_retries} failed for {func.__name__}: {e}. "
                        f"Retrying in {wait_time:.1f} seconds..."
                    )
                    
                    if attempt < self.max_retries - 1:
                        import time
                        time.sleep(wait_time)
            
            # All retries failed
            self._handle_critical_error(func.__name__, last_exception)
            raise last_exception
        
        return wrapper
    
    def _handle_critical_error(self, operation: str, error: Exception):
        """Handle critical errors that couldn't be resolved."""
        error_report = {
            'operation': operation,
            'error': str(error),
            'full_traceback': traceback.format_exc(),
            'timestamp': datetime.now().isoformat(),
            'context': 'ETL Pipeline Critical Failure'
        }
        
        logger.error(f"CRITICAL ERROR in {operation}: {error}")
        
        # Send alerts
        for callback in self.alert_callbacks:
            try:
                callback(error_report)
            except Exception as alert_error:
                logger.error(f"Failed to send alert: {alert_error}")
    
    def add_alert_callback(self, callback: Callable):
        """Add a callback function for error alerts."""
        self.alert_callbacks.append(callback)
    
    def validate_data_quality(self, df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
        """Validate data quality against defined rules."""
        validation_results = {
            'passed': True,
            'checks': [],
            'warnings': [],
            'errors': []
        }
        
        # Check for null values
        if 'null_threshold' in rules:
            for column, threshold in rules['null_threshold'].items():
                if column in df.columns:
                    null_pct = df[column].isnull().mean()
                    if null_pct > threshold:
                        validation_results['errors'].append(
                            f"Column '{column}' has {null_pct:.2%} null values "
                            f"(threshold: {threshold:.2%})"
                        )
                        validation_results['passed'] = False
        
        # Check data types
        if 'expected_types' in rules:
            for column, expected_type in rules['expected_types'].items():
                if column in df.columns:
                    actual_type = str(df[column].dtype)
                    if actual_type != expected_type:
                        validation_results['warnings'].append(
                            f"Column '{column}' has type '{actual_type}', "
                            f"expected '{expected_type}'"
                        )
        
        # Check value ranges
        if 'value_ranges' in rules:
            for column, (min_val, max_val) in rules['value_ranges'].items():
                if column in df.columns and pd.api.types.is_numeric_dtype(df[column]):
                    out_of_range = df[
                        (df[column] < min_val) | (df[column] > max_val)
                    ]
                    if len(out_of_range) > 0:
                        validation_results['warnings'].append(
                            f"Column '{column}' has {len(out_of_range)} values "
                            f"outside range [{min_val}, {max_val}]"
                        )
        
        # Check for duplicates
        if 'unique_columns' in rules:
            for column in rules['unique_columns']:
                if column in df.columns:
                    duplicates = df[column].duplicated().sum()
                    if duplicates > 0:
                        validation_results['warnings'].append(
                            f"Column '{column}' has {duplicates} duplicate values"
                        )
        
        return validation_results
    
    def create_checkpoint(self, df: pd.DataFrame, step_name: str) -> str:
        """Create a checkpoint of the DataFrame for recovery."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        checkpoint_id = f"{step_name}_{timestamp}"
        
        # Ensure the checkpoint directory exists, then save to Parquet for efficiency
        os.makedirs("checkpoints", exist_ok=True)
        checkpoint_path = f"checkpoints/{checkpoint_id}.parquet"
        df.to_parquet(checkpoint_path, index=False)
        
        # Save metadata
        metadata = {
            'checkpoint_id': checkpoint_id,
            'timestamp': timestamp,
            'step_name': step_name,
            'shape': df.shape,
            'columns': df.columns.tolist(),
            'path': checkpoint_path
        }
        
        with open(f"checkpoints/{checkpoint_id}_meta.json", 'w') as f:
            json.dump(metadata, f, indent=2)
        
        logger.info(f"Created checkpoint: {checkpoint_id}")
        return checkpoint_id
    
    def recover_from_checkpoint(self, checkpoint_id: str) -> pd.DataFrame:
        """Recover DataFrame from a checkpoint."""
        checkpoint_path = f"checkpoints/{checkpoint_id}.parquet"
        
        if not os.path.exists(checkpoint_path):
            raise FileNotFoundError(f"Checkpoint not found: {checkpoint_id}")
        
        df = pd.read_parquet(checkpoint_path)
        logger.info(f"Recovered from checkpoint: {checkpoint_id}")
        return df

# Example usage with decorators
error_handler = ETLErrorHandler(max_retries=3)

@error_handler.retry_with_backoff
def extract_from_api(url: str, params: Dict = None) -> pd.DataFrame:
    """Extract data from API with automatic retry."""
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json())

def send_slack_alert(error_report: Dict):
    """Send error alert to Slack."""
    webhook_url = os.getenv('SLACK_WEBHOOK_URL')
    if not webhook_url:
        return
    
    message = {
        "text": f"🚨 ETL Pipeline Error: {error_report['operation']}",
        "attachments": [{
            "color": "danger",
            "fields": [
                {"title": "Error", "value": str(error_report['error'])[:1000]},
                {"title": "Timestamp", "value": error_report['timestamp']},
                {"title": "Operation", "value": error_report['operation']}
            ]
        }]
    }
    
    requests.post(webhook_url, json=message)

error_handler.add_alert_callback(send_slack_alert)
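
The rules argument that validate_data_quality expects is plain nested data. Here is an illustrative rule set for an orders DataFrame (column names and thresholds are placeholders), combined with a checkpoint-on-failure pattern; orders_df stands in for whatever frame your extract step produced.

quality_rules = {
    'null_threshold': {'order_id': 0.0, 'amount': 0.05},          # max tolerated fraction of nulls
    'expected_types': {'order_id': 'int64', 'amount': 'float64'},  # compared to str(df[col].dtype)
    'value_ranges': {'amount': (0, 100_000)},                      # (min, max) for numeric columns
    'unique_columns': ['order_id'],                                # columns that must not contain duplicates
}

report = error_handler.validate_data_quality(orders_df, quality_rules)
if not report['passed']:
    # Checkpoint the rejected batch so it can be inspected and replayed later
    error_handler.create_checkpoint(orders_df, 'failed_validation')
    print(report['errors'], report['warnings'])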

Part 4: Scheduling and Orchestration with Airflow

Now let's integrate our agent into an Airflow DAG for scheduling:

import logging
from datetime import datetime, timedelta
from typing import Dict, Any

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable

# Import our ETL components
from etl_agent.data_connector import DataSourceConnector, DataSourceConfig, DataSourceType
from etl_agent.transformation_agent import DataTransformationAgent
from etl_agent.error_handler import ETLErrorHandler, send_slack_alert

logger = logging.getLogger('ETLAgent')
error_handler = ETLErrorHandler(max_retries=3)

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

def create_etl_dag(
    dag_id: str,
    schedule: str,
    source_config: Dict[str, Any],
    target_config: Dict[str, Any],
    transformation_rules: Dict[str, Any],
    **kwargs
):
    """Factory function to create ETL DAGs."""
    
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description=f'AI-powered ETL pipeline for {dag_id}',
        schedule_interval=schedule,
        start_date=datetime(2024, 1, 1),
        catchup=False,
        max_active_runs=1,
        tags=['etl', 'ai-agent'],
    )
    
    def extract_task(**context):
        """Extract data from source."""
        connector = DataSourceConnector()
        config = DataSourceConfig(**source_config)
        
        # Get the extraction query/endpoint from Airflow variables
        extraction_query = Variable.get(f"{dag_id}_extraction_query", default_var=None)
        
        if config.source_type in [DataSourceType.POSTGRESQL, DataSourceType.MYSQL]:
            df = connector.extract_sql_data(config, extraction_query)
        elif config.source_type == DataSourceType.S3:
            df = connector.extract_s3_data(
                config,
                file_key=source_config.get('file_key'),
                file_format=source_config.get('file_format', 'csv')
            )
        elif config.source_type == DataSourceType.REST_API:
            df = connector.extract_api_data(
                config,
                endpoint=source_config.get('endpoint'),
                method=source_config.get('method', 'GET'),
                pagination_key=source_config.get('pagination_key')
            )
        else:
            raise ValueError(f"Unsupported source type: {config.source_type}")
        
        # Store in XCom for next task
        context['ti'].xcom_push(key='raw_data', value=df.to_json())
        context['ti'].xcom_push(key='record_count', value=len(df))
        
        return f"Extracted {len(df)} records"
    
    def transform_task(**context):
        """Transform data using AI agent."""
        # Get raw data from previous task
        raw_data_json = context['ti'].xcom_pull(key='raw_data', task_ids='extract')
        df = pd.read_json(raw_data_json)
        
        # Initialize transformation agent
        agent = DataTransformationAgent()
        agent.load_data(df)
        
        # Get transformation rules (could be from Airflow variable or config)
        if transformation_rules.get('use_ai_suggestions', False):
            # Let AI suggest transformations
            suggestions = agent.suggest_transformations()
            logger.info(f"AI suggested {len(suggestions)} transformations")
            
            # Apply suggested transformations
            transformed_df = agent.apply_transformation_plan(suggestions)
        else:
            # Apply predefined transformations
            transformed_df = agent.apply_transformation_plan(
                transformation_rules.get('steps', [])
            )
        
        # Validate data quality
        quality_rules = transformation_rules.get('quality_rules', {})
        validation_results = error_handler.validate_data_quality(
            transformed_df, quality_rules
        )
        
        if not validation_results['passed']:
            error_msg = f"Data quality check failed: {validation_results['errors']}"
            logger.error(error_msg)
            raise ValueError(error_msg)
        
        # Store transformed data
        context['ti'].xcom_push(
            key='transformed_data', 
            value=transformed_df.to_json()
        )
        context['ti'].xcom_push(
            key='transformation_history',
            value=agent.transformation_history
        )
        
        return f"Transformed data. Shape: {transformed_df.shape}"
    
    def load_task(**context):
        """Load data to target."""
        transformed_data_json = context['ti'].xcom_pull(
            key='transformed_data', 
            task_ids='transform'
        )
        df = pd.read_json(transformed_data_json)
        
        connector = DataSourceConnector()
        target_config_obj = DataSourceConfig(**target_config)
        
        # Get load strategy from config
        load_strategy = target_config.get('strategy', 'append')
        target_table = target_config.get('table_name')
        
        if load_strategy == 'replace':
            # Replace entire table
            df.to_sql(
                target_table,
                connector.get_engine(target_config_obj),
                if_exists='replace',
                index=False
            )
        elif load_strategy == 'append':
            # Append to existing table
            df.to_sql(
                target_table,
                connector.get_engine(target_config_obj),
                if_exists='append',
                index=False
            )
        elif load_strategy == 'upsert':
            # Upsert (insert or update)
            # This requires more complex logic, often database-specific
            pass
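            # A Postgres-flavoured sketch of what an upsert could look like (an
            # illustration, not part of the original pipeline): stage the batch
            # in a temporary table, then merge with INSERT ... ON CONFLICT.
            # It assumes target_config supplies 'conflict_keys' naming the
            # unique/primary-key columns of the target table.
            from sqlalchemy import text
            
            conflict_keys = target_config.get('conflict_keys', [])
            if not conflict_keys:
                raise ValueError("upsert strategy requires target_config['conflict_keys']")
            
            engine = connector.get_engine(target_config_obj)
            staging_table = f"{target_table}_staging"
            df.to_sql(staging_table, engine, if_exists='replace', index=False)
            
            update_cols = [c for c in df.columns if c not in conflict_keys]
            set_clause = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_cols)
            action = f"DO UPDATE SET {set_clause}" if update_cols else "DO NOTHING"
            merge_sql = (
                f"INSERT INTO {target_table} ({', '.join(df.columns)}) "
                f"SELECT {', '.join(df.columns)} FROM {staging_table} "
                f"ON CONFLICT ({', '.join(conflict_keys)}) {action}"
            )
            with engine.begin() as conn:
                conn.execute(text(merge_sql))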
        
        return f"Loaded {len(df)} records to {target_table}"
    
    def check_data_quality(**context):
        """Check if data quality meets thresholds."""
        record_count = context['ti'].xcom_pull(key='record_count', task_ids='extract')
        
        # Get quality thresholds from Airflow variables
        min_records = int(Variable.get(f"{dag_id}_min_records", default_var=100))
        
        if record_count < min_records:
            return 'quality_failed'
        else:
            return 'quality_passed'
    
    # Define tasks
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task,
        dag=dag,
    )
    
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task,
        dag=dag,
    )
    
    load = PythonOperator(
        task_id='load',
        python_callable=load_task,
        dag=dag,
    )
    
    quality_check = BranchPythonOperator(
        task_id='quality_check',
        python_callable=check_data_quality,
        dag=dag,
    )
    
    quality_passed = DummyOperator(
        task_id='quality_passed',
        dag=dag,
    )
    
    quality_failed = DummyOperator(
        task_id='quality_failed',
        dag=dag,
    )
    
    send_alert = PythonOperator(
        task_id='send_alert',
        python_callable=lambda: send_slack_alert({
            'operation': dag_id,
            'error': 'Data quality check failed',
            'timestamp': datetime.now().isoformat()
        }),
        dag=dag,
    )
    
    # Define task dependencies
    extract >> quality_check
    
    quality_check >> quality_passed >> transform >> load
    quality_check >> quality_failed >> send_alert
    
    return dag
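
# The transformation_rules argument drives the transform step. Below is an
# illustrative rule set (column names, thresholds, and steps are placeholders,
# not part of the original pipeline); see apply_transformation_plan and
# validate_data_quality for the recognized keys.
example_transformation_rules = {
    'use_ai_suggestions': False,   # set True to let the LLM propose the plan
    'steps': [
        {'operation': 'convert_type', 'column': 'amount', 'params': {'target_type': 'float64'}},
        {'operation': 'fill_missing', 'column': 'amount', 'params': {'method': 'median'}},
        {'operation': 'normalize', 'column': 'amount', 'params': {'method': 'minmax'}},
    ],
    'quality_rules': {
        'null_threshold': {'amount': 0.05},
        'unique_columns': ['order_id'],
    },
}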

# Create specific DAGs
sales_etl_dag = create_etl_dag(
    dag_id='sales_data_pipeline',
    schedule='0 2 * * *',  # Daily at 2 AM
    source_config={
        'source_type': DataSourceType.POSTGRESQL,
        'connection_params': {
            'host': 'sales-db.example.com',
            'port': 5432,
            'database': 'sales',
            'user': Variable.get('SALES_DB_USER'),
            'password': Variable.get('SALES_DB_PASSWORD'),
        },
        'name': 'sales_database',
    },
    target_config={
        'source_type': DataSourceType.POSTGRESQL,
        'connection_params': {
            'host': '

Keywords

AI agent · data-agents