Data-Wrangler-Agent

Use this agent to find data on the Internet that fits a use case, or to generate synthetic data that matches it.

Installs: 0
Used in: 1 repos
Updated: 2d ago
$npx ai-builder add agent dominodatalab/Data-Wrangler-Agent

Installs to .claude/agents/Data-Wrangler-Agent.md

### System Prompt
```
You are a Senior Data Engineer with 12+ years of experience in enterprise data acquisition, synthesis, and preparation. You excel at locating, generating, and preparing data for ML workflows in Domino Data Lab.

## Core Competencies
- Python-based data engineering (pandas, numpy, polars)
- Web scraping and API integration with Python
- Synthetic data generation with realistic distributions
- Data quality assessment and remediation
- ETL/ELT pipeline development using Python frameworks
- Data versioning and lineage tracking
- Privacy-preserving data techniques

## Primary Responsibilities
1. Locate relevant datasets from public/private sources
2. Generate synthetic data matching business scenarios using Python libraries
3. Establish data connections in Domino
4. Implement data quality checks with Python (great_expectations, pandera)
5. Version datasets for reproducibility
6. Create data documentation and dictionaries

## Domino Integration Points
- Data source connections configuration
- Dataset versioning and storage
- Data quality monitoring setup
- Pipeline scheduling and automation
- Compute environment optimization

## Error Handling Approach
- Implement retry logic with exponential backoff
- Validate data at each transformation step
- Create data quality scorecards
- Maintain fallback data sources
- Log all data lineage information

## Output Standards
- Python notebooks (.ipynb) with clear documentation
- Python scripts (.py) with proper error handling
- Data quality reports with pandas profiling
- Synthetic data generation scripts in Python
- Data dictionaries in JSON/YAML format
- Reproducible Python-based data pipelines

## Professional Formatting Guidelines
- Use professional, business-appropriate language in all outputs
- Avoid emojis, emoticons, or decorative symbols in documentation
- Use standard markdown formatting for structure and emphasis
- Maintain formal tone appropriate for enterprise environments
- Use checkmarks (✓) and X marks (✗) for status indicators only when necessary
```
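
A minimal sketch of the data quality checks the system prompt calls for (responsibility 4), using pandera. The column names, dtypes, and checks below are illustrative assumptions, not part of the agent itself.

```python
# Sketch: validate an acquired dataset with pandera before it moves downstream.
# The schema (order_id, amount, status) is an assumed example, not the agent's schema.
import pandas as pd
import pandera as pa

order_schema = pa.DataFrameSchema(
    {
        "order_id": pa.Column(str, unique=True),
        "amount": pa.Column(float, checks=pa.Check.ge(0)),
        "status": pa.Column(str, checks=pa.Check.isin(["open", "shipped", "returned"])),
    },
    coerce=True,
)

def validate_or_report(df: pd.DataFrame) -> pd.DataFrame:
    """Return the validated frame, or raise with a summary of every failing check."""
    try:
        # lazy=True collects all failures instead of stopping at the first one
        return order_schema.validate(df, lazy=True)
    except pa.errors.SchemaErrors as err:
        print(err.failure_cases.head())  # one row per failing check/value
        raise
```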
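
The retry logic with exponential backoff named under Error Handling Approach could be implemented along these lines; the decorator shape, delay values, and attempt count are assumptions rather than part of the agent.

```python
# Sketch: retry flaky data-source calls with exponential backoff and jitter.
# max_attempts, base_delay, and max_delay are illustrative defaults.
import random
import time
from functools import wraps

def with_backoff(max_attempts=5, base_delay=1.0, max_delay=30.0):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    # Delays grow 1s, 2s, 4s, ... capped at max_delay, plus a little jitter.
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    time.sleep(delay + random.uniform(0, 0.5))
        return wrapper
    return decorator

@with_backoff(max_attempts=3)
def fetch_csv(url):
    import pandas as pd
    return pd.read_csv(url)
```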

### Key Methods
```python
def acquire_or_generate_data(self, specifications):
    """Robust data acquisition with Python libraries and MLflow tracking"""
    import pandas as pd
    import numpy as np
    import mlflow
    mlflow.set_tracking_uri("http://localhost:8768")
    from faker import Faker
    from sdv.single_table import GaussianCopulaSynthesizer  # SDV >= 1.0 tabular API
    import json
    import os
    from datetime import datetime
    from pathlib import Path
    
    # Set up directory structure
    project_name = specifications.get('project', 'demo')
    stage = 'data_acquisition'
    
    # Create directories if they don't exist
    code_dir = Path(f"/mnt/code/{stage}")
    notebooks_dir = code_dir / "notebooks"
    scripts_dir = code_dir / "scripts"
    artifacts_dir = Path(f"/mnt/artifacts/{stage}")
    data_dir = Path(f"/mnt/data/{project_name}/{stage}")
    
    for directory in [notebooks_dir, scripts_dir, artifacts_dir, data_dir]:
        directory.mkdir(parents=True, exist_ok=True)
    
    # Initialize MLflow experiment
    experiment_name = f"data_acquisition_{project_name}"
    mlflow.set_experiment(experiment_name)
    
    with mlflow.start_run(run_name="data_acquisition_main") as run:
        mlflow.set_tag("stage", "data_wrangling")
        mlflow.set_tag("agent", "data_wrangler")
        mlflow.log_param("project_name", project_name)
        mlflow.log_param("data_directory", str(data_dir))
        
        data_sources = []
        
        # Primary: Try to locate real data using Python
        try:
            if specifications.get('real_data_preferred', True):
                mlflow.log_param("data_source", "real_data")
                mlflow.log_param("specifications", json.dumps(specifications))
                
                # Use pandas for data loading
                real_data = self.search_and_acquire_data_python(specifications)
                quality_score = self.validate_data_quality(real_data)
                
                mlflow.log_metric("data_quality_score", quality_score)
                mlflow.log_metric("n_rows", len(real_data))
                mlflow.log_metric("n_columns", len(real_data.columns))
                
                if quality_score > 0.8:
                    # Save data to project dataset
                    data_path = data_dir / "raw_data.parquet"
                    real_data.to_parquet(data_path)
                    
                    # Log dataset info to MLflow
                    mlflow.log_param("data_shape", str(real_data.shape))
                    # mlflow.log_table logs the sample as a JSON artifact (MLflow >= 2.3)
                    mlflow.log_table(real_data.head(100), "data_sample.json")
                    mlflow.log_artifact(str(data_path))
                    
                    # Create and save data profile
                    profile_path = artifacts_dir / "data_profile.html"
                    self.create_data_profile(real_data, profile_path)
                    mlflow.log_artifact(str(profile_path))
                    
                    # Create test JSON file
                    test_json_path = artifacts_dir / "test_data.json"
                    test_json = real_data.head(5).to_dict(orient='records')
                    with open(test_json_path, "w") as f:
                        # default=str keeps numpy and timestamp values JSON-serializable
                        json.dump(test_json, f, indent=2, default=str)
                    mlflow.log_artifact(str(test_json_path))
                    
                    # Save data acquisition script to scripts directory
                    script_path = scripts_dir / "data_acquisition.py"
                    self.save_acquisition_script(specifications, script_path)
                    mlflow.log_artifact(str(script_path))

                    # Create Jupyter notebook for data exploration
                    notebook_path = notebooks_dir / "data_exploration.ipynb"
                    self.create_data_exploration_notebook(real_data, specifications, notebook_path)
                    mlflow.log_artifact(str(notebook_path))

                    # Create requirements.txt for this stage
                    requirements_path = code_dir / "requirements.txt"
                    with open(requirements_path, "w") as f:
                        f.write("pandas>=2.0.0\nnumpy>=1.24.0\nmlflow>=2.9.0\n")
                        f.write("faker>=20.0.0\nsdv>=1.0.0\njupyter>=1.0.0\nnbformat>=5.7.0\n")
                    mlflow.log_artifact(str(requirements_path))

                    mlflow.set_tag("data_acquisition_status", "success")
                    return real_data
                    
        except Exception as e:
            mlflow.log_param("real_data_error", str(e))
            self.log_warning(f"Real data acquisition failed: {e}")
        
        # Fallback: Generate synthetic data with Python libraries
        try:
            mlflow.log_param("data_source", "synthetic")
            
            # Use Python synthetic data libraries
            synthetic_params = self.infer_synthetic_parameters(specifications)
            mlflow.log_params(synthetic_params)
            
            # Generate using pandas and numpy
            synthetic_data = self.generate_synthetic_data_python(
                synthetic_params,
                use_libraries=['faker', 'sdv', 'numpy'],
                ensure_realistic=True,
                include_edge_cases=True
            )
            
            # Add controlled noise and outliers using numpy
            synthetic_data = self.add_realistic_imperfections(
                synthetic_data,
                missing_rate=0.05,
                outlier_rate=0.02
            )
            
            # Save synthetic data to project dataset
            synthetic_path = data_dir / "synthetic_data.parquet"
            synthetic_data.to_parquet(synthetic_path)
            
            # Log synthetic data metrics
            mlflow.log_metric("synthetic_rows", len(synthetic_data))
            mlflow.log_metric("synthetic_columns", len(synthetic_data.columns))
            mlflow.log_metric("missing_rate", 0.05)
            mlflow.log_metric("outlier_rate", 0.02)
            
            # Save artifacts
            mlflow.log_table(synthetic_data.head(100), "synthetic_sample.json")
            mlflow.log_artifact(str(synthetic_path))
            
            # Create test JSON
            test_json_path = artifacts_dir / "test_synthetic.json"
            test_json = synthetic_data.head(5).to_dict(orient='records')
            with open(test_json_path, "w") as f:
                json.dump(test_json, f, indent=2, default=str)
            mlflow.log_artifact(str(test_json_path))
            
            # Save generation script to scripts directory
            script_path = scripts_dir / "synthetic_generation.py"
            self.save_generation_script(synthetic_params, script_path)
            mlflow.log_artifact(str(script_path))

            # Create Jupyter notebook for synthetic data exploration
            notebook_path = notebooks_dir / "synthetic_data_exploration.ipynb"
            self.create_data_exploration_notebook(synthetic_data, specifications, notebook_path)
            mlflow.log_artifact(str(notebook_path))

            mlflow.set_tag("data_acquisition_status", "synthetic_success")
            return synthetic_data
            
        except Exception as e:
            mlflow.log_param("synthetic_data_error", str(e))
            # Ultimate fallback: Use cached pandas DataFrame
            self.log_error(f"Synthetic generation failed: {e}")
            mlflow.set_tag("data_acquisition_status", "fallback_cache")
            cached_path = data_dir / f"cached_{specifications.get('domain', 'default')}.parquet"
            return pd.read_parquet(cached_path)

def create_data_exploration_notebook(self, data, specifications, notebook_path):
    """Create a Jupyter notebook for data exploration and quality assessment"""
    import nbformat as nbf
    import json

    # Create new notebook
    nb = nbf.v4.new_notebook()

    # Add title cell
    data_type = "Synthetic" if specifications.get('synthetic_data', False) else "Real"
    title_cell = nbf.v4.new_markdown_cell(f"""
# {data_type} Data Exploration Report
Project: {specifications.get('project', 'Demo')}
Domain: {specifications.get('domain', 'General')}

## Overview
This notebook contains data exploration and quality assessment for the acquired dataset:
- Dataset characteristics and structure
- Data quality assessment
- Initial exploration and insights
- Recommendations for data preparation
""")
    nb.cells.append(title_cell)

    # Add imports cell
    imports_cell = nbf.v4.new_code_cell("""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Set plotting style
plt.style.use('default')
sns.set_palette("husl")
""")
    nb.cells.append(imports_cell)

    # Add data loading section
    project_name = specifications.get('project', 'demo')
    data_load_cell = nbf.v4.new_code_cell(f"""
# Load the acquired dataset (real data if available, otherwise the synthetic fallback)
data_dir = Path("/mnt/data/{project_name}/data_acquisition")
candidates = [data_dir / "raw_data.parquet", data_dir / "synthetic_data.parquet"]
data_path = next((p for p in candidates if p.exists()), None)
if data_path is None:
    raise FileNotFoundError(f"No dataset found in {{data_dir}}")

df = pd.read_parquet(data_path)
print(f"Dataset loaded from {{data_path}}")
print(f"Shape: {{df.shape}}")
print(f"Memory usage: {{df.memory_usage(deep=True).sum() / 1024**2:.2f}} MB")

# Display first few rows
df.head()
""")
    nb.cells.append(data_load_cell)

    # Add data overview section
    overview_cell = nbf.v4.new_markdown_cell("## Dataset Overview")
    nb.cells.append(overview_cell)

    overview_code_cell = nbf.v4.new_code_cell("""
# Basic dataset information
print("Dataset Info:")
print(f"Number of rows: {len(df)}")
print(f"Number of columns: {len(df.columns)}")
print(f"Column names: {list(df.columns)}")
print("\\nData types:")
print(df.dtypes)
print("\\nBasic statistics:")
df.describe()
""")
    nb.cells.append(overview_code_cell)

    # Add data quality section
    quality_cell = nbf.v4.new_markdown_cell("## Data Quality Assessment")
    nb.cells.append(quality_cell)

    quality_code_cell = nbf.v4.new_code_cell("""
# Check for missing values
print("Missing values:")
missing_counts = df.isnull().sum()
missing_percentages = (missing_counts / len(df)) * 100
missing_df = pd.DataFrame({
    'Missing Count': missing_counts,
    'Missing Percentage': missing_percentages
})
print(missing_df[missing_df['Missing Count'] > 0])

# Check for duplicates
duplicates = df.duplicated().sum()
print(f"\\nDuplicate rows: {duplicates}")

# Data type distribution
print("\\nData type distribution:")
print(df.dtypes.value_counts())
""")
    nb.cells.append(quality_code_cell)

    # Add visualizations section
    viz_cell = nbf.v4.new_markdown_cell("## Data Visualizations")
    nb.cells.append(viz_cell)

    # Numerical data distributions
    num_viz_cell = nbf.v4.new_code_cell("""
# Distribution of numerical columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
    # Plot distributions
    n_cols = min(4, len(numeric_cols))
    n_rows = (len(numeric_cols) + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4*n_rows))
    # Normalize to a flat array of axes regardless of grid shape
    axes = np.atleast_1d(axes).ravel()

    for i, col in enumerate(numeric_cols):
        if i < len(axes):
            df[col].hist(bins=30, ax=axes[i], alpha=0.7)
            axes[i].set_title(f'Distribution of {col}')
            axes[i].set_xlabel(col)
            axes[i].set_ylabel('Frequency')

    # Hide empty subplots
    for j in range(i+1, len(axes)):
        axes[j].set_visible(False)

    plt.tight_layout()
    plt.show()
else:
    print("No numeric columns found for distribution plots")
""")
    nb.cells.append(num_viz_cell)

    # Categorical data overview
    cat_viz_cell = nbf.v4.new_code_cell("""
# Categorical data overview
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
if len(categorical_cols) > 0:
    print("Categorical columns summary:")
    for col in categorical_cols[:5]:  # Limit to first 5 categorical columns
        print(f"\\n{col}:")
        print(f"  Unique values: {df[col].nunique()}")
        print(f"  Most frequent: {df[col].mode().iloc[0] if not df[col].empty else 'N/A'}")
        if df[col].nunique() <= 10:
            print(f"  Value counts:")
            print(df[col].value_counts().head())
else:
    print("No categorical columns found")
""")
    nb.cells.append(cat_viz_cell)

    # Add correlation analysis for numeric data
    corr_cell = nbf.v4.new_code_cell("""
# Correlation analysis for numeric columns
if len(numeric_cols) > 1:
    plt.figure(figsize=(10, 8))
    correlation_matrix = df[numeric_cols].corr()
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, square=True)
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.show()

    # Find highly correlated pairs
    high_corr_pairs = []
    for i in range(len(correlation_matrix.columns)):
        for j in range(i+1, len(correlation_matrix.columns)):
            corr_val = correlation_matrix.iloc[i, j]
            if abs(corr_val) > 0.7:
                high_corr_pairs.append((correlation_matrix.columns[i], correlation_matrix.columns[j], corr_val))

    if high_corr_pairs:
        print("\\nHighly correlated pairs (|correlation| > 0.7):")
        for col1, col2, corr in high_corr_pairs:
            print(f"  {col1} - {col2}: {corr:.3f}")
    else:
        print("\\nNo highly correlated pairs found")
else:
    print("Insufficient numeric columns for correlation analysis")
""")
    nb.cells.append(corr_cell)

    # Add data preparation recommendations
    recommendations_cell = nbf.v4.new_markdown_cell("## Data Preparation Recommendations")
    nb.cells.append(recommendations_cell)

    recommendations_code_cell = nbf.v4.new_code_cell("""
# Generate data preparation recommendations
recommendations = []

# Missing value recommendations
if missing_counts.sum() > 0:
    recommendations.append("Handle missing values using appropriate imputation strategies")

# Duplicate recommendations
if duplicates > 0:
    recommendations.append(f"Remove {duplicates} duplicate rows")

# High cardinality categorical variables
for col in categorical_cols:
    if df[col].nunique() > len(df) * 0.5:
        recommendations.append(f"Consider encoding strategy for high-cardinality column: {col}")

# Skewed distributions
for col in numeric_cols:
    skewness = df[col].skew()
    if abs(skewness) > 2:
        recommendations.append(f"Consider transformation for skewed column: {col} (skewness: {skewness:.2f})")

# Display recommendations
print("Data Preparation Recommendations:")
for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec}")

if not recommendations:
    print("No major data quality issues detected!")
""")
    nb.cells.append(recommendations_code_cell)

    # Add conclusion
    conclusion_cell = nbf.v4.new_markdown_cell("""
## Conclusion

This data exploration report provides:
- Dataset structure and basic statistics
- Data quality assessment
- Visualization of key patterns
- Recommendations for data preparation

Next steps:
1. Implement recommended data cleaning steps
2. Feature engineering based on patterns observed
3. Proceed to exploratory data analysis for modeling insights
""")
    nb.cells.append(conclusion_cell)

    # Write notebook to file
    with open(notebook_path, 'w') as f:
        nbf.write(nb, f)

    return notebook_path
```
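
The helper methods referenced above (search_and_acquire_data_python, generate_synthetic_data_python, add_realistic_imperfections, and others) are not shown on this page. A rough sketch of what the synthetic branch might do with Faker and numpy, assuming a simple customer-orders schema and the 5% missing / 2% outlier rates used in acquire_or_generate_data, could look like this:

```python
# Sketch in the spirit of generate_synthetic_data_python / add_realistic_imperfections.
# Columns, distributions, and rates are assumed for illustration only.
import numpy as np
import pandas as pd
from faker import Faker

def generate_customer_orders(n_rows=10_000, seed=42):
    """Generate a plausible customer-orders table with Faker and numpy."""
    fake = Faker()
    Faker.seed(seed)
    rng = np.random.default_rng(seed)
    return pd.DataFrame({
        "customer_name": [fake.name() for _ in range(n_rows)],
        "signup_date": [fake.date_between("-3y", "today") for _ in range(n_rows)],
        "order_amount": rng.lognormal(mean=3.5, sigma=0.8, size=n_rows).round(2),
        "region": rng.choice(["NA", "EMEA", "APAC"], size=n_rows, p=[0.5, 0.3, 0.2]),
    })

def add_imperfections(df, missing_rate=0.05, outlier_rate=0.02, seed=42):
    """Inject controlled missing values and outliers into order_amount."""
    rng = np.random.default_rng(seed)
    df = df.copy()
    missing_mask = rng.random(len(df)) < missing_rate
    df.loc[missing_mask, "order_amount"] = np.nan
    outlier_mask = rng.random(len(df)) < outlier_rate
    df.loc[outlier_mask, "order_amount"] = (
        df["order_amount"].max() * rng.uniform(5, 10, outlier_mask.sum())
    )
    return df

synthetic_df = add_imperfections(generate_customer_orders())
```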
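
For orientation, these methods read as members of an agent class. A hypothetical invocation (the DataWranglerAgent class name and the specification keys are assumptions inferred from the defaults in the code above) might look like:

```python
# Hypothetical usage; DataWranglerAgent is an assumed wrapper class for the methods above.
specifications = {
    "project": "churn_demo",          # used to build /mnt/data/<project>/data_acquisition
    "domain": "telecom",              # used to name the cached fallback parquet file
    "real_data_preferred": True,      # try real sources before falling back to synthetic
}

agent = DataWranglerAgent()
df = agent.acquire_or_generate_data(specifications)

# The exploration notebook is written during acquisition, but it can also be
# regenerated on demand for a given DataFrame:
agent.create_data_exploration_notebook(
    df,
    specifications,
    "/mnt/code/data_acquisition/notebooks/data_exploration.ipynb",
)
```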


Details

Type: agent
Slug: dominodatalab/Data-Wrangler-Agent
Created: 6d ago