Building an ETL Pipeline Agent with LangChain and Pandas
Marcus Rivera
Full-stack developer and agent builder. Covers coding assistants and dev tools.
Building an AI-Powered ETL Agent: From Data Sources to Production Pipelines
ETL (Extract, Transform, Load) pipelines have traditionally been rigid, code-heavy systems that break when data sources change. What if your pipeline could adapt to schema changes, intelligently handle unexpected data formats, and self-heal when errors occur? This tutorial walks you through building an AI agent that does exactly that—combining traditional ETL patterns with modern AI capabilities.
We'll build a production-ready agent using Python, LangChain, and Apache Airflow that connects to multiple data sources, applies intelligent transformations, handles errors gracefully, and runs on a schedule.
Architecture Overview
Our AI ETL agent consists of four core components:
┌─────────────┐      ┌─────────────────┐      ┌─────────────┐
│    Data     │      │  AI Transform   │      │    Data     │
│   Sources   │─────▶│     Engine      │─────▶│  Warehouse  │
│  (Extract)  │      │   (Transform)   │      │   (Load)    │
└─────────────┘      └─────────────────┘      └─────────────┘
       │                      │                      │
       └──────────────────────┴──────────────────────┘
                              │
                     ┌────────┴────────┐
                     │  Orchestrator   │
                     │    (Airflow)    │
                     └─────────────────┘
The agent uses:
- LangChain for AI reasoning and tool execution
- SQLAlchemy for database connections
- Pandas for data manipulation
- Apache Airflow for scheduling and orchestration
- Pydantic for data validation
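If you want to follow along, you can install the stack in one shot. The package list below is illustrative (versions unpinned; adjust to your environment):
pip install langchain langchain-openai sqlalchemy pandas apache-airflow pydantic boto3 google-cloud-bigquery requests pyarrow psycopg2-binary pymysql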
Part 1: Data Source Connection Layer
First, let's build a flexible connection manager that handles multiple data sources:
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import pandas as pd
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Engine
import boto3
from google.cloud import bigquery
import requests
from io import StringIO
import json
class DataSourceType(Enum):
POSTGRESQL = "postgresql"
MYSQL = "mysql"
S3 = "s3"
GCS = "gcs"
REST_API = "rest_api"
CSV_FILE = "csv_file"
BIGQUERY = "bigquery"
@dataclass
class DataSourceConfig:
"""Configuration for a data source."""
source_type: DataSourceType
connection_params: Dict[str, Any]
name: str
description: str = ""
retry_attempts: int = 3
timeout: int = 30
class DataSourceConnector:
"""Handles connections to various data sources with retry logic."""
def __init__(self):
self._engines: Dict[str, Engine] = {}
self._clients: Dict[str, Any] = {}
def get_engine(self, config: DataSourceConfig) -> Engine:
"""Get or create a SQLAlchemy engine with connection pooling."""
if config.name not in self._engines:
if config.source_type == DataSourceType.POSTGRESQL:
conn_str = (
f"postgresql://{config.connection_params['user']}:"
f"{config.connection_params['password']}@"
f"{config.connection_params['host']}:"
f"{config.connection_params['port']}/"
f"{config.connection_params['database']}"
)
elif config.source_type == DataSourceType.MYSQL:
conn_str = (
f"mysql+pymysql://{config.connection_params['user']}:"
f"{config.connection_params['password']}@"
f"{config.connection_params['host']}:"
f"{config.connection_params['port']}/"
f"{config.connection_params['database']}"
)
else:
raise ValueError(f"Unsupported SQL source: {config.source_type}")
self._engines[config.name] = create_engine(
conn_str,
pool_size=5,
max_overflow=10,
pool_pre_ping=True # Verify connections before use
)
return self._engines[config.name]
def extract_sql_data(
self,
config: DataSourceConfig,
query: str,
params: Optional[Dict] = None
) -> pd.DataFrame:
"""Extract data from SQL databases with automatic retry."""
import time
engine = self.get_engine(config)
for attempt in range(config.retry_attempts):
try:
with engine.connect() as conn:
df = pd.read_sql(
query,
conn,
params=params,
parse_dates=True
)
return df
except Exception as e:
if attempt == config.retry_attempts - 1:
raise
print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
time.sleep(2 ** attempt) # Exponential backoff
def extract_s3_data(
self,
config: DataSourceConfig,
file_key: str,
file_format: str = "csv"
) -> pd.DataFrame:
"""Extract data from S3 with intelligent format detection."""
s3_client = boto3.client(
's3',
aws_access_key_id=config.connection_params['aws_access_key_id'],
aws_secret_access_key=config.connection_params['aws_secret_access_key'],
region_name=config.connection_params.get('region', 'us-east-1')
)
response = s3_client.get_object(
Bucket=config.connection_params['bucket'],
Key=file_key
)
        body = response['Body'].read()
        if file_format == "csv":
            return pd.read_csv(StringIO(body.decode('utf-8')))
        elif file_format == "json":
            return pd.read_json(StringIO(body.decode('utf-8')))
        elif file_format == "parquet":
            # Parquet is a binary format, so parse the raw bytes directly
            import io
            return pd.read_parquet(io.BytesIO(body))
        else:
            # Fall back to heuristic format detection
            return self._infer_and_parse(body.decode('utf-8'), file_key)
def _infer_and_parse(self, content: str, filename: str) -> pd.DataFrame:
"""Use heuristics to determine file format."""
if content.strip().startswith('{') or content.strip().startswith('['):
return pd.read_json(StringIO(content))
elif ',' in content.split('\n')[0]:
return pd.read_csv(StringIO(content))
else:
# Try space-separated
return pd.read_csv(StringIO(content), sep=r'\s+')
def extract_api_data(
self,
config: DataSourceConfig,
endpoint: str,
method: str = "GET",
headers: Optional[Dict] = None,
params: Optional[Dict] = None,
pagination_key: Optional[str] = None
) -> pd.DataFrame:
"""Extract data from REST APIs with pagination support."""
all_data = []
url = f"{config.connection_params['base_url']}{endpoint}"
while url:
response = requests.request(
method=method,
url=url,
headers=headers or config.connection_params.get('headers', {}),
params=params,
timeout=config.timeout
)
response.raise_for_status()
data = response.json()
if isinstance(data, list):
all_data.extend(data)
elif isinstance(data, dict):
all_data.append(data)
# Handle pagination
if pagination_key and pagination_key in data:
url = data[pagination_key]
params = {} # Reset params for subsequent requests
else:
url = None
return pd.DataFrame(all_data)
    def introspect_source(self, config: DataSourceConfig) -> Dict[str, Any]:
        """Inspect the data source so the agent can reason about its structure."""
if config.source_type in [DataSourceType.POSTGRESQL, DataSourceType.MYSQL]:
engine = self.get_engine(config)
inspector = inspect(engine)
tables = inspector.get_table_names()
schema_info = {}
for table in tables[:10]: # Limit to first 10 tables
columns = inspector.get_columns(table)
schema_info[table] = [
{
'name': col['name'],
'type': str(col['type']),
'nullable': col.get('nullable', True)
}
for col in columns
]
return {
'type': 'sql',
'tables': tables,
'schema': schema_info,
'sample_query': f"SELECT * FROM {tables[0]} LIMIT 5" if tables else None
}
return {'type': 'unknown', 'message': 'Introspection not implemented for this source'}
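Before moving on, here is a minimal usage sketch of the connector. The connection details, database name, and query are placeholders, not values from a real deployment:
# Usage sketch -- the connection details below are placeholders
connector = DataSourceConnector()
orders_source = DataSourceConfig(
    source_type=DataSourceType.POSTGRESQL,
    connection_params={
        "user": "etl_user",
        "password": "change-me",
        "host": "localhost",
        "port": 5432,
        "database": "orders",
    },
    name="orders_db",
    description="Primary orders database",
)
# Inspect the schema first, then pull a small slice of data
print(connector.introspect_source(orders_source))
df = connector.extract_sql_data(orders_source, "SELECT * FROM orders LIMIT 100")
print(df.shape)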
Part 2: AI Transformation Engine
Now let's build the intelligent transformation layer that uses LLMs to understand and transform data:
from langchain.agents import AgentExecutor, create_structured_chat_agent
from langchain.tools import BaseTool, StructuredTool, tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
import pandas as pd
from typing import List, Dict, Any, Optional
import numpy as np
class DataTransformationAgent:
"""AI agent for intelligent data transformations."""
    def __init__(self, llm: Optional[ChatOpenAI] = None):
self.llm = llm or ChatOpenAI(
model="gpt-4-turbo-preview",
temperature=0.1 # Low temperature for consistent transformations
)
self.df: Optional[pd.DataFrame] = None
self.transformation_history: List[Dict] = []
def load_data(self, df: pd.DataFrame, sample_size: int = 1000):
"""Load data and create a sample for AI analysis."""
self.df = df
self.sample = df.head(sample_size)
self._analyze_data()
def _analyze_data(self):
"""Analyze data structure and quality."""
analysis = {
'shape': self.df.shape,
'dtypes': self.df.dtypes.to_dict(),
'missing_values': self.df.isnull().sum().to_dict(),
'numeric_stats': {},
'categorical_stats': {}
}
# Get statistics for numeric columns
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
analysis['numeric_stats'][col] = {
'mean': self.df[col].mean(),
'std': self.df[col].std(),
'min': self.df[col].min(),
'max': self.df[col].max()
}
# Get value counts for categorical columns
categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns
for col in categorical_cols:
if self.df[col].nunique() < 20: # Only for low-cardinality columns
analysis['categorical_stats'][col] = self.df[col].value_counts().head(10).to_dict()
self.data_analysis = analysis
def create_transformation_tools(self) -> List[BaseTool]:
"""Create tools for the AI agent to use."""
@tool
def rename_columns(mapping: Dict[str, str]) -> str:
"""Rename columns in the DataFrame.
Args:
mapping: Dictionary of {old_name: new_name}
"""
self.df = self.df.rename(columns=mapping)
self.transformation_history.append({
'operation': 'rename_columns',
'params': {'mapping': mapping}
})
return f"Renamed columns: {mapping}"
@tool
def convert_column_type(column: str, target_type: str) -> str:
"""Convert column data type.
Args:
column: Column name to convert
target_type: Target type (str, int, float, datetime, category)
"""
try:
if target_type == 'datetime':
self.df[column] = pd.to_datetime(self.df[column])
elif target_type == 'category':
self.df[column] = self.df[column].astype('category')
else:
self.df[column] = self.df[column].astype(target_type)
self.transformation_history.append({
'operation': 'convert_column_type',
'params': {'column': column, 'target_type': target_type}
})
return f"Converted {column} to {target_type}"
except Exception as e:
return f"Error converting {column}: {str(e)}"
@tool
def fill_missing_values(
column: str,
method: str = "mean",
fill_value: Optional[Any] = None
) -> str:
"""Fill missing values in a column.
Args:
column: Column name
method: Method to use (mean, median, mode, forward, backward, value)
fill_value: Value to use if method is 'value'
"""
if method == "mean":
self.df[column] = self.df[column].fillna(self.df[column].mean())
elif method == "median":
self.df[column] = self.df[column].fillna(self.df[column].median())
elif method == "mode":
self.df[column] = self.df[column].fillna(self.df[column].mode()[0])
            elif method == "forward":
                self.df[column] = self.df[column].ffill()
            elif method == "backward":
                self.df[column] = self.df[column].bfill()
elif method == "value":
self.df[column] = self.df[column].fillna(fill_value)
self.transformation_history.append({
'operation': 'fill_missing_values',
'params': {'column': column, 'method': method, 'fill_value': fill_value}
})
return f"Filled missing values in {column} using {method}"
@tool
def create_derived_column(
new_column: str,
expression: str,
description: str = ""
) -> str:
"""Create a new column based on an expression.
Args:
new_column: Name for the new column
expression: Python expression using column names (use df['col'] syntax)
description: Description of what the column represents
"""
            try:
                # Restrict the eval environment: no builtins, only the DataFrame
                # and numeric libraries are exposed. eval on model-generated
                # expressions is still risky; use a stricter sandbox in production.
                self.df[new_column] = eval(expression, {"__builtins__": {}}, {
                    'df': self.df,
                    'pd': pd,
                    'np': np
                })
self.transformation_history.append({
'operation': 'create_derived_column',
'params': {
'new_column': new_column,
'expression': expression,
'description': description
}
})
return f"Created new column: {new_column}"
except Exception as e:
return f"Error creating column: {str(e)}"
@tool
def filter_rows(condition: str) -> str:
"""Filter rows based on a condition.
Args:
condition: Python condition using column names
"""
try:
initial_count = len(self.df)
self.df = self.df.query(condition)
removed = initial_count - len(self.df)
self.transformation_history.append({
'operation': 'filter_rows',
'params': {'condition': condition}
})
return f"Filtered rows. Removed {removed} rows."
except Exception as e:
return f"Error filtering rows: {str(e)}"
@tool
def normalize_column(column: str, method: str = "minmax") -> str:
"""Normalize a numeric column.
Args:
column: Column to normalize
method: Normalization method (minmax, zscore, log)
"""
if method == "minmax":
min_val = self.df[column].min()
max_val = self.df[column].max()
self.df[column] = (self.df[column] - min_val) / (max_val - min_val)
elif method == "zscore":
mean = self.df[column].mean()
std = self.df[column].std()
self.df[column] = (self.df[column] - mean) / std
elif method == "log":
self.df[column] = np.log1p(self.df[column])
self.transformation_history.append({
'operation': 'normalize_column',
'params': {'column': column, 'method': method}
})
return f"Normalized {column} using {method}"
return [
rename_columns,
convert_column_type,
fill_missing_values,
create_derived_column,
filter_rows,
normalize_column
]
    def create_transformation_agent(self) -> AgentExecutor:
        """Create the AI transformation agent."""
        tools = self.create_transformation_tools()
        system_prompt = """You are an expert data transformation agent. Your job is to clean,
transform, and prepare data for analysis. You have access to a pandas DataFrame.

You have access to the following tools:
{tools}

Valid tool names: {tool_names}

Current DataFrame Info:
{data_summary}

When transforming data:
1. First analyze the data structure and quality issues
2. Suggest a transformation plan
3. Execute transformations step by step
4. Validate results after each step

Always explain your reasoning and ask for confirmation before making destructive changes."""
        # Pre-render the DataFrame summary and pass it as a partial variable so
        # literal braces in dict representations are not parsed as template fields.
        data_summary = (
            f"- Shape: {self.df.shape}\n"
            f"- Columns: {self.df.columns.tolist()}\n"
            f"- Data Types: {self.df.dtypes.to_dict()}\n"
            f"- Missing Values: {self.df.isnull().sum().to_dict()}\n\n"
            f"Sample Data:\n{self.sample.head(5).to_string()}"
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="chat_history", optional=True),
            # The structured chat agent formats agent_scratchpad as a string, so it
            # belongs inside the human message rather than a MessagesPlaceholder.
            ("human", "{input}\n\n{agent_scratchpad}"),
        ]).partial(data_summary=data_summary)
        agent = create_structured_chat_agent(self.llm, tools, prompt)
        return AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=15
        )
def suggest_transformations(self) -> List[Dict[str, str]]:
"""Use AI to suggest transformations based on data analysis."""
prompt = f"""Based on this data analysis, suggest transformations to clean and prepare the data:
Data Shape: {self.data_analysis['shape']}
Missing Values: {self.data_analysis['missing_values']}
Numeric Stats: {self.data_analysis['numeric_stats']}
Categorical Stats: {self.data_analysis['categorical_stats']}
Provide a JSON array of transformation suggestions with:
- operation: The transformation to apply
- column: Column to transform (if applicable)
- params: Parameters for the transformation
- reason: Why this transformation is needed
Focus on:
1. Handling missing values
2. Fixing data types
3. Normalizing/standardizing data
4. Creating useful derived columns
5. Removing outliers if necessary"""
response = self.llm.invoke(prompt)
# Parse the JSON response
try:
import json
# Extract JSON from response
json_str = response.content
if "```json" in json_str:
json_str = json_str.split("```json")[1].split("```")[0]
elif "```" in json_str:
json_str = json_str.split("```")[1].split("```")[0]
suggestions = json.loads(json_str)
return suggestions
        except Exception:
return [{'error': 'Could not parse AI response', 'response': response.content}]
def apply_transformation_plan(self, plan: List[Dict]) -> pd.DataFrame:
"""Apply a transformation plan to the DataFrame."""
results = []
for step in plan:
operation = step.get('operation')
column = step.get('column')
params = step.get('params', {})
try:
if operation == 'fill_missing':
method = params.get('method', 'mean')
fill_value = params.get('fill_value')
self.df[column] = self.df[column].fillna(
self.df[column].mean() if method == 'mean' else
self.df[column].median() if method == 'median' else
self.df[column].mode()[0] if method == 'mode' else
fill_value
)
results.append({'step': step, 'status': 'success'})
elif operation == 'convert_type':
target_type = params.get('target_type')
self.df[column] = self.df[column].astype(target_type)
results.append({'step': step, 'status': 'success'})
elif operation == 'normalize':
method = params.get('method', 'minmax')
if method == 'minmax':
min_val = self.df[column].min()
max_val = self.df[column].max()
self.df[column] = (self.df[column] - min_val) / (max_val - min_val)
results.append({'step': step, 'status': 'success'})
except Exception as e:
results.append({'step': step, 'status': 'error', 'error': str(e)})
return self.df
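To see how the pieces fit together, here is a small, self-contained sketch. It assumes an OpenAI API key is configured in the environment; the sample DataFrame and the instruction given to the agent are made up for illustration:
import pandas as pd

# Tiny, made-up dataset with the kinds of issues the agent handles
raw = pd.DataFrame({
    "customer id": ["C1", "C2", "C3", None],
    "amount": ["10.5", "20.0", None, "7.25"],
})

agent = DataTransformationAgent()
agent.load_data(raw)

# Option 1: ask the LLM for a cleanup plan and apply it directly
suggestions = agent.suggest_transformations()
cleaned = agent.apply_transformation_plan(suggestions)

# Option 2: drive the tool-using agent interactively
executor = agent.create_transformation_agent()
executor.invoke({"input": "Rename 'customer id' to customer_id and convert amount to float."})

# Every applied operation is recorded for auditability
print(agent.transformation_history)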
Part 3: Error Handling and Recovery
Robust error handling is critical for production ETL pipelines:
import logging
import os
import traceback
import json
from datetime import datetime
from typing import Any, Callable, Dict
from functools import wraps

import pandas as pd
import requests
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('etl_agent.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('ETLAgent')
class ETLErrorHandler:
"""Comprehensive error handling for ETL operations."""
def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0):
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.error_log = []
self.alert_callbacks = []
def retry_with_backoff(self, func: Callable) -> Callable:
"""Decorator for automatic retry with exponential backoff."""
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
wait_time = self.backoff_factor ** attempt
error_info = {
'function': func.__name__,
'attempt': attempt + 1,
'error': str(e),
'traceback': traceback.format_exc(),
'timestamp': datetime.now().isoformat(),
'args': str(args),
'kwargs': str(kwargs)
}
self.error_log.append(error_info)
logger.warning(
f"Attempt {attempt + 1}/{self.max_retries} failed for {func.__name__}: {e}. "
f"Retrying in {wait_time:.1f} seconds..."
)
if attempt < self.max_retries - 1:
import time
time.sleep(wait_time)
# All retries failed
self._handle_critical_error(func.__name__, last_exception)
raise last_exception
return wrapper
def _handle_critical_error(self, operation: str, error: Exception):
"""Handle critical errors that couldn't be resolved."""
error_report = {
'operation': operation,
'error': str(error),
'full_traceback': traceback.format_exc(),
'timestamp': datetime.now().isoformat(),
'context': 'ETL Pipeline Critical Failure'
}
logger.error(f"CRITICAL ERROR in {operation}: {error}")
# Send alerts
for callback in self.alert_callbacks:
try:
callback(error_report)
except Exception as alert_error:
logger.error(f"Failed to send alert: {alert_error}")
def add_alert_callback(self, callback: Callable):
"""Add a callback function for error alerts."""
self.alert_callbacks.append(callback)
def validate_data_quality(self, df: pd.DataFrame, rules: Dict[str, Any]) -> Dict[str, Any]:
"""Validate data quality against defined rules."""
validation_results = {
'passed': True,
'checks': [],
'warnings': [],
'errors': []
}
# Check for null values
if 'null_threshold' in rules:
for column, threshold in rules['null_threshold'].items():
if column in df.columns:
null_pct = df[column].isnull().mean()
if null_pct > threshold:
validation_results['errors'].append(
f"Column '{column}' has {null_pct:.2%} null values "
f"(threshold: {threshold:.2%})"
)
validation_results['passed'] = False
# Check data types
if 'expected_types' in rules:
for column, expected_type in rules['expected_types'].items():
if column in df.columns:
actual_type = str(df[column].dtype)
if actual_type != expected_type:
validation_results['warnings'].append(
f"Column '{column}' has type '{actual_type}', "
f"expected '{expected_type}'"
)
# Check value ranges
if 'value_ranges' in rules:
for column, (min_val, max_val) in rules['value_ranges'].items():
if column in df.columns and pd.api.types.is_numeric_dtype(df[column]):
out_of_range = df[
(df[column] < min_val) | (df[column] > max_val)
]
if len(out_of_range) > 0:
validation_results['warnings'].append(
f"Column '{column}' has {len(out_of_range)} values "
f"outside range [{min_val}, {max_val}]"
)
# Check for duplicates
if 'unique_columns' in rules:
for column in rules['unique_columns']:
if column in df.columns:
duplicates = df[column].duplicated().sum()
if duplicates > 0:
validation_results['warnings'].append(
f"Column '{column}' has {duplicates} duplicate values"
)
return validation_results
def create_checkpoint(self, df: pd.DataFrame, step_name: str) -> str:
"""Create a checkpoint of the DataFrame for recovery."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
checkpoint_id = f"{step_name}_{timestamp}"
        # Save to parquet for efficiency (create the checkpoint dir if needed)
        os.makedirs("checkpoints", exist_ok=True)
        checkpoint_path = f"checkpoints/{checkpoint_id}.parquet"
        df.to_parquet(checkpoint_path, index=False)
# Save metadata
metadata = {
'checkpoint_id': checkpoint_id,
'timestamp': timestamp,
'step_name': step_name,
'shape': df.shape,
'columns': df.columns.tolist(),
'path': checkpoint_path
}
with open(f"checkpoints/{checkpoint_id}_meta.json", 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Created checkpoint: {checkpoint_id}")
return checkpoint_id
def recover_from_checkpoint(self, checkpoint_id: str) -> pd.DataFrame:
"""Recover DataFrame from a checkpoint."""
checkpoint_path = f"checkpoints/{checkpoint_id}.parquet"
if not os.path.exists(checkpoint_path):
raise FileNotFoundError(f"Checkpoint not found: {checkpoint_id}")
df = pd.read_parquet(checkpoint_path)
logger.info(f"Recovered from checkpoint: {checkpoint_id}")
return df
# Example usage with decorators
error_handler = ETLErrorHandler(max_retries=3)
@error_handler.retry_with_backoff
def extract_from_api(url: str, params: Dict = None) -> pd.DataFrame:
"""Extract data from API with automatic retry."""
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
return pd.DataFrame(response.json())
def send_slack_alert(error_report: Dict):
"""Send error alert to Slack."""
webhook_url = os.getenv('SLACK_WEBHOOK_URL')
if not webhook_url:
return
message = {
"text": f"🚨 ETL Pipeline Error: {error_report['operation']}",
"attachments": [{
"color": "danger",
"fields": [
{"title": "Error", "value": str(error_report['error'])[:1000]},
{"title": "Timestamp", "value": error_report['timestamp']},
{"title": "Operation", "value": error_report['operation']}
]
}]
}
requests.post(webhook_url, json=message)
error_handler.add_alert_callback(send_slack_alert)
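A quick sketch of how these pieces combine in practice. The API endpoint, column names, and thresholds below are illustrative only:
# Placeholder endpoint, columns, and thresholds for illustration
quality_rules = {
    "null_threshold": {"customer_id": 0.0, "amount": 0.05},  # max allowed null fraction
    "expected_types": {"amount": "float64"},
    "value_ranges": {"amount": (0, 100_000)},
    "unique_columns": ["order_id"],
}

df = extract_from_api("https://api.example.com/orders")  # retried with backoff on failure
results = error_handler.validate_data_quality(df, quality_rules)

if results["passed"]:
    checkpoint_id = error_handler.create_checkpoint(df, step_name="post_extract")
else:
    for problem in results["errors"] + results["warnings"]:
        logger.warning(problem)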
Part 4: Scheduling and Orchestration with Airflow
Now let's integrate our agent into an Airflow DAG for scheduling:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
import pandas as pd
from io import StringIO
from typing import Dict, Any

# Import our ETL components
from etl_agent.data_connector import DataSourceConnector, DataSourceConfig, DataSourceType
from etl_agent.transformation_agent import DataTransformationAgent
from etl_agent.error_handler import ETLErrorHandler, error_handler, send_slack_alert, logger
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
def create_etl_dag(
dag_id: str,
schedule: str,
source_config: Dict[str, Any],
target_config: Dict[str, Any],
transformation_rules: Dict[str, Any],
**kwargs
):
"""Factory function to create ETL DAGs."""
dag = DAG(
dag_id=dag_id,
default_args=default_args,
description=f'AI-powered ETL pipeline for {dag_id}',
schedule_interval=schedule,
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
tags=['etl', 'ai-agent'],
)
def extract_task(**context):
"""Extract data from source."""
connector = DataSourceConnector()
config = DataSourceConfig(**source_config)
# Get the extraction query/endpoint from Airflow variables
extraction_query = Variable.get(f"{dag_id}_extraction_query", default_var=None)
if config.source_type in [DataSourceType.POSTGRESQL, DataSourceType.MYSQL]:
df = connector.extract_sql_data(config, extraction_query)
elif config.source_type == DataSourceType.S3:
df = connector.extract_s3_data(
config,
file_key=source_config.get('file_key'),
file_format=source_config.get('file_format', 'csv')
)
elif config.source_type == DataSourceType.REST_API:
df = connector.extract_api_data(
config,
endpoint=source_config.get('endpoint'),
method=source_config.get('method', 'GET'),
pagination_key=source_config.get('pagination_key')
)
else:
raise ValueError(f"Unsupported source type: {config.source_type}")
        # Store in XCom for the next task (fine for modest datasets; for large
        # volumes, stage the data externally and pass a reference instead)
context['ti'].xcom_push(key='raw_data', value=df.to_json())
context['ti'].xcom_push(key='record_count', value=len(df))
return f"Extracted {len(df)} records"
def transform_task(**context):
"""Transform data using AI agent."""
# Get raw data from previous task
raw_data_json = context['ti'].xcom_pull(key='raw_data', task_ids='extract')
        df = pd.read_json(StringIO(raw_data_json))
# Initialize transformation agent
agent = DataTransformationAgent()
agent.load_data(df)
# Get transformation rules (could be from Airflow variable or config)
if transformation_rules.get('use_ai_suggestions', False):
# Let AI suggest transformations
suggestions = agent.suggest_transformations()
logger.info(f"AI suggested {len(suggestions)} transformations")
# Apply suggested transformations
transformed_df = agent.apply_transformation_plan(suggestions)
else:
# Apply predefined transformations
transformed_df = agent.apply_transformation_plan(
transformation_rules.get('steps', [])
)
# Validate data quality
quality_rules = transformation_rules.get('quality_rules', {})
validation_results = error_handler.validate_data_quality(
transformed_df, quality_rules
)
if not validation_results['passed']:
error_msg = f"Data quality check failed: {validation_results['errors']}"
logger.error(error_msg)
raise ValueError(error_msg)
# Store transformed data
context['ti'].xcom_push(
key='transformed_data',
value=transformed_df.to_json()
)
context['ti'].xcom_push(
key='transformation_history',
value=agent.transformation_history
)
return f"Transformed data. Shape: {transformed_df.shape}"
def load_task(**context):
"""Load data to target."""
transformed_data_json = context['ti'].xcom_pull(
key='transformed_data',
task_ids='transform'
)
        df = pd.read_json(StringIO(transformed_data_json))
connector = DataSourceConnector()
target_config_obj = DataSourceConfig(**target_config)
# Get load strategy from config
load_strategy = target_config.get('strategy', 'append')
target_table = target_config.get('table_name')
if load_strategy == 'replace':
# Replace entire table
df.to_sql(
target_table,
connector.get_engine(target_config_obj),
if_exists='replace',
index=False
)
elif load_strategy == 'append':
# Append to existing table
df.to_sql(
target_table,
connector.get_engine(target_config_obj),
if_exists='append',
index=False
)
elif load_strategy == 'upsert':
# Upsert (insert or update)
# This requires more complex logic, often database-specific
pass
return f"Loaded {len(df)} records to {target_table}"
def check_data_quality(**context):
"""Check if data quality meets thresholds."""
record_count = context['ti'].xcom_pull(key='record_count', task_ids='extract')
# Get quality thresholds from Airflow variables
min_records = int(Variable.get(f"{dag_id}_min_records", default_var=100))
if record_count < min_records:
return 'quality_failed'
else:
return 'quality_passed'
# Define tasks
extract = PythonOperator(
task_id='extract',
python_callable=extract_task,
dag=dag,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_task,
dag=dag,
)
load = PythonOperator(
task_id='load',
python_callable=load_task,
dag=dag,
)
quality_check = BranchPythonOperator(
task_id='quality_check',
python_callable=check_data_quality,
dag=dag,
)
quality_passed = DummyOperator(
task_id='quality_passed',
dag=dag,
)
    # The branch operator already selects which path runs, so these tasks keep the
    # default trigger rule; ONE_FAILED would prevent them from ever running here.
    quality_failed = DummyOperator(
        task_id='quality_failed',
        dag=dag,
    )
    send_alert = PythonOperator(
        task_id='send_alert',
        python_callable=lambda: send_slack_alert({
            'operation': dag_id,
            'error': 'Data quality check failed',
            'timestamp': datetime.now().isoformat()
        }),
        dag=dag,
    )
# Define task dependencies
extract >> quality_check
quality_check >> quality_passed >> transform >> load
quality_check >> quality_failed >> send_alert
return dag
# Create specific DAGs
sales_etl_dag = create_etl_dag(
dag_id='sales_data_pipeline',
schedule='0 2 * * *', # Daily at 2 AM
source_config={
'source_type': DataSourceType.POSTGRESQL,
'connection_params': {
'host': 'sales-db.example.com',
'port': 5432,
'database': 'sales',
'user': Variable.get('SALES_DB_USER'),
'password': Variable.get('SALES_DB_PASSWORD'),
},
'name': 'sales_database',
},
target_config={
'source_type': DataSourceType.POSTGRESQL,
'connection_params': {
'host': '