agent by dominodatalab
Data-Wrangler-Agent
Use this agent to find data on the Internet that fits a use case, or to generate synthetic data that matches it
Installs: 0
Used in: 1 repo
Updated: 2d ago
$
npx ai-builder add agent dominodatalab/Data-Wrangler-Agent
Installs to .claude/agents/Data-Wrangler-Agent.md
### System Prompt
```
You are a Senior Data Engineer with 12+ years of experience in enterprise data acquisition, synthesis, and preparation. You excel at locating, generating, and preparing data for ML workflows in Domino Data Lab.
## Core Competencies
- Python-based data engineering (pandas, numpy, polars)
- Web scraping and API integration with Python
- Synthetic data generation with realistic distributions
- Data quality assessment and remediation
- ETL/ELT pipeline development using Python frameworks
- Data versioning and lineage tracking
- Privacy-preserving data techniques
## Primary Responsibilities
1. Locate relevant datasets from public/private sources
2. Generate synthetic data matching business scenarios using Python libraries
3. Establish data connections in Domino
4. Implement data quality checks with Python (great_expectations, pandera)
5. Version datasets for reproducibility
6. Create data documentation and dictionaries
## Domino Integration Points
- Data source connections configuration
- Dataset versioning and storage
- Data quality monitoring setup
- Pipeline scheduling and automation
- Compute environment optimization
## Error Handling Approach
- Implement retry logic with exponential backoff
- Validate data at each transformation step
- Create data quality scorecards
- Maintain fallback data sources
- Log all data lineage information
## Output Standards
- Python notebooks (.ipynb) with clear documentation
- Python scripts (.py) with proper error handling
- Data quality reports with pandas profiling
- Synthetic data generation scripts in Python
- Data dictionaries in JSON/YAML format
- Reproducible Python-based data pipelines
## Professional Formatting Guidelines
- Use professional, business-appropriate language in all outputs
- Avoid emojis, emoticons, or decorative symbols in documentation
- Use standard markdown formatting for structure and emphasis
- Maintain formal tone appropriate for enterprise environments
- Use checkmarks (✓) and X marks (✗) for status indicators only when necessary
```
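
The prompt's data-quality responsibility names great_expectations and pandera; as a minimal sketch of what such a check could look like in this Python-first workflow, the snippet below validates a hypothetical orders table (the `order_id`, `amount`, and `region` columns and the scoring logic are illustrative assumptions, not part of the agent):

```python
# Hypothetical pandera schema; column names and checks are illustrative only.
import pandas as pd
import pandera as pa

order_schema = pa.DataFrameSchema({
    "order_id": pa.Column(int, pa.Check.ge(1), nullable=False),
    "amount": pa.Column(float, pa.Check.in_range(0.0, 1_000_000.0)),
    "region": pa.Column(str, pa.Check.isin(["AMER", "EMEA", "APAC"])),
})

def data_quality_score(df: pd.DataFrame) -> float:
    """Fraction of rows passing the schema -- a simple scorecard-style metric."""
    try:
        order_schema.validate(df, lazy=True)
        return 1.0
    except pa.errors.SchemaErrors as err:
        failing_rows = err.failure_cases["index"].nunique()
        return 1.0 - failing_rows / max(len(df), 1)
```

Lazy validation collects all failures in one pass, which suits the scorecard-style reporting described in the prompt better than stopping at the first bad column.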
### Key Methods
```python
def acquire_or_generate_data(self, specifications):
    """Robust data acquisition with Python libraries and MLflow tracking"""
    import json
    import os
    from datetime import datetime
    from pathlib import Path

    import pandas as pd
    import numpy as np
    import mlflow
    # Synthetic-data libraries available to the generation helpers (not called directly here)
    from faker import Faker
    from sdv.single_table import GaussianCopulaSynthesizer

    mlflow.set_tracking_uri("http://localhost:8768")

    # Set up directory structure
    project_name = specifications.get('project', 'demo')
    stage = 'data_acquisition'

    # Create directories if they don't exist
    code_dir = Path(f"/mnt/code/{stage}")
    notebooks_dir = code_dir / "notebooks"
    scripts_dir = code_dir / "scripts"
    artifacts_dir = Path(f"/mnt/artifacts/{stage}")
    data_dir = Path(f"/mnt/data/{project_name}/{stage}")
    for directory in [notebooks_dir, scripts_dir, artifacts_dir, data_dir]:
        directory.mkdir(parents=True, exist_ok=True)

    # Initialize MLflow experiment
    experiment_name = f"data_acquisition_{project_name}"
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name="data_acquisition_main") as run:
        mlflow.set_tag("stage", "data_wrangling")
        mlflow.set_tag("agent", "data_wrangler")
        mlflow.log_param("project_name", project_name)
        mlflow.log_param("data_directory", str(data_dir))
        data_sources = []

        # Primary: Try to locate real data using Python
        try:
            if specifications.get('real_data_preferred', True):
                mlflow.log_param("data_source", "real_data")
                mlflow.log_param("specifications", json.dumps(specifications))

                # Use pandas for data loading
                real_data = self.search_and_acquire_data_python(specifications)
                quality_score = self.validate_data_quality(real_data)
                mlflow.log_metric("data_quality_score", quality_score)
                mlflow.log_metric("n_rows", len(real_data))
                mlflow.log_metric("n_columns", len(real_data.columns))

                if quality_score > 0.8:
                    # Save data to project dataset
                    data_path = data_dir / "raw_data.parquet"
                    real_data.to_parquet(data_path)

                    # Log dataset info to MLflow (log_table stores a small sample as a JSON table artifact)
                    mlflow.log_param("data_shape", str(real_data.shape))
                    mlflow.log_table(real_data.head(100), "data_sample.json")
                    mlflow.log_artifact(str(data_path))

                    # Create and save data profile
                    profile_path = artifacts_dir / "data_profile.html"
                    self.create_data_profile(real_data, profile_path)
                    mlflow.log_artifact(str(profile_path))

                    # Create test JSON file
                    test_json_path = artifacts_dir / "test_data.json"
                    test_json = real_data.head(5).to_dict(orient='records')
                    with open(test_json_path, "w") as f:
                        json.dump(test_json, f, indent=2)
                    mlflow.log_artifact(str(test_json_path))

                    # Save data acquisition script to scripts directory
                    script_path = scripts_dir / "data_acquisition.py"
                    self.save_acquisition_script(specifications, script_path)
                    mlflow.log_artifact(str(script_path))

                    # Create Jupyter notebook for data exploration
                    notebook_path = notebooks_dir / "data_exploration.ipynb"
                    self.create_data_exploration_notebook(real_data, specifications, notebook_path)
                    mlflow.log_artifact(str(notebook_path))

                    # Create requirements.txt for this stage
                    requirements_path = code_dir / "requirements.txt"
                    with open(requirements_path, "w") as f:
                        f.write("pandas>=2.0.0\nnumpy>=1.24.0\nmlflow>=2.9.0\n")
                        f.write("faker>=20.0.0\nsdv>=1.0.0\njupyter>=1.0.0\nnbformat>=5.7.0\n")
                    mlflow.log_artifact(str(requirements_path))

                    mlflow.set_tag("data_acquisition_status", "success")
                    return real_data
        except Exception as e:
            mlflow.log_param("real_data_error", str(e))
            self.log_warning(f"Real data acquisition failed: {e}")

        # Fallback: Generate synthetic data with Python libraries
        try:
            mlflow.log_param("data_source", "synthetic")

            # Use Python synthetic data libraries
            synthetic_params = self.infer_synthetic_parameters(specifications)
            mlflow.log_params(synthetic_params)

            # Generate using pandas and numpy
            synthetic_data = self.generate_synthetic_data_python(
                synthetic_params,
                use_libraries=['faker', 'sdv', 'numpy'],
                ensure_realistic=True,
                include_edge_cases=True
            )

            # Add controlled noise and outliers using numpy
            synthetic_data = self.add_realistic_imperfections(
                synthetic_data,
                missing_rate=0.05,
                outlier_rate=0.02
            )

            # Save synthetic data to project dataset
            synthetic_path = data_dir / "synthetic_data.parquet"
            synthetic_data.to_parquet(synthetic_path)

            # Log synthetic data metrics
            mlflow.log_metric("synthetic_rows", len(synthetic_data))
            mlflow.log_metric("synthetic_columns", len(synthetic_data.columns))
            mlflow.log_metric("missing_rate", 0.05)
            mlflow.log_metric("outlier_rate", 0.02)

            # Save artifacts
            mlflow.log_table(synthetic_data.head(100), "synthetic_sample.json")
            mlflow.log_artifact(str(synthetic_path))

            # Create test JSON
            test_json_path = artifacts_dir / "test_synthetic.json"
            test_json = synthetic_data.head(5).to_dict(orient='records')
            with open(test_json_path, "w") as f:
                json.dump(test_json, f, indent=2)
            mlflow.log_artifact(str(test_json_path))

            # Save generation script to scripts directory
            script_path = scripts_dir / "synthetic_generation.py"
            self.save_generation_script(synthetic_params, script_path)
            mlflow.log_artifact(str(script_path))

            # Create Jupyter notebook for synthetic data exploration
            notebook_path = notebooks_dir / "synthetic_data_exploration.ipynb"
            self.create_data_exploration_notebook(synthetic_data, specifications, notebook_path)
            mlflow.log_artifact(str(notebook_path))

            mlflow.set_tag("data_acquisition_status", "synthetic_success")
            return synthetic_data
        except Exception as e:
            mlflow.log_param("synthetic_data_error", str(e))

            # Ultimate fallback: Use cached pandas DataFrame
            self.log_error(f"Synthetic generation failed: {e}")
            mlflow.set_tag("data_acquisition_status", "fallback_cache")
            cached_path = data_dir / f"cached_{specifications.get('domain', 'default')}.parquet"
            return pd.read_parquet(cached_path)
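
# ---------------------------------------------------------------------------
# Illustrative sketch (assumption): add_realistic_imperfections() is called
# above but its implementation is not included in this excerpt. A minimal
# version matching the missing_rate / outlier_rate parameters used there
# might look like the following; it is an example, not the agent's actual helper.
def add_realistic_imperfections(self, df, missing_rate=0.05, outlier_rate=0.02, random_state=42):
    """Inject random missing values and numeric outliers into a synthetic DataFrame."""
    import numpy as np
    rng = np.random.default_rng(random_state)
    df = df.copy()
    # Blank out a random fraction of all cells to simulate missing data
    missing_mask = rng.random(df.shape) < missing_rate
    df = df.mask(missing_mask)
    # Push a random fraction of numeric values far from the mean to create outliers
    for col in df.select_dtypes(include=[np.number]).columns:
        outlier_mask = rng.random(len(df)) < outlier_rate
        df.loc[outlier_mask, col] = df[col].mean() + 5 * df[col].std()
    return df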
def create_data_exploration_notebook(self, data, specifications, notebook_path):
    """Create a Jupyter notebook for data exploration and quality assessment"""
    import json
    import nbformat as nbf

    # Create new notebook
    nb = nbf.v4.new_notebook()

    # Add title cell
    data_type = "Synthetic" if specifications.get('synthetic_data', False) else "Real"
    title_cell = nbf.v4.new_markdown_cell(f"""
# {data_type} Data Exploration Report

Project: {specifications.get('project', 'Demo')}
Domain: {specifications.get('domain', 'General')}

## Overview
This notebook contains data exploration and quality assessment for the acquired dataset:
- Dataset characteristics and structure
- Data quality assessment
- Initial exploration and insights
- Recommendations for data preparation
""")
    nb.cells.append(title_cell)

    # Add imports cell
    imports_cell = nbf.v4.new_code_cell("""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Set plotting style
plt.style.use('default')
sns.set_palette("husl")
""")
    nb.cells.append(imports_cell)

    # Add data loading section
    project_name = specifications.get('project', 'demo')
    data_load_cell = nbf.v4.new_code_cell(f"""
# Load the acquired dataset (real data if present, otherwise the synthetic fallback)
data_dir = Path("/mnt/data/{project_name}/data_acquisition")
data_path = data_dir / "raw_data.parquet"
if not data_path.exists():
    data_path = data_dir / "synthetic_data.parquet"
if data_path.exists():
    df = pd.read_parquet(data_path)
    print("Dataset loaded successfully!")
    print(f"Shape: {{df.shape}}")
    print(f"Memory usage: {{df.memory_usage(deep=True).sum() / 1024**2:.2f}} MB")
else:
    print("Dataset not found at expected location")

# Display first few rows
df.head()
""")
    nb.cells.append(data_load_cell)

    # Add data overview section
    overview_cell = nbf.v4.new_markdown_cell("## Dataset Overview")
    nb.cells.append(overview_cell)

    overview_code_cell = nbf.v4.new_code_cell("""
# Basic dataset information
print("Dataset Info:")
print(f"Number of rows: {len(df)}")
print(f"Number of columns: {len(df.columns)}")
print(f"Column names: {list(df.columns)}")
print("\\nData types:")
print(df.dtypes)
print("\\nBasic statistics:")
df.describe()
""")
    nb.cells.append(overview_code_cell)

    # Add data quality section
    quality_cell = nbf.v4.new_markdown_cell("## Data Quality Assessment")
    nb.cells.append(quality_cell)

    quality_code_cell = nbf.v4.new_code_cell("""
# Check for missing values
print("Missing values:")
missing_counts = df.isnull().sum()
missing_percentages = (missing_counts / len(df)) * 100
missing_df = pd.DataFrame({
    'Missing Count': missing_counts,
    'Missing Percentage': missing_percentages
})
print(missing_df[missing_df['Missing Count'] > 0])

# Check for duplicates
duplicates = df.duplicated().sum()
print(f"\\nDuplicate rows: {duplicates}")

# Data type distribution
print("\\nData type distribution:")
print(df.dtypes.value_counts())
""")
    nb.cells.append(quality_code_cell)

    # Add visualizations section
    viz_cell = nbf.v4.new_markdown_cell("## Data Visualizations")
    nb.cells.append(viz_cell)

    # Numerical data distributions
    num_viz_cell = nbf.v4.new_code_cell("""
# Distribution of numerical columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
    # Plot distributions
    n_cols = min(4, len(numeric_cols))
    n_rows = (len(numeric_cols) + n_cols - 1) // n_cols
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
    if n_rows == 1:
        axes = axes if len(numeric_cols) > 1 else [axes]
    else:
        axes = axes.flatten()
    for i, col in enumerate(numeric_cols):
        if i < len(axes):
            df[col].hist(bins=30, ax=axes[i], alpha=0.7)
            axes[i].set_title(f'Distribution of {col}')
            axes[i].set_xlabel(col)
            axes[i].set_ylabel('Frequency')
    # Hide empty subplots
    for j in range(i + 1, len(axes)):
        axes[j].set_visible(False)
    plt.tight_layout()
    plt.show()
else:
    print("No numeric columns found for distribution plots")
""")
    nb.cells.append(num_viz_cell)

    # Categorical data overview
    cat_viz_cell = nbf.v4.new_code_cell("""
# Categorical data overview
categorical_cols = df.select_dtypes(include=['object', 'category']).columns
if len(categorical_cols) > 0:
    print("Categorical columns summary:")
    for col in categorical_cols[:5]:  # Limit to first 5 categorical columns
        print(f"\\n{col}:")
        print(f"  Unique values: {df[col].nunique()}")
        print(f"  Most frequent: {df[col].mode().iloc[0] if not df[col].mode().empty else 'N/A'}")
        if df[col].nunique() <= 10:
            print("  Value counts:")
            print(df[col].value_counts().head())
else:
    print("No categorical columns found")
""")
    nb.cells.append(cat_viz_cell)

    # Add correlation analysis for numeric data
    corr_cell = nbf.v4.new_code_cell("""
# Correlation analysis for numeric columns
if len(numeric_cols) > 1:
    plt.figure(figsize=(10, 8))
    correlation_matrix = df[numeric_cols].corr()
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, square=True)
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.show()

    # Find highly correlated pairs
    high_corr_pairs = []
    for i in range(len(correlation_matrix.columns)):
        for j in range(i + 1, len(correlation_matrix.columns)):
            corr_val = correlation_matrix.iloc[i, j]
            if abs(corr_val) > 0.7:
                high_corr_pairs.append((correlation_matrix.columns[i], correlation_matrix.columns[j], corr_val))
    if high_corr_pairs:
        print("\\nHighly correlated pairs (|correlation| > 0.7):")
        for col1, col2, corr in high_corr_pairs:
            print(f"  {col1} - {col2}: {corr:.3f}")
    else:
        print("\\nNo highly correlated pairs found")
else:
    print("Insufficient numeric columns for correlation analysis")
""")
    nb.cells.append(corr_cell)

    # Add data preparation recommendations
    recommendations_cell = nbf.v4.new_markdown_cell("## Data Preparation Recommendations")
    nb.cells.append(recommendations_cell)

    recommendations_code_cell = nbf.v4.new_code_cell("""
# Generate data preparation recommendations
recommendations = []

# Missing value recommendations
if missing_counts.sum() > 0:
    recommendations.append("Handle missing values using appropriate imputation strategies")

# Duplicate recommendations
if duplicates > 0:
    recommendations.append(f"Remove {duplicates} duplicate rows")

# High-cardinality categorical variables
for col in categorical_cols:
    if df[col].nunique() > len(df) * 0.5:
        recommendations.append(f"Consider encoding strategy for high-cardinality column: {col}")

# Skewed distributions
for col in numeric_cols:
    skewness = df[col].skew()
    if abs(skewness) > 2:
        recommendations.append(f"Consider transformation for skewed column: {col} (skewness: {skewness:.2f})")

# Display recommendations
print("Data Preparation Recommendations:")
for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec}")
if not recommendations:
    print("No major data quality issues detected!")
""")
    nb.cells.append(recommendations_code_cell)

    # Add conclusion
    conclusion_cell = nbf.v4.new_markdown_cell("""
## Conclusion
This data exploration report provides:
- Dataset structure and basic statistics
- Data quality assessment
- Visualization of key patterns
- Recommendations for data preparation

Next steps:
1. Implement the recommended data cleaning steps
2. Engineer features based on the patterns observed
3. Proceed to exploratory data analysis for modeling insights
""")
    nb.cells.append(conclusion_cell)

    # Write notebook to file
    with open(notebook_path, 'w') as f:
        nbf.write(nb, f)
    return notebook_path
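
# ---------------------------------------------------------------------------
# Illustrative usage (assumption): the surrounding class is not shown in this
# excerpt, so DataWranglerAgent below is a stand-in name. The 'project',
# 'domain', and 'real_data_preferred' keys match the specifications read above;
# their values here are hypothetical.
if __name__ == "__main__":
    agent = DataWranglerAgent()
    specs = {
        "project": "customer_churn_demo",   # hypothetical project name
        "domain": "telecom",                # hypothetical domain
        "real_data_preferred": True,
    }
    df = agent.acquire_or_generate_data(specs)
    print(f"Acquired dataset with shape {df.shape}")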
```
### Quick Install
$
npx ai-builder add agent dominodatalab/Data-Wrangler-Agent
### Details
- Type: agent
- Author: dominodatalab
- Slug: dominodatalab/Data-Wrangler-Agent
- Created: 6d ago