skillby khanh-vu

Order Execution Patterns

Advanced order execution algorithms to minimize slippage and optimize fill quality.

Installs: 0
Used in: 1 repos
Updated: 1d ago
$npx ai-builder add skill khanh-vu/order-execution-patterns

Installs to .claude/skills/order-execution-patterns/

# Order Execution Patterns

Advanced order execution algorithms to minimize slippage and optimize fill quality.

## TWAP (Time-Weighted Average Price) Execution

```python
import asyncio
from decimal import Decimal
from datetime import datetime, timedelta
from typing import List, Dict

class TWAPExecutor:
    """
    Time-Weighted Average Price execution
    Splits large order evenly over time to minimize market impact
    """

    def __init__(self, exchange_connector):
        self.exchange = exchange_connector
        self.active_executions = {}

    async def execute_twap(
        self,
        symbol: str,
        side: str,  # 'buy' or 'sell'
        total_amount: Decimal,
        duration_minutes: int,
        num_slices: int = None
    ) -> Dict:
        """
        Execute TWAP order

        Args:
            symbol: Trading pair (e.g., 'BTC/USDT')
            side: 'buy' or 'sell'
            total_amount: Total quantity to trade
            duration_minutes: Time window for execution
            num_slices: Number of slices (default: duration_minutes)

        Returns:
            Execution report with fills, average price, slippage
        """
        if num_slices is None:
            num_slices = duration_minutes

        slice_amount = total_amount / num_slices
        interval_seconds = (duration_minutes * 60) / num_slices

        execution_id = f"twap_{symbol}_{datetime.utcnow().timestamp()}"
        self.active_executions[execution_id] = {
            'symbol': symbol,
            'side': side,
            'total_amount': total_amount,
            'fills': [],
            'start_time': datetime.utcnow(),
            'status': 'active'
        }

        logger.info(
            f"Starting TWAP execution: {total_amount} {symbol} "
            f"over {duration_minutes}m in {num_slices} slices"
        )

        try:
            for slice_num in range(num_slices):
                # Get current mid price
                ticker = await self.exchange.fetch_ticker(symbol)
                mid_price = (ticker['bid'] + ticker['ask']) / 2

                # Place limit order at mid price (passive execution)
                order = await self.exchange.create_limit_order(
                    symbol=symbol,
                    side=side,
                    amount=float(slice_amount),
                    price=float(mid_price)
                )

                # Wait for partial fill or timeout
                fill = await self._wait_for_fill(
                    order['id'],
                    timeout_seconds=interval_seconds * 0.8  # 80% of interval
                )

                self.active_executions[execution_id]['fills'].append(fill)

                # If not fully filled, cancel and use market order for remainder
                if fill['filled'] < slice_amount:
                    await self.exchange.cancel_order(order['id'])
                    remainder = slice_amount - fill['filled']

                    if remainder > 0:
                        market_fill = await self.exchange.create_market_order(
                            symbol=symbol,
                            side=side,
                            amount=float(remainder)
                        )
                        self.active_executions[execution_id]['fills'].append(market_fill)

                # Wait until next slice
                if slice_num < num_slices - 1:
                    await asyncio.sleep(interval_seconds)

            # Calculate execution statistics
            report = self._generate_execution_report(execution_id)
            self.active_executions[execution_id]['status'] = 'completed'

            return report

        except Exception as e:
            logger.error(f"TWAP execution failed: {e}")
            self.active_executions[execution_id]['status'] = 'failed'
            raise

    async def _wait_for_fill(
        self,
        order_id: str,
        timeout_seconds: float
    ) -> Dict:
        """Wait for order to fill or timeout"""
        start_time = time.time()

        while time.time() - start_time < timeout_seconds:
            order = await self.exchange.fetch_order(order_id)

            if order['status'] in ['closed', 'filled']:
                return {
                    'filled': Decimal(str(order['filled'])),
                    'price': Decimal(str(order['average'])),
                    'timestamp': order['timestamp']
                }

            await asyncio.sleep(1)

        # Timeout - return partial fill
        order = await self.exchange.fetch_order(order_id)
        return {
            'filled': Decimal(str(order.get('filled', 0))),
            'price': Decimal(str(order.get('average', 0))),
            'timestamp': order['timestamp']
        }
```

## VWAP (Volume-Weighted Average Price) Execution

```python
import numpy as np

class VWAPExecutor:
    """
    Volume-Weighted Average Price execution
    Slices order according to historical volume distribution
    """

    def __init__(self, exchange_connector):
        self.exchange = exchange_connector

    async def execute_vwap(
        self,
        symbol: str,
        side: str,
        total_amount: Decimal,
        duration_minutes: int,
        lookback_days: int = 7
    ) -> Dict:
        """
        Execute VWAP order using historical volume profile

        Args:
            symbol: Trading pair
            side: 'buy' or 'sell'
            total_amount: Total quantity to trade
            duration_minutes: Execution window
            lookback_days: Days of historical data for volume profile
        """
        # Fetch historical volume distribution
        volume_profile = await self._calculate_volume_profile(
            symbol,
            lookback_days
        )

        # Determine slice sizes based on expected volume
        slices = self._calculate_vwap_slices(
            total_amount,
            duration_minutes,
            volume_profile
        )

        logger.info(
            f"Starting VWAP execution: {total_amount} {symbol} "
            f"over {duration_minutes}m with {len(slices)} dynamic slices"
        )

        fills = []

        for slice_info in slices:
            # Wait until target time
            wait_time = (slice_info['target_time'] - datetime.utcnow()).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)

            # Execute slice
            try:
                ticker = await self.exchange.fetch_ticker(symbol)
                mid_price = (ticker['bid'] + ticker['ask']) / 2

                order = await self.exchange.create_limit_order(
                    symbol=symbol,
                    side=side,
                    amount=float(slice_info['amount']),
                    price=float(mid_price)
                )

                # Wait briefly for fill
                fill = await self._wait_for_fill(order['id'], timeout_seconds=30)
                fills.append(fill)

                # Market order for remainder if needed
                if fill['filled'] < slice_info['amount']:
                    await self.exchange.cancel_order(order['id'])
                    remainder = slice_info['amount'] - fill['filled']

                    if remainder > 0:
                        market_order = await self.exchange.create_market_order(
                            symbol=symbol,
                            side=side,
                            amount=float(remainder)
                        )
                        fills.append(market_order)

            except Exception as e:
                logger.error(f"VWAP slice execution failed: {e}")
                continue

        return self._generate_vwap_report(fills, total_amount)

    async def _calculate_volume_profile(
        self,
        symbol: str,
        lookback_days: int
    ) -> Dict:
        """
        Calculate intraday volume distribution from historical data
        Returns percentage of daily volume per time bucket
        """
        # Fetch historical OHLCV data
        since = int((datetime.utcnow() - timedelta(days=lookback_days)).timestamp() * 1000)
        ohlcv = await self.exchange.fetch_ohlcv(
            symbol,
            timeframe='1h',
            since=since
        )

        # Group by hour of day
        hourly_volumes = {}
        for candle in ohlcv:
            timestamp, open_, high, low, close, volume = candle
            hour = datetime.fromtimestamp(timestamp / 1000).hour

            if hour not in hourly_volumes:
                hourly_volumes[hour] = []
            hourly_volumes[hour].append(volume)

        # Calculate average volume per hour
        volume_profile = {}
        total_volume = sum(np.mean(vols) for vols in hourly_volumes.values())

        for hour, volumes in hourly_volumes.items():
            avg_volume = np.mean(volumes)
            volume_profile[hour] = avg_volume / total_volume

        return volume_profile
```

## Iceberg Order Pattern

```python
class IcebergOrderExecutor:
    """
    Iceberg order execution - hide total order size
    Only show small portion on order book at a time
    """

    def __init__(self, exchange_connector):
        self.exchange = exchange_connector

    async def execute_iceberg(
        self,
        symbol: str,
        side: str,
        total_amount: Decimal,
        visible_amount: Decimal,  # Amount visible on order book
        price: Decimal = None     # None for market price
    ) -> Dict:
        """
        Execute iceberg order

        Args:
            symbol: Trading pair
            side: 'buy' or 'sell'
            total_amount: Total order size (hidden)
            visible_amount: Visible portion on order book
            price: Limit price (None for market)
        """
        remaining = total_amount
        fills = []

        logger.info(
            f"Starting iceberg order: {total_amount} {symbol} "
            f"with {visible_amount} visible per slice"
        )

        while remaining > 0:
            # Determine current slice size
            slice_size = min(visible_amount, remaining)

            # Place order
            if price:
                order = await self.exchange.create_limit_order(
                    symbol=symbol,
                    side=side,
                    amount=float(slice_size),
                    price=float(price)
                )
            else:
                # Dynamic pricing at mid price
                ticker = await self.exchange.fetch_ticker(symbol)
                mid_price = (ticker['bid'] + ticker['ask']) / 2

                order = await self.exchange.create_limit_order(
                    symbol=symbol,
                    side=side,
                    amount=float(slice_size),
                    price=float(mid_price)
                )

            # Wait for full fill
            fill = await self._wait_for_full_fill(order['id'], timeout_seconds=300)
            fills.append(fill)

            remaining -= fill['filled']

            logger.info(f"Iceberg slice filled: {fill['filled']} @ {fill['price']}, remaining: {remaining}")

            # Small delay before next slice
            await asyncio.sleep(2)

        return self._generate_iceberg_report(fills, total_amount)
```

## Smart Order Router (SOR)

```python
class SmartOrderRouter:
    """
    Route orders across multiple exchanges for best execution
    """

    def __init__(self, exchange_connectors: Dict[str, 'ExchangeConnector']):
        self.exchanges = exchange_connectors

    async def route_order(
        self,
        symbol: str,
        side: str,
        amount: Decimal
    ) -> Dict:
        """
        Route order to exchange(s) with best liquidity and price

        Strategy:
        1. Fetch order books from all exchanges
        2. Calculate effective price for full amount on each exchange
        3. Optionally split order across multiple exchanges
        """
        # Fetch order books from all exchanges
        order_books = await self._fetch_all_order_books(symbol)

        # Calculate execution cost on each exchange
        execution_analysis = {}

        for exchange_name, order_book in order_books.items():
            analysis = self._analyze_execution_cost(
                order_book,
                side,
                amount
            )
            execution_analysis[exchange_name] = analysis

        # Find best single-exchange execution
        best_exchange = min(
            execution_analysis.items(),
            key=lambda x: x[1]['total_cost']
        )[0]

        logger.info(
            f"Smart routing: Best execution for {amount} {symbol} "
            f"on {best_exchange} at avg price {execution_analysis[best_exchange]['avg_price']}"
        )

        # Execute on best exchange
        order = await self.exchanges[best_exchange].create_market_order(
            symbol=symbol,
            side=side,
            amount=float(amount)
        )

        return {
            'exchange': best_exchange,
            'order': order,
            'execution_analysis': execution_analysis
        }

    def _analyze_execution_cost(
        self,
        order_book: Dict,
        side: str,
        amount: Decimal
    ) -> Dict:
        """Calculate total cost and average price for execution"""
        levels = order_book['asks'] if side == 'buy' else order_book['bids']

        remaining = amount
        total_cost = Decimal('0')
        total_filled = Decimal('0')

        for price, size in levels:
            price = Decimal(str(price))
            size = Decimal(str(size))

            fill_amount = min(remaining, size)
            total_cost += fill_amount * price
            total_filled += fill_amount
            remaining -= fill_amount

            if remaining <= 0:
                break

        if remaining > 0:
            # Insufficient liquidity
            return {
                'avg_price': None,
                'total_cost': Decimal('inf'),
                'liquidity_sufficient': False
            }

        return {
            'avg_price': total_cost / total_filled,
            'total_cost': total_cost,
            'liquidity_sufficient': True
        }
```

---
**Use these execution algorithms** to minimize slippage and improve fill quality for large orders.

Quick Install

$npx ai-builder add skill khanh-vu/order-execution-patterns

Details

Type
skill
Author
khanh-vu
Slug
khanh-vu/order-execution-patterns
Created
4d ago