bitcoin-model/model.py
Sam Fredrickson ea8c2092a6 Mega-squash
Implement some basic backtesting.

Switch to simpler log-based projection.

Add improved vol calculation.

Implement trend smoothing.

Add Claude's notes on current model.

Run projections and backtests in parallel.

Add market maturity projection adjustments.

Improve volume handling in market maturity calculations.

New backtest suite proposed by Claude.

Fix create_plots() output.

Merge branch 'new-backtests' into market-maturity-backtests

Add era-aware market maturity adjustments.

Add updated notes from Claude.

Add justfile to simplify organizing results for comparison.

Run more projections from various start dates.

Tuning session; removed market maturity.

The market maturity score only complicated the model with no clear
benefit. Still working on getting the various backtests tuned.

Add Claude's notes from recent session.

More helpful additions to workflow.

New systematic backtest framework.

Add notes on new backtesting framework.

Use ruff linter.

Tweak the backtests.

* Start from 2011 instead of 2013.
* Validate over two years instead of one.

Improve uncertainty estimation.

Add Claude's notes from the last revision.

Add adaptive volatility window.

New, streamlined NOTES.

Fix projection plot bugs.

Update prices.csv.

Actually use long-term vol in adaptive calculation.

Use more conservative 1e-6 to prevent division by zero.

It's an error not to provide halving dates.

Warn when validation period is shorter than projection period.

Update prices.csv

Manage output files in less-janky fashion.

Use market fundamentals instead of empirical era adjustments.

Improve CI coverage.

Use S2F metrics for trend analysis.

Update prices.

Merge branch 'next' into mkt-fndm

Update prices.

Merge branch 'next' into mkt-fndm

Add CDPR plot.

Merge branch 'next' into mkt-fndm

Update prices.

Slight optimization to CDPR plot generation.

Update prices.

Merge branch 'next' into mkt-fndm

Add price to CDPR plot.

Update prices.

Add .private to .gitignore.

Update prices.

import argparse
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import seaborn as sns
import shutil
import warnings
from datetime import timedelta
from multiprocessing import Pool
from typing import TextIO
# Output
class Output:
"""
Output ensures result files get written to the right directory.
Example:
output = Output("output", "test-1")
with output.create("summary.txt") as f:
# path to f is "output/test-1/summary.txt"
"""
out_dir: str
def __init__(self, base_dir: str, name: str):
"""
Initialize the output manager. This will both fully delete any existing
output directory, and then create an empty directory for the output.
Args:
base_dir: The root directory for outputs
name: The subdir of the root, where the results files go
"""
self.out_dir = os.path.join(base_dir, name)
shutil.rmtree(self.out_dir, ignore_errors=True)
os.makedirs(self.out_dir)
    def create(self, filename) -> TextIO:
"""
Create a new file for writing in the output directory.
"""
full_path = self.named(filename)
f = open(full_path, "w")
return f
def named(self, filename) -> str:
"""
Get the full path within the output directory for a named results file.
"""
full_path = os.path.join(self.out_dir, filename)
return full_path
# Utility functions
def get_halving_dates():
"""Return known and projected Bitcoin halving dates"""
return pd.to_datetime(
[
"2008-01-03", # Bitcoin genesis block (treat as cycle start)
"2012-11-28", # First halving
"2016-07-09", # Second halving
"2020-05-11", # Third halving
"2024-04-19", # Fourth halving
"2028-04-20", # Fifth halving (projected)
]
)
def get_cycle_position(date, halving_dates):
"""
Calculate position in halving cycle (0 to 1) for a given date.
0 represents a halving event, 1 represents just before the next halving.
"""
if len(halving_dates) == 0:
        raise ValueError("halving dates cannot be empty")
# Convert date to datetime if it's not already
date = pd.to_datetime(date)
# Find the most recent halving before this date
prev_halving = halving_dates[halving_dates <= date].max()
if pd.isna(prev_halving):
return 0.0 # For dates before first halving
# Find next halving
future_halvings = halving_dates[halving_dates > date]
if len(future_halvings) == 0:
# For dates after last known halving, use same cycle length as last known cycle
last_cycle_length = (halving_dates[-1] - halving_dates[-2]).days
days_since_halving = (date - halving_dates[-1]).days
return min(days_since_halving / last_cycle_length, 1.0)
next_halving = future_halvings.min()
# Calculate position as fraction between halvings
days_since_halving = (date - prev_halving).days
cycle_length = (next_halving - prev_halving).days
return min(days_since_halving / cycle_length, 1.0)
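# Illustrative sketch (assumption, not part of the original model): a small
# sanity check of get_cycle_position(). The probe dates below are chosen only
# to land near the start and end of the 2020-2024 cycle.
def _example_cycle_position():
    halvings = get_halving_dates()
    # Shortly after the 2020 halving: position should be near 0.
    early = get_cycle_position("2020-06-11", halvings)
    # Shortly before the 2024 halving: position should be near 1.
    late = get_cycle_position("2024-03-19", halvings)
    assert 0.0 <= early < 0.1 and 0.9 < late <= 1.0
    return early, late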
def format_price(x, pos):
    """Format large numbers in K/M/B form (matplotlib FuncFormatter callback; pos is unused)."""
if abs(x) >= 1e9:
return f"${x/1e9:.1f}B"
if abs(x) >= 1e6:
return f"${x/1e6:.1f}M"
if abs(x) >= 1e3:
return f"${x/1e3:.1f}K"
if abs(x) >= 1:
return f"${x:.0f}"
return f"${x:.2f}" # For values less than $1, show cents
def get_nice_price_points(min_price, max_price):
"""
Generate a reasonable set of price points for the y-axis that look clean
and cover the range without cluttering the chart.
"""
# Handle zero or negative prices
min_price = max(min_price, 0.0001) # Set minimum price to $0.0001
log_min = np.floor(np.log10(min_price))
log_max = np.ceil(np.log10(max_price))
price_points = []
# For very large ranges (spanning more than 4 orders of magnitude),
# only use powers of 10 and mid-points
if log_max - log_min > 4:
for exp in range(int(log_min), int(log_max + 1)):
base = 10**exp
# Add main power of 10
if min_price <= base <= max_price:
price_points.append(base)
# Add mid-point if range is large enough
if min_price <= base * 5 <= max_price and exp > log_min:
price_points.append(base * 5)
else:
# For smaller ranges, use 1, 2, 5 sequence
for exp in range(int(log_min), int(log_max + 1)):
for mult in [1, 2, 5]:
point = mult * 10**exp
if min_price <= point <= max_price:
price_points.append(point)
return np.array(price_points)
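# Illustrative sketch (assumption, not part of the original model): what the two
# axis helpers above produce for a wide log-scale price range.
def _example_price_ticks():
    points = get_nice_price_points(0.5, 250_000)  # spans ~6 orders of magnitude
    labels = [format_price(p, None) for p in points]
    # On a range this wide the helper returns powers of ten plus 5x midpoints
    # (1, 5, 10, 50, ..., 100_000), formatted as "$1", "$5", ..., "$100.0K".
    return list(zip(points, labels))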
# Market metrics
class MarketFundamentals:
"""
Calculate and track fundamental market metrics for Bitcoin.
Designed to be extensible for additional metrics.
"""
def __init__(self):
# Constants
self.GENESIS_DATE = pd.Timestamp("2009-01-03")
self.BLOCKS_PER_DAY = 144
self.HALVING_INTERVAL = 210000 # blocks
# Volatility adjustment parameters (from our tuning)
self.VOLUME_SCALE = 70
self.DEPTH_SCALE = 7
self.BASE_ADJUSTMENT = 0.68
def calculate_total_supply(self, date):
"""Calculate total Bitcoin supply at a given date."""
days_since_genesis = (date - self.GENESIS_DATE).days
if days_since_genesis < 0:
return 0
total_supply = 0
remaining_blocks = days_since_genesis * self.BLOCKS_PER_DAY
current_reward = 50
while remaining_blocks > 0 and current_reward >= 0.01:
blocks_at_this_reward = min(remaining_blocks, self.HALVING_INTERVAL)
total_supply += blocks_at_this_reward * current_reward
remaining_blocks -= blocks_at_this_reward
current_reward /= 2
# Adjust for missed blocks and lost coins
total_supply *= 0.95 # Account for varying block times
total_supply *= 0.93 # Estimate for lost/inaccessible coins
return total_supply
def get_block_reward(self, date):
"""Get Bitcoin block reward at a given date."""
days_since_genesis = (date - self.GENESIS_DATE).days
if days_since_genesis < 0:
return 0
halvings = days_since_genesis // (4 * 365) # Approximate halving periods
return 50 / (2**halvings)
def calculate_supply_metrics(self, date):
"""Calculate supply-related metrics."""
total_supply = self.calculate_total_supply(date)
block_reward = self.get_block_reward(date)
daily_new_supply = block_reward * self.BLOCKS_PER_DAY
return {
"total_supply": total_supply,
"daily_new_supply": daily_new_supply,
"supply_growth_rate": daily_new_supply / total_supply,
"stock_to_flow": total_supply / (daily_new_supply * 365), # Annualized
}
def calculate_market_metrics(self, df, date, window=30):
"""Calculate market activity metrics."""
recent_data = df[df["Date"] <= date].tail(window)
if len(recent_data) < window:
return {"avg_volume": 0, "price_volatility": 0, "price_impact": 0}
avg_volume = recent_data["Volume"].mean()
price_volatility = recent_data["Close"].pct_change().std()
price_impact = recent_data["Close"].std() / recent_data["Close"].mean()
return {
"avg_volume": avg_volume,
"price_volatility": price_volatility,
"price_impact": price_impact,
}
def get_market_maturity_metrics(self, df, date, window=30):
"""
Combine supply and market metrics to assess market maturity.
"""
supply_metrics = self.calculate_supply_metrics(date)
market_metrics = self.calculate_market_metrics(df, date, window)
# Calculate combined metrics
volume_to_supply = market_metrics["avg_volume"] / supply_metrics["total_supply"]
market_depth = volume_to_supply / (market_metrics["price_impact"] + 0.001)
return {
"volume_to_supply": volume_to_supply,
"supply_growth_rate": supply_metrics["supply_growth_rate"],
"market_depth": market_depth,
"stock_to_flow": supply_metrics["stock_to_flow"],
"price_impact": market_metrics["price_impact"],
}
def calculate_volatility_adjustment(self, metrics):
"""
Calculate volatility adjustment based on market metrics.
"""
# Supply-based component
supply_based_vol = np.sqrt(metrics["supply_growth_rate"] * 365 * 100)
# Market maturity component
maturity_factor = 1 - np.clip(
metrics["volume_to_supply"] * self.VOLUME_SCALE, 0, 0.6
)
# Market depth component
depth_factor = np.clip(
1 / np.sqrt(1 + metrics["market_depth"] * self.DEPTH_SCALE), 0.7, 1.3
)
# Combine factors
adjustment = (
self.BASE_ADJUSTMENT
* (1 + supply_based_vol)
* maturity_factor
* depth_factor
)
# Ensure reasonable bounds
return np.clip(adjustment, 0.65, 0.75)
def calculate_confidence_adjustment(self, metrics, level):
"""Calculate confidence interval adjustments with more sensitive market metrics."""
# More granular depth impact
if metrics["market_depth"] > 0.1:
depth_factor = 0.5
elif metrics["market_depth"] > 0.05:
depth_factor = 0.7
else:
depth_factor = 1.0
# More granular volume impact
if metrics["volume_to_supply"] > 0.015:
volume_factor = 0.5
elif metrics["volume_to_supply"] > 0.008:
volume_factor = 0.7
else:
volume_factor = 1.0
depth_impact = np.clip(metrics["market_depth"] * 0.15 * depth_factor, 0, 0.15)
vol_impact = np.clip(metrics["volume_to_supply"] * 20 * volume_factor, 0, 0.15)
total_adjustment = (depth_impact + vol_impact) * 0.4
if level >= 0.95:
total_adjustment *= 0.4
return level + (1 - level) * total_adjustment
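# Illustrative sketch (assumption, not part of the original model): the supply-side
# metrics are pure functions of the date, so they can be inspected without any price
# data. Note the block reward here comes from an approximate 4-year schedule, so it
# can lag the real halving dates by a few months.
def _example_supply_metrics():
    fundamentals = MarketFundamentals()
    metrics = fundamentals.calculate_supply_metrics(pd.Timestamp("2024-06-01"))
    # Returns a dict with total_supply, daily_new_supply, supply_growth_rate,
    # and stock_to_flow (annualized flow).
    return metrics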
def compare_adjustments(df, fundamentals):
"""
Compare fundamental-based adjustments with original era-based ones.
"""
# Sample dates for comparison
date_range = pd.date_range(start=df["Date"].min(), end=df["Date"].max(), freq="30D")
results = []
for date in date_range:
# Calculate era-based adjustment
if date < pd.Timestamp("2017-12-10"):
era_adj = 0.71 # early era
elif date < pd.Timestamp("2020-01-01"):
era_adj = 0.69 # transition era
else:
era_adj = 0.67 # mature era
# Calculate fundamental-based adjustment
metrics = fundamentals.get_market_maturity_metrics(df, date)
fund_adj = fundamentals.calculate_volatility_adjustment(metrics)
results.append(
{
"date": date,
"era_adjustment": era_adj,
"fundamental_adjustment": fund_adj,
"metrics": metrics,
}
)
return pd.DataFrame(results)
# Analysis functions
def analyze_trends(df):
"""
Analyze Bitcoin price trends using log returns with S2F awareness.
"""
df = df.copy()
fundamentals = MarketFundamentals()
# Get halving dates and calculate cycle position
halving_dates = get_halving_dates()
df["Cycle_Position"] = df["Date"].apply(
lambda x: get_cycle_position(x, halving_dates)
)
df["Cycle_Days"] = (df["Cycle_Position"] * 4 * 365).round().astype(int)
# Calculate S2F metrics for each date
supply_metrics = [
fundamentals.calculate_supply_metrics(date) for date in df["Date"]
]
df["S2F_Ratio"] = [m["stock_to_flow"] for m in supply_metrics]
df["S2F_Change"] = df["S2F_Ratio"].pct_change()
# Calculate log returns and basic cycle returns
df["Log_Price"] = np.log(df["Close"])
df["Log_Return"] = df["Log_Price"].diff()
# Calculate cycle-based returns
position_returns = df.groupby("Cycle_Days")["Log_Return"].mean()
# Calculate S2F impact on returns
s2f_impact = df.groupby("Cycle_Days")["S2F_Change"].mean()
# Smooth both components
window = 60
smoothed_cycle_returns = position_returns.rolling(
window=window,
center=True,
min_periods=int(window / 2),
).mean()
smoothed_s2f_impact = s2f_impact.rolling(
window=window,
center=True,
min_periods=int(window / 2),
).mean()
# Fill NaN values
    smoothed_cycle_returns = smoothed_cycle_returns.bfill().ffill()
    smoothed_s2f_impact = smoothed_s2f_impact.bfill().ffill()
# Combine cycle returns with S2F impact
s2f_weight = 0.3 # Adjustable parameter
combined_returns = (
smoothed_cycle_returns * (1 - s2f_weight) + smoothed_s2f_impact * s2f_weight
)
# Apply dampening using current market metrics
latest_metrics = fundamentals.get_market_maturity_metrics(df, df["Date"].max())
market_adjustment = fundamentals.calculate_volatility_adjustment(latest_metrics)
def adaptive_dampen(x):
if x > 2 * combined_returns.std():
return x * (0.6 * market_adjustment)
elif x < -2 * combined_returns.std():
return x * (0.7 * market_adjustment)
return x * market_adjustment
return combined_returns.map(adaptive_dampen)
def calculate_adaptive_volatility(
df,
short_window=30,
medium_window=90,
long_window=180,
vol_clip_min=0.5,
vol_clip_max=2.0,
):
"""
Calculate volatility with adaptive window sizes based on market conditions.
Returns a single volatility value for the most recent period.
Incorporates long-term volatility as a stability baseline and additional
reference point for regime detection.
"""
df = df.copy()
df["Log_Return"] = np.log(df["Close"]).diff()
# Remove any NaN values that could cause issues
df = df.dropna()
if len(df) < long_window:
# Not enough data, fall back to simple volatility
return df["Log_Return"].std()
# Get recent data for efficiency
lookback = max(long_window * 2, 360) # Use enough data for stable estimates
recent_df = df.iloc[-lookback:].copy() if len(df) > lookback else df.copy()
try:
# Initial volatility estimate using base windows
short_vol = recent_df["Log_Return"].ewm(span=short_window, adjust=False).std()
medium_vol = recent_df["Log_Return"].ewm(span=medium_window, adjust=False).std()
long_vol = recent_df["Log_Return"].ewm(span=long_window, adjust=False).std()
# Ensure we have valid volatility values
if short_vol.iloc[-1] == 0 or np.isnan(short_vol.iloc[-1]):
return df["Log_Return"].std() # Fallback to simple volatility
# Calculate regime indicators for recent period
medium_vol_mean = medium_vol.rolling(min(90, len(recent_df))).mean()
long_vol_mean = long_vol.rolling(min(180, len(recent_df))).mean()
if medium_vol_mean.iloc[-1] == 0:
vol_regime = pd.Series([1.0] * len(recent_df))
else:
# Compare short-term to both medium and long-term volatility
medium_regime = short_vol / medium_vol_mean
long_regime = short_vol / long_vol_mean
# Use the more conservative (higher) regime indicator
vol_regime = pd.concat([medium_regime, long_regime], axis=1).max(axis=1)
vol_regime = vol_regime.clip(vol_clip_min, vol_clip_max)
# Get most recent regime reading
latest_regime = vol_regime.iloc[-1]
# Adjust window sizes based on current regime
adj_factor = 1 / latest_regime
adj_short = max(10, int(short_window * adj_factor)) # Minimum window of 10
adj_medium = max(30, int(medium_window * adj_factor))
adj_long = max(60, int(long_window * adj_factor))
# Calculate final volatilities using adjusted windows
final_short = recent_df["Log_Return"].iloc[-adj_short:].std()
final_medium = recent_df["Log_Return"].iloc[-adj_medium:].std()
final_long = recent_df["Log_Return"].iloc[-adj_long:].std()
# If any volatility measure is NaN or 0, fall back to simple volatility
if np.isnan([final_short, final_medium, final_long]).any() or 0 in [
final_short,
final_medium,
final_long,
]:
return df["Log_Return"].std()
# Calculate regime-based weights, now incorporating long-term volatility
high_vol_weight = (latest_regime - vol_clip_min) / (vol_clip_max - vol_clip_min)
base_weights = np.array([0.2, 0.5, 0.3]) # Short, medium, long weights
stress_weights = np.array(
[0.4, 0.4, 0.2]
) # More weight on short-term during stress
# Interpolate between base and stress weights
weights = (
base_weights * (1 - high_vol_weight) + stress_weights * high_vol_weight
)
# Calculate final volatility using all three timeframes
final_vol = (
final_short * weights[0]
+ final_medium * weights[1]
+ final_long * weights[2]
)
# Add uncertainty adjustment based on regime changes
regime_change = abs(vol_regime.diff()).fillna(0)
regime_change_mean = regime_change.rolling(5, min_periods=1).mean().iloc[-1]
if regime_change_mean == 0:
uncertainty_adjustment = 1.0
else:
regime_change_zscore = regime_change.iloc[-1] / regime_change_mean
uncertainty_adjustment = 1 + np.clip(regime_change_zscore / 2, 0, 0.5)
return max(final_vol * uncertainty_adjustment, df["Log_Return"].std() * 0.5)
except Exception as e:
print(f"Error in adaptive volatility calculation: {e}")
# Fall back to simple volatility calculation
return df["Log_Return"].std()
def calculate_volatility(df, short_window=30, medium_window=90, long_window=180):
"""Calculate volatility using fundamental metrics and adaptive windows."""
df = df.copy()
df["Log_Return"] = np.log(df["Close"]).diff()
if len(df) < 30:
return 0.02 # Reasonable default for very short periods
try:
# Initialize fundamentals calculator
fundamentals = MarketFundamentals()
current_date = df["Date"].max()
# Get recent data for efficiency
lookback = max(long_window * 2, 360)
recent_df = df.iloc[-lookback:].copy() if len(df) > lookback else df.copy()
# Calculate base volatilities
short_vol = recent_df["Log_Return"].ewm(span=short_window, adjust=False).std()
medium_vol = recent_df["Log_Return"].ewm(span=medium_window, adjust=False).std()
long_vol = recent_df["Log_Return"].ewm(span=long_window, adjust=False).std()
if short_vol.iloc[-1] == 0 or np.isnan(short_vol.iloc[-1]):
return df["Log_Return"].std()
# Calculate volatility regime indicators
medium_vol_mean = medium_vol.rolling(min(90, len(recent_df))).mean()
long_vol_mean = long_vol.rolling(min(180, len(recent_df))).mean()
# Compare short-term to both medium and long-term volatility
if medium_vol_mean.iloc[-1] == 0:
vol_regime = pd.Series([1.0] * len(recent_df))
else:
medium_regime = short_vol / medium_vol_mean
long_regime = short_vol / long_vol_mean
vol_regime = pd.concat([medium_regime, long_regime], axis=1).max(axis=1)
vol_regime = vol_regime.clip(0.5, 2.0)
latest_regime = vol_regime.iloc[-1]
# Get market metrics
metrics = fundamentals.get_market_maturity_metrics(df, current_date)
# Calculate adaptive weights
high_vol_weight = (latest_regime - 0.5) / 1.5 # 1.5 = 2.0 - 0.5
base_weights = np.array([0.2, 0.5, 0.3])
stress_weights = np.array([0.4, 0.4, 0.2])
weights = (
base_weights * (1 - high_vol_weight) + stress_weights * high_vol_weight
)
# Calculate final volatilities
final_short = recent_df["Log_Return"].iloc[-short_window:].std()
final_medium = recent_df["Log_Return"].iloc[-medium_window:].std()
final_long = recent_df["Log_Return"].iloc[-long_window:].std()
if np.isnan([final_short, final_medium, final_long]).any() or 0 in [
final_short,
final_medium,
final_long,
]:
return df["Log_Return"].std()
# Apply market-based adjustment
market_adjustment = fundamentals.calculate_volatility_adjustment(metrics)
# Calculate final volatility
final_vol = (
final_short * weights[0]
+ final_medium * weights[1]
+ final_long * weights[2]
) * market_adjustment
return max(final_vol, df["Log_Return"].std() * 0.5)
except Exception as e:
print(f"Error in volatility calculation: {e}")
return df["Log_Return"].std()
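# Illustrative sketch (assumption, not part of the original model): exercise
# calculate_volatility() on synthetic data. The random-walk parameters are made
# up purely to produce a well-formed DataFrame with Date, Close, and Volume.
def _example_volatility_on_synthetic_data():
    rng = np.random.default_rng(0)
    dates = pd.date_range("2019-01-01", periods=400, freq="D")
    prices = 10_000 * np.exp(np.cumsum(rng.normal(0.001, 0.03, size=len(dates))))
    df = pd.DataFrame(
        {
            "Date": dates,
            "Close": prices,
            "Volume": rng.uniform(1e4, 5e4, size=len(dates)),
        }
    )
    # Returns a single daily volatility estimate on the log-return scale.
    return calculate_volatility(df)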
def adjust_trend_expectations(expected_returns, cycle_position):
"""
Simple trend adjustment.
"""
if cycle_position > 0.75:
damping_factor = 0.70
else:
damping_factor = 0.85
return expected_returns * damping_factor
def calculate_market_conditions(df, lookback_window=180):
"""
Calculate market condition metrics to inform uncertainty scaling.
"""
df = df.copy() # Avoid modifying original dataframe
metrics = {}
# Use log returns for stability
df["Log_Return"] = np.log(df["Close"]).diff()
# Handle initial NaN values
df["Log_Return"] = df["Log_Return"].fillna(method="bfill")
# Recent vs historical volatility ratio
recent_vol = max(df["Log_Return"].tail(30).std(), 1e-6) # Prevent division by zero
historical_vol = max(df["Log_Return"].tail(lookback_window).std(), 1e-6)
metrics["vol_ratio"] = recent_vol / historical_vol
# Trend strength using log prices
log_prices = np.log(df["Close"])
ma50 = log_prices.rolling(50, min_periods=1).mean()
ma200 = log_prices.rolling(200, min_periods=1).mean()
metrics["trend_strength"] = (ma50.iloc[-1] - ma200.iloc[-1]) / historical_vol
# Drawdown intensity
rolling_max = df["Close"].rolling(lookback_window, min_periods=1).max()
current_drawdown = df["Close"].iloc[-1] / rolling_max.iloc[-1] - 1
metrics["drawdown"] = abs(min(current_drawdown, 0))
return metrics
def get_projection_adjustments(days_forward, current_cycle_position, df):
"""Enhanced projection adjustments using market fundamentals."""
adjustments = np.ones(days_forward)
fundamentals = MarketFundamentals()
# Pre-calculate metrics
lookback = min(365, len(df))
historical_metrics = [
fundamentals.get_market_maturity_metrics(df, date)
for date in df["Date"].tail(lookback)
]
historical_avg_volume_to_supply = np.mean(
[m["volume_to_supply"] for m in historical_metrics]
)
current_metrics = fundamentals.get_market_maturity_metrics(df, df["Date"].max())
base_uncertainty = 0.016 * (1 + current_metrics["supply_growth_rate"] * 365)
vol_factor = 1 + 0.2 * abs(1 - current_metrics["volume_to_supply"] * 50)
market_depth_factor = 1 / np.sqrt(1 + current_metrics["market_depth"] * 5)
s2f_factor = 1 / np.log1p(current_metrics["stock_to_flow"])
regime_scale = np.clip(
current_metrics["volume_to_supply"] / historical_avg_volume_to_supply, 0.88, 1.2
)
for i in range(days_forward):
time_factor = min(
1
+ (i / 365)
* base_uncertainty
* vol_factor
* market_depth_factor
* s2f_factor,
1.20,
)
cycle_position = (current_cycle_position + i / 1460) % 1
local_cycle_factor = 1.15 if cycle_position > 0.75 else 1.0
adjustments[i] = time_factor * local_cycle_factor * regime_scale
adjustments[i] = max(adjustments[i], 1.02 + (i / 365) * 0.01)
return adjustments
def calculate_confidence_intervals(simulated_paths, confidence_levels=[0.95, 0.68]):
"""
Calculate confidence intervals with dynamic quantile selection based on market conditions.
"""
results = {}
for level in confidence_levels:
# Calculate standard error of the median
median_std = np.std(
[np.median(simulated_paths[:, i]) for i in range(simulated_paths.shape[1])]
)
# Adjust quantiles based on estimation uncertainty
adjustment = min(0.1, median_std / np.median(simulated_paths)) # Cap adjustment
# Widen intervals slightly when uncertainty is high
effective_level = level + (1 - level) * adjustment
lower_percentile = (1 - effective_level) * 100 / 2
upper_percentile = 100 - lower_percentile
results[f"Lower_{int(level*100)}"] = np.percentile(
simulated_paths, lower_percentile, axis=1
)
results[f"Upper_{int(level*100)}"] = np.percentile(
simulated_paths, upper_percentile, axis=1
)
return results
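# Illustrative sketch (assumption, not part of the original model): the interval
# helper above expects a (days x simulations) matrix of price paths.
def _example_confidence_intervals():
    rng = np.random.default_rng(1)
    # 90 projected days, 500 Monte Carlo paths of a simple geometric random walk.
    daily_returns = rng.normal(0.0, 0.02, size=(90, 500))
    paths = 30_000 * np.exp(np.cumsum(daily_returns, axis=0))
    bands = calculate_confidence_intervals(paths)
    # Keys look like "Lower_95", "Upper_95", "Lower_68", "Upper_68", each mapping
    # to an array with one value per projected day.
    return bands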
def project_prices(
df, days_forward=365, simulations=1000, confidence_levels=[0.95, 0.68]
):
"""Generate price projections with fundamental-based adjustments."""
df = df.copy()
fundamentals = MarketFundamentals()
df["Log_Price"] = np.log(df["Close"])
df["Log_Return"] = df["Log_Price"].diff()
# Get halving dates and current cycle position
halving_dates = get_halving_dates()
current_date = df["Date"].max()
cycle_position = get_cycle_position(current_date, halving_dates)
current_cycle_days = int(cycle_position * 4 * 365)
# Current price and date
last_price = df["Close"].iloc[-1]
last_date = df["Date"].iloc[-1]
# Generate projection dates
future_dates = pd.date_range(
start=last_date + timedelta(days=1), periods=days_forward, freq="D"
)
# Calculate expected returns
future_cycle_days = [
(current_cycle_days + i) % (4 * 365) for i in range(days_forward)
]
cycle_trends = analyze_trends(df)
expected_returns = np.array(
[cycle_trends.get(day, cycle_trends.mean()) for day in future_cycle_days]
)
# Calculate base volatility
base_volatility = calculate_volatility(df)
# Get projection adjustments
projection_adjustments = get_projection_adjustments(
days_forward, cycle_position, df
)
# Run Monte Carlo simulation
np.random.seed(42) # Restored for reproducibility
simulated_paths = np.zeros((days_forward, simulations))
for sim in range(simulations):
drift = expected_returns
vol = base_volatility
time_scaled_vol = vol * projection_adjustments
returns = np.random.normal(loc=drift, scale=time_scaled_vol, size=days_forward)
cumulative_returns = np.cumsum(returns)
price_path = last_price * np.exp(cumulative_returns)
simulated_paths[:, sim] = price_path
# Calculate results
results = pd.DataFrame(index=future_dates)
results["Median"] = np.percentile(simulated_paths, 50, axis=1)
results["Expected_Trend"] = last_price * np.exp(np.cumsum(drift))
# Calculate confidence intervals
for level in confidence_levels:
metrics = fundamentals.get_market_maturity_metrics(df, current_date)
# Calculate intervals by projection horizon
lower_bounds = []
upper_bounds = []
for day in range(days_forward):
time_factor = (
0.85 if day > 365 else 0.9 if day > 180 else 0.95 if day > 90 else 1.0
)
effective_level = (
fundamentals.calculate_confidence_adjustment(metrics, level)
* time_factor
)
lower_percentile = (1 - effective_level) * 100 / 2
upper_percentile = 100 - lower_percentile
lower_bounds.append(
np.percentile(simulated_paths[day, :], lower_percentile)
)
upper_bounds.append(
np.percentile(simulated_paths[day, :], upper_percentile)
)
results[f"Lower_{int(level*100)}"] = lower_bounds
results[f"Upper_{int(level*100)}"] = upper_bounds
return results
def analyze_bitcoin_prices(csv_path):
"""
Analyze Bitcoin price data to calculate volatility and growth rates.
"""
# Read CSV with proper data types
df = pd.read_csv(csv_path, parse_dates=[0])
# Print first few rows of raw data to inspect
print("\nFirst few rows of raw data:")
print(df.head())
# Print data info to see types and non-null counts
print("\nDataset Info:")
print(df.info())
# Convert price columns to float and handle any potential formatting issues
numeric_columns = ["Price", "Open", "High", "Low", "Vol."] # Added Volume
for col in numeric_columns:
# Remove any commas and 'K'/'M' suffixes
df[col] = df[col].astype(str).str.replace(",", "")
# Convert K to thousands
df[col] = df[col].str.replace("K", "e3")
# Convert M to millions
df[col] = df[col].str.replace("M", "e6")
# Convert B to billions
df[col] = df[col].str.replace("B", "e9")
# Convert to numeric
df[col] = pd.to_numeric(df[col], errors="coerce")
# Rename columns for clarity
df.columns = ["Date", "Close", "Open", "High", "Low", "Volume", "Change"]
# Sort by date in ascending order
df = df.sort_values("Date")
# Print summary statistics after conversion
print("\nPrice Summary After Conversion:")
print(df[["Close", "Open", "High", "Low", "Volume"]].describe())
# Calculate daily returns
df["Daily_Return"] = df["Close"].pct_change()
# Print first few daily returns to verify calculation
print("\nFirst few daily returns:")
print(df[["Date", "Close", "Daily_Return"]].head())
# Check for any infinite or NaN values
print("\nInfinite or NaN value counts:")
print(df.isna().sum())
# Calculate metrics using 365 days for annualization
analysis = {
"period_start": df["Date"].min().strftime("%Y-%m-%d"),
"period_end": df["Date"].max().strftime("%Y-%m-%d"),
"total_days": len(df),
"daily_volatility": df["Daily_Return"].std(),
"annualized_volatility": df["Daily_Return"].std() * np.sqrt(365),
"total_return": (df["Close"].iloc[-1] / df["Close"].iloc[0] - 1) * 100,
"average_daily_return": df["Daily_Return"].mean() * 100,
"average_annual_return": ((1 + df["Daily_Return"].mean()) ** 365 - 1) * 100,
"min_price": df["Low"].min(),
"max_price": df["High"].max(),
"avg_price": df["Close"].mean(),
"start_price": df["Close"].iloc[0],
"end_price": df["Close"].iloc[-1],
}
# Calculate rolling metrics
df["Rolling_Volatility_30d"] = df["Daily_Return"].rolling(
window=30
).std() * np.sqrt(365)
df["Rolling_Return_30d"] = df["Close"].pct_change(periods=30) * 100
return analysis, df
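# Note (assumption, not confirmed by the original repo): the parsing above expects
# an Investing.com-style export in which prices use comma separators and volume
# uses K/M/B suffixes, e.g. a header like
#   "Date","Price","Open","High","Low","Vol.","Change %"
# followed by rows such as (made-up sample values)
#   "12/01/2023","42,100.0","41,800.0","42,500.0","41,200.0","35.20K","0.72%"
# The suffix handling ("K" -> e3, "M" -> e6, "B" -> e9) relies on that format.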
# Main plotting functions
def add_cdpr_plot(df, output: Output):
"""
Add a plot showing the Compounding Daily Periodic Rate (CDPR) over different time periods.
Also includes a logarithmic price axis on the right side.
"""
plt.style.use("seaborn-v0_8")
fig, ax1 = plt.subplots(figsize=(15, 6))
# Calculate CDPR for different time periods
periods = [180, 360, 720]
cdpr = {}
# Find the longest CDPR series length
max_period = max(periods)
daily_returns = df["Close"].pct_change().fillna(0)
for period in periods:
cdpr[f"{period}d CDPR"] = (
daily_returns.rolling(period).apply(
lambda x: (1 + x).prod() ** (1 / period) - 1, raw=True
)
) * 100
        # Drop the warm-up rows so every series covers the same date range
        cdpr[f"{period}d CDPR"] = cdpr[f"{period}d CDPR"][max_period:]
    # Find the non-NaN min and max CDPR values
    cdpr_values = list(cdpr.values())
    min_cdpr = np.nanmin([np.nanmin(values) for values in cdpr_values])
    max_cdpr = np.nanmax([np.nanmax(values) for values in cdpr_values])
    # Align the x-axis with the clipped CDPR and price series (avoids length
    # mismatches when the data is not strictly daily)
    plot_dates = df["Date"].iloc[max_period:]
# Plot CDPR lines on the left axis
for label, values in cdpr.items():
ax1.plot(plot_dates, values, label=label)
# Customize the left axis (CDPR)
ax1.set_xlabel("Date")
ax1.set_ylabel("CDPR (%)")
ax1.grid(True, alpha=0.3)
# Adjust y-axis tick marks and add shaded lines between ticks
yticks = list(np.arange(int(min_cdpr), int(max_cdpr) + 1, 0.5))
ax1.set_yticks(yticks)
ax1.tick_params(axis="y", which="major", labelsize=8)
ax1.set_yticklabels(["{:.1f}%".format(y) for y in yticks])
# Add shaded lines between tick marks
for i in range(1, len(yticks)):
ax1.axhline(
y=yticks[i], color="lightgray", linestyle="--", linewidth=1, alpha=0.5
)
# Create the right axis for price
ax2 = ax1.twinx()
# Plot price on the right axis
price_data = df["Close"].iloc[max_period:]
ax2.semilogy(
plot_dates, price_data, color="#FF6B6B", linewidth=1.5, label="Price (USD)"
)
ax2.set_ylabel("Price (USD)", color="#FF6B6B")
ax2.tick_params(axis="y", labelcolor="#FF6B6B")
# Set human-readable price ticks
price_ticks = [1, 10, 100, 1000, 10000, 100000]
price_labels = ["$1", "$10", "$100", "$1k", "$10k", "$100k"]
ax2.set_yticks(price_ticks)
ax2.set_yticklabels(price_labels)
# Add legends for both axes
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
plt.title("Compounding Daily Periodic Rate (CDPR) and Price")
# Save the plot
filename = output.named("bitcoin_cdpr_plot.png")
plt.tight_layout()
plt.savefig(filename, dpi=150, bbox_inches="tight")
plt.close()
def create_plots(df, output: Output, start=None, end=None, project_days=365):
"""
Create enhanced plots including market maturity visualization.
"""
# Add the new CDPR plot
add_cdpr_plot(df, output)
# Filter data based on date range
mask = pd.Series(True, index=df.index)
if start:
mask &= df["Date"] >= pd.to_datetime(start)
if end:
mask &= df["Date"] <= pd.to_datetime(end)
plot_df = df[mask].copy()
if len(plot_df) == 0:
raise ValueError("No data found for the specified date range")
# Generate projections
projections = project_prices(plot_df, days_forward=project_days)
# Set up the style
plt.style.use("seaborn-v0_8")
# Create figure with adjusted size and spacing
fig = plt.figure(figsize=(15, 15))
# Use GridSpec for better control over subplot spacing
gs = plt.GridSpec(5, 1, height_ratios=[3, 1.5, 1.5, 1.5, 2], hspace=0.4)
# Date range for titles
hist_date_range = f" ({plot_df['Date'].min().strftime('%Y-%m-%d')} to {plot_df['Date'].max().strftime('%Y-%m-%d')})"
# Calculate full date range including projections
full_date_range = pd.date_range(plot_df["Date"].min(), projections.index.max())
# 1. Price history and projections (log scale)
ax1 = fig.add_subplot(gs[0])
# Plot historical prices
ax1.semilogy(plot_df["Date"], plot_df["Close"], "b-", label="Historical Price")
# Plot projections
ax1.semilogy(
projections.index,
projections["Expected_Trend"],
"--",
color="purple",
label="Expected Trend",
)
ax1.semilogy(
projections.index,
projections["Median"],
":",
color="green",
label="Simulated Median",
)
ax1.fill_between(
projections.index,
projections["Lower_95"],
projections["Upper_95"],
alpha=0.2,
color="orange",
label="95% Confidence Interval",
)
ax1.fill_between(
projections.index,
projections["Lower_68"],
projections["Upper_68"],
alpha=0.3,
color="green",
label="68% Confidence Interval",
)
# Customize y-axis
ax1.yaxis.set_major_formatter(plt.FuncFormatter(format_price))
min_price = min(plot_df["Low"].min(), projections["Lower_95"].min())
max_price = max(plot_df["High"].max(), projections["Upper_95"].max())
price_points = get_nice_price_points(min_price, max_price)
ax1.set_yticks(price_points)
ax1.tick_params(axis="y", labelsize=8)
ax1.margins(y=0.02)
ax1.grid(True, which="major", linestyle="-", alpha=0.5)
ax1.grid(True, which="minor", linestyle=":", alpha=0.2)
ax1.set_title("Bitcoin Price History and Projections (Log Scale)" + hist_date_range)
ax1.legend(fontsize=8)
# Set x-axis limits to full range
ax1.set_xlim(full_date_range[0], full_date_range[-1])
ax1.tick_params(axis="x", rotation=45)
# 3. Rolling volatility
ax3 = fig.add_subplot(gs[1])
ax3.plot(
plot_df["Date"],
plot_df["Rolling_Volatility_30d"],
"r-",
label="30-Day Rolling Volatility",
)
# Add empty space to match price plot x-axis
ax3.set_xlim(full_date_range[0], full_date_range[-1])
# Add vertical line to mark start of projections
ax3.axvline(plot_df["Date"].max(), color="gray", linestyle="--", alpha=0.5)
ax3.text(
plot_df["Date"].max(),
ax3.get_ylim()[1],
"Projection Start",
rotation=90,
va="top",
ha="right",
alpha=0.7,
)
ax3.set_title("30-Day Rolling Volatility (Annualized)" + hist_date_range)
ax3.set_ylabel("Volatility")
ax3.grid(True)
ax3.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: "{:.0%}".format(y)))
ax3.legend()
ax3.tick_params(axis="x", rotation=45)
# 4. Returns distribution
ax4 = fig.add_subplot(gs[2])
returns_mean = plot_df["Daily_Return"].mean()
returns_std = plot_df["Daily_Return"].std()
filtered_returns = plot_df["Daily_Return"][
(plot_df["Daily_Return"] > returns_mean - 5 * returns_std)
& (plot_df["Daily_Return"] < returns_mean + 5 * returns_std)
]
sns.histplot(filtered_returns, bins=100, ax=ax4)
ax4.set_title(
"Distribution of Daily Returns (Excluding Extreme Outliers)" + hist_date_range
)
ax4.set_xlabel("Daily Return")
ax4.set_ylabel("Count")
ax4.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: "{:.0%}".format(x)))
# Add mean line
ax4.axvline(filtered_returns.mean(), color="r", linestyle="dashed", linewidth=1)
ax4.text(
filtered_returns.mean(),
ax4.get_ylim()[1],
"Mean",
rotation=90,
va="top",
ha="right",
)
# 5. Projection ranges
ax5 = fig.add_subplot(gs[3:]) # Use last two grid spaces for larger plot
timepoints = np.array(range(30, project_days, 30))
timepoints = timepoints[timepoints <= project_days]
ranges = []
labels = []
positions = []
for t in timepoints:
idx = t - 1
ranges.extend(
[
projections["Lower_95"].iloc[idx],
projections["Lower_68"].iloc[idx],
projections["Median"].iloc[idx],
projections["Upper_68"].iloc[idx],
projections["Upper_95"].iloc[idx],
]
)
labels.extend(["95% Lower", "68% Lower", "Median", "68% Upper", "95% Upper"])
positions.extend([t] * 5)
ax5.scatter(positions, ranges, alpha=0.6)
for t in timepoints:
idx = positions.index(t)
ax5.plot([t] * 5, ranges[idx : idx + 5], "k-", alpha=0.3)
ax5.set_yscale("log")
min_price = min(ranges)
max_price = max(ranges)
price_points = get_nice_price_points(min_price, max_price)
ax5.set_yticks(price_points)
ax5.yaxis.set_major_formatter(plt.FuncFormatter(format_price))
ax5.set_title("Projected Price Ranges at Future Timepoints")
ax5.set_xlabel("Days Forward")
ax5.set_ylabel("Price (USD)")
ax5.grid(True, alpha=0.3)
ax5.set_xticks(timepoints)
# Save the plot
start_str = start if start else plot_df["Date"].min().strftime("%Y-%m-%d")
end_str = end if end else plot_df["Date"].max().strftime("%Y-%m-%d")
filename = output.named(
f"bitcoin_analysis_{start_str}_to_{end_str}_with_projections.png"
)
# Use tight_layout with adjusted parameters
plt.tight_layout(pad=2.0)
plt.savefig(filename, dpi=300, bbox_inches="tight")
plt.close()
return projections
def visualize_cycle_patterns(df, output: Output, cycle_returns, cycle_volatility):
"""
Create enhanced visualization of Bitcoin's behavior across halving cycles.
"""
plt.style.use("seaborn-v0_8")
fig = plt.figure(figsize=(15, 15))
# Create a 3x1 subplot grid with different heights
gs = plt.GridSpec(3, 1, height_ratios=[2, 1, 2], hspace=0.3)
# Plot 1: Returns across cycle with confidence bands
ax1 = plt.subplot(gs[0])
# Convert days to percentage through cycle
x_points = np.array(cycle_returns.index) / (4 * 365) * 100
# Calculate rolling mean and standard deviation for confidence bands
window = 30 # 30-day window
rolling_mean = pd.Series(cycle_returns.values).rolling(window=window).mean()
rolling_std = pd.Series(cycle_returns.values).rolling(window=window).std()
# Plot confidence bands
ax1.fill_between(
x_points,
(rolling_mean - 2 * rolling_std) * 100,
(rolling_mean + 2 * rolling_std) * 100,
alpha=0.2,
color="blue",
label="95% Confidence",
)
ax1.fill_between(
x_points,
(rolling_mean - rolling_std) * 100,
(rolling_mean + rolling_std) * 100,
alpha=0.3,
color="blue",
label="68% Confidence",
)
# Plot average returns
ax1.plot(
x_points,
cycle_returns.values * 100,
"b-",
label="Average Daily Return",
linewidth=2,
)
ax1.axhline(y=0, color="gray", linestyle="--", alpha=0.5)
# Add vertical lines for each year in cycle
for year in range(1, 4):
ax1.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3)
ax1.text(
year * 25,
ax1.get_ylim()[1],
f"Year {year}",
rotation=90,
va="top",
ha="right",
alpha=0.7,
)
# Highlight halving points
ax1.axvline(x=0, color="red", linestyle="--", alpha=0.5, label="Halving Event")
ax1.axvline(x=100, color="red", linestyle="--", alpha=0.5)
ax1.set_title("Bitcoin Return Patterns Across Halving Cycle", pad=20)
ax1.set_xlabel("Position in Cycle (%)")
ax1.set_ylabel("Average Daily Return (%)")
ax1.grid(True, alpha=0.3)
ax1.legend(loc="upper right")
# Plot 2: Volatility across cycle
ax2 = plt.subplot(gs[1])
# Calculate rolling volatility confidence bands
vol_mean = pd.Series(cycle_volatility.values).rolling(window=window).mean()
vol_std = pd.Series(cycle_volatility.values).rolling(window=window).std()
# Plot volatility with confidence bands
annualized_factor = np.sqrt(365) * 100
ax2.fill_between(
x_points,
(vol_mean - 2 * vol_std) * annualized_factor,
(vol_mean + 2 * vol_std) * annualized_factor,
alpha=0.2,
color="red",
label="95% Confidence",
)
ax2.plot(
x_points,
cycle_volatility.values * annualized_factor,
"r-",
label="Annualized Volatility",
linewidth=2,
)
# Add year markers
for year in range(1, 4):
ax2.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3)
ax2.axvline(x=0, color="red", linestyle="--", alpha=0.5)
ax2.axvline(x=100, color="red", linestyle="--", alpha=0.5)
ax2.set_xlabel("Position in Cycle (%)")
ax2.set_ylabel("Volatility (%)")
ax2.grid(True, alpha=0.3)
ax2.legend(loc="upper right")
# Plot 3: Average price trajectory within cycles
ax3 = plt.subplot(gs[2])
# Define a color scheme for cycles
cycle_colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd"]
# Calculate average price path for each cycle
halving_dates = get_halving_dates()
cycles = []
for i in range(len(halving_dates) - 1):
cycle_start = halving_dates[i]
cycle_end = halving_dates[i + 1]
cycle_data = df[(df["Date"] >= cycle_start) & (df["Date"] < cycle_end)].copy()
if len(cycle_data) > 0:
cycle_data["Cycle_Pct"] = (
(cycle_data["Date"] - cycle_start).dt.total_seconds()
/ (cycle_end - cycle_start).total_seconds()
* 100
)
cycle_data["Normalized_Price"] = (
cycle_data["Close"] / cycle_data["Close"].iloc[0]
)
cycles.append(cycle_data)
# Plot each historical cycle with distinct colors
for i, cycle in enumerate(cycles):
ax3.semilogy(
cycle["Cycle_Pct"],
cycle["Normalized_Price"],
color=cycle_colors[i],
alpha=0.7,
label=f'Cycle {i+1} ({cycle["Date"].iloc[0].strftime("%Y")}-{cycle["Date"].iloc[-1].strftime("%Y")})',
)
# Calculate and plot average cycle
if cycles:
avg_cycle = pd.concat(
[c.set_index("Cycle_Pct")["Normalized_Price"] for c in cycles], axis=1
)
avg_cycle_mean = avg_cycle.mean(axis=1)
avg_cycle_std = avg_cycle.std(axis=1)
ax3.semilogy(
avg_cycle_mean.index,
avg_cycle_mean.values,
"k-",
linewidth=2,
label="Average Cycle",
)
ax3.fill_between(
avg_cycle_mean.index,
avg_cycle_mean * np.exp(-2 * avg_cycle_std),
avg_cycle_mean * np.exp(2 * avg_cycle_std),
alpha=0.2,
color="gray",
)
# Add year markers
for year in range(1, 4):
ax3.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3)
ax3.axvline(x=0, color="red", linestyle="--", alpha=0.5)
ax3.axvline(x=100, color="red", linestyle="--", alpha=0.5)
ax3.set_title("Price Performance Across Cycles (Normalized)", pad=20)
ax3.set_xlabel("Position in Cycle (%)")
ax3.set_ylabel("Price (Relative to Cycle Start)")
ax3.grid(True, alpha=0.3)
ax3.legend(loc="center left", bbox_to_anchor=(1.02, 0.5))
# Add current cycle position marker on all plots
current_position = get_cycle_position(df["Date"].max(), halving_dates) * 100
for ax in [ax1, ax2, ax3]:
ax.axvline(
x=current_position,
color="green",
linestyle="-",
alpha=0.5,
label="Current Position",
)
# Main title for the figure
fig.suptitle("Bitcoin Halving Cycle Analysis", fontsize=16, y=0.95)
# Adjust layout to prevent legend cutoff
plt.tight_layout()
# Save the plot
filename = output.named("bitcoin_cycle_patterns.png")
plt.savefig(filename, dpi=300, bbox_inches="tight")
plt.close()
def create_backtest_plot(
df,
output: Output,
backtest_date="2020-05-11",
start_date="2012-11-28",
project_days=1650,
):
"""
Create a plot comparing actual price history against model projections from a historical date.
Returns both the projections and performance metrics.
Args:
df: DataFrame with historical price data
backtest_date: Date to start the backtest from
start_date: Date to start considering historical data
project_days: Number of days to project forward from backtest date
Returns:
tuple: (projections DataFrame, metrics dictionary)
"""
# Convert dates to datetime
backtest_date = pd.to_datetime(backtest_date)
start_date = pd.to_datetime(start_date)
# Validate dates
if start_date >= backtest_date:
raise ValueError("start_date must be earlier than backtest_date")
# Clean the data: remove rows with zero or invalid prices and filter by date
df = df[(df["Close"] > 0) & (df["Date"] >= start_date)].copy()
# Split data into training (before backtest date) and validation (after backtest date)
training_df = df[df["Date"] <= backtest_date].copy()
validation_df = df[df["Date"] > backtest_date].copy()
# Check if we have enough data
if len(training_df) < 30: # Require at least 30 days of training data
raise ValueError("Insufficient training data before backtest date")
if len(validation_df) < project_days:
warnings.warn(
f"Validation period ({len(validation_df)} days) shorter than projection period ({project_days} days)"
)
# Generate historical projections using only training data
historical_projections = project_prices(training_df, days_forward=project_days)
# Set up the plot
plt.style.use("seaborn-v0_8")
    _, ax = plt.subplots(figsize=(15, 10))
# Plot training data
heading_label = f'Historical Price (Training: {start_date.strftime("%Y-%m-%d")} to {backtest_date.strftime("%Y-%m-%d")})'
ax.semilogy(
training_df["Date"],
training_df["Close"],
"b-",
label=heading_label,
alpha=0.7,
)
# Plot validation data
ax.semilogy(
validation_df["Date"],
validation_df["Close"],
"g-",
label=f'Actual Price (Validation: {backtest_date.strftime("%Y-%m-%d")} onwards)',
linewidth=2,
)
# Plot projections
ax.semilogy(
historical_projections.index,
historical_projections["Expected_Trend"],
"--",
color="purple",
label="Model Projection (Expected)",
)
ax.semilogy(
historical_projections.index,
historical_projections["Median"],
":",
color="orange",
label="Model Projection (Median)",
)
# Add confidence intervals
ax.fill_between(
historical_projections.index,
historical_projections["Lower_95"],
historical_projections["Upper_95"],
alpha=0.2,
color="orange",
label="95% Confidence Interval",
)
ax.fill_between(
historical_projections.index,
historical_projections["Lower_68"],
historical_projections["Upper_68"],
alpha=0.3,
color="green",
label="68% Confidence Interval",
)
# Customize y-axis
ax.yaxis.set_major_formatter(plt.FuncFormatter(format_price))
# Set custom y-axis ticks
min_price = min(
df["Low"].min(),
historical_projections["Lower_95"].min(),
0.0001, # Set minimum price floor
)
max_price = max(df["High"].max(), historical_projections["Upper_95"].max())
price_points = get_nice_price_points(min_price, max_price)
ax.set_yticks(price_points)
# Add halving lines
halving_dates = get_halving_dates()
relevant_halvings = halving_dates[
(halving_dates >= start_date) & (halving_dates <= validation_df["Date"].max())
]
for date in relevant_halvings:
ax.axvline(date, color="red", linestyle="--", alpha=0.3)
ax.text(
date,
ax.get_ylim()[1],
"Halving",
rotation=90,
va="top",
ha="right",
alpha=0.7,
)
# Calculate model performance metrics
metrics = {}
if len(validation_df) > 0:
# Create a common date range for comparison
actual_prices = validation_df.set_index("Date")["Close"]
common_dates = actual_prices.index.intersection(historical_projections.index)
if len(common_dates) > 0:
actual_aligned = actual_prices[common_dates]
projections_aligned = historical_projections.loc[common_dates]
# Calculate metrics using aligned data
metrics = {
"mape": np.mean(
np.abs(
(actual_aligned - projections_aligned["Expected_Trend"])
/ actual_aligned
)
)
* 100,
"rmse": np.sqrt(
np.mean(
(actual_aligned - projections_aligned["Expected_Trend"]) ** 2
)
),
"max_error": np.max(
np.abs(actual_aligned - projections_aligned["Expected_Trend"])
),
"coverage_95": np.mean(
(actual_aligned >= projections_aligned["Lower_95"])
& (actual_aligned <= projections_aligned["Upper_95"])
)
* 100,
"coverage_68": np.mean(
(actual_aligned >= projections_aligned["Lower_68"])
& (actual_aligned <= projections_aligned["Upper_68"])
)
* 100,
}
# Add metrics to plot
metrics_text = (
f"Model Performance Metrics:\n"
f"MAPE: {metrics['mape']:.1f}%\n"
f"RMSE: ${metrics['rmse']:,.0f}\n"
f"Max Error: ${metrics['max_error']:,.0f}\n"
f"95% CI Coverage: {metrics['coverage_95']:.1f}%\n"
f"68% CI Coverage: {metrics['coverage_68']:.1f}%"
)
ax.text(
0.02,
0.98,
metrics_text,
transform=ax.transAxes,
verticalalignment="top",
bbox=dict(facecolor="white", alpha=0.8),
)
# Customize plot
ax.set_title(
f'Bitcoin Price: Model Backtest\nTraining: {start_date.strftime("%Y-%m-%d")} to {backtest_date.strftime("%Y-%m-%d")}'
)
ax.set_xlabel("Date")
ax.set_ylabel("Price (USD)")
ax.grid(True, which="major", linestyle="-", alpha=0.5)
ax.grid(True, which="minor", linestyle=":", alpha=0.2)
ax.legend(loc="center left", bbox_to_anchor=(1.02, 0.5))
# Adjust layout and save
plt.tight_layout()
filename = output.named(
f'bitcoin_backtest_{start_date.strftime("%Y%m%d")}_to_{backtest_date.strftime("%Y%m%d")}.png'
)
plt.savefig(filename, dpi=300, bbox_inches="tight")
plt.close()
return historical_projections, metrics
def run_projection(args):
df, start, output = args
_ = create_plots(df, output, start=start, project_days=365 * 4)
def run_projections(df, output: Output):
# Create main projection
projection_starts = [
"2011-01-01",
"2012-01-01",
"2013-01-01",
"2014-01-01",
"2015-01-01",
"2016-07-09",
]
args = [(df, start, output) for start in projection_starts]
with Pool() as pool:
pool.map(run_projection, args)
def run_single_backtest(args):
"""
Run a single backtest with the given parameters.
Must be defined at module level for multiprocessing.
Args:
args: tuple of (params dict, DataFrame)
"""
params, df, output = args
try:
# Create a copy of params without the description
backtest_params = params.copy()
backtest_params.pop("description", None)
projections, metrics = create_backtest_plot(df, output, **backtest_params)
# Ensure metrics has all required keys with default values
if metrics is None:
metrics = {}
default_metrics = {
"mape": 0.0,
"rmse": 0.0,
"max_error": 0.0,
"coverage_95": 0.0,
"coverage_68": 0.0,
}
# Update metrics with defaults for any missing keys
metrics = {**default_metrics, **metrics}
return {
"params": params,
"projections": projections,
"metrics": metrics,
"success": True,
}
except Exception as e:
print(
f"Error in backtest for period {params['description']}: {str(e)}"
) # Debug print
return {"params": params, "error": str(e), "success": False}
def run_systematic_backtests(
df, output: Output, validation_years=2, min_training_years=8
):
"""
Run a comprehensive suite of backtests with consistent validation periods.
Uses sliding windows for both start and end dates.
"""
# Convert years to days
validation_days = validation_years * 365
min_training_days = min_training_years * 365
# Define start date for reliable data
mature_start = pd.Timestamp("2011-01-01")
last_possible_start = df["Date"].max() - pd.Timedelta(
days=min_training_days + validation_days
)
end_date = df["Date"].max() - pd.Timedelta(days=validation_days)
if mature_start >= last_possible_start:
raise ValueError(
f"Insufficient data for backtesting with current parameters:\n"
f"- Data range: {mature_start} to {df['Date'].max()}\n"
f"- Minimum training period: {min_training_years} years\n"
f"- Validation period: {validation_years} years"
)
old_backtests = [
{
"start_date": "2016-07-09",
"backtest_date": "2024-04-19",
"project_days": validation_days,
"description": "Second until fourth halving",
},
{
"start_date": "2013-01-01", # Includes pre-futures for cycle learning
"backtest_date": "2020-05-11",
"project_days": validation_days,
"description": "Post-Futures Window with two cycles of training",
},
{
"start_date": "2014-01-01",
"backtest_date": "2021-12-31",
"project_days": validation_days,
"description": "Cross-Regime Test with two cycles of training",
},
{
"start_date": "2015-01-01",
"backtest_date": "2022-01-01",
"project_days": validation_days,
"description": "Recent Window focusing on post-2022 behavior",
},
]
backtest_periods = []
backtest_periods.extend(old_backtests)
# Generate backtest periods with sliding windows
window_start = mature_start
step = pd.Timedelta(days=180) # 6 month steps
while window_start <= last_possible_start:
backtest_date = window_start + pd.Timedelta(days=min_training_days)
backtest_periods.append(
{
"start_date": window_start.strftime("%Y-%m-%d"),
"backtest_date": backtest_date.strftime("%Y-%m-%d"),
"project_days": validation_days,
"description": f"Training {window_start.strftime('%Y-%m-%d')} to {backtest_date.strftime('%Y-%m-%d')}",
}
)
window_start += step
# Add specific periods of interest
special_periods = []
# Halving-based periods
halving_dates = get_halving_dates()
relevant_halvings = [
h
for h in halving_dates
if h < end_date and h > (mature_start + pd.Timedelta(days=min_training_days))
]
for halving in relevant_halvings:
earliest_start = halving - pd.Timedelta(days=min_training_days)
if earliest_start >= mature_start:
special_periods.append(
{
"start_date": earliest_start.strftime("%Y-%m-%d"),
"backtest_date": halving.strftime("%Y-%m-%d"),
"project_days": validation_days,
"description": f"Pre-halving {halving.strftime('%Y')}",
}
)
# Market structure change periods
important_dates = [
("2017-12-01", "Post-futures introduction"),
("2020-03-01", "Post-COVID crash"),
("2021-11-01", "Post-2021 peak"),
]
for date, description in important_dates:
test_date = pd.Timestamp(date)
if test_date < end_date:
earliest_start = test_date - pd.Timedelta(days=min_training_days)
if earliest_start >= mature_start:
special_periods.append(
{
"start_date": earliest_start.strftime("%Y-%m-%d"),
"backtest_date": date,
"project_days": validation_days,
"description": description,
}
)
# Combine and remove any duplicates
all_periods = backtest_periods + special_periods
unique_periods = []
seen_dates = set()
for period in all_periods:
key = f"{period['start_date']}_{period['backtest_date']}"
if key not in seen_dates:
unique_periods.append(period)
seen_dates.add(key)
if not unique_periods:
raise ValueError("No valid backtest periods found with current parameters")
# Sort periods by backtest date for clearer analysis
unique_periods.sort(key=lambda x: pd.Timestamp(x["backtest_date"]))
print("\nRunning backtests with:")
print(
f"- Start dates range: {unique_periods[0]['start_date']} to {unique_periods[-1]['start_date']}"
)
print(
f"- Backtest dates range: {unique_periods[0]['backtest_date']} to {unique_periods[-1]['backtest_date']}"
)
print(f"- Minimum training period: {min_training_years} years")
print(f"- Validation period: {validation_years} years")
print(f"- Number of test periods: {len(unique_periods)}")
print("\nTest periods:")
for period in unique_periods:
print(f"- {period['description']}")
# Create args tuples with params and DataFrame
args = [(params, df, output) for params in unique_periods]
# Use multiprocessing
with Pool() as pool:
results = pool.map(run_single_backtest, args)
# Analyze results
successful_tests = [r for r in results if r["success"]]
failed_tests = [r for r in results if not r["success"]]
# Define stress periods
stress_periods = {
# COVID crash and recovery
("2020-03-01", "2020-09-01"): "COVID crash period",
# 2021 peak and subsequent crash
("2021-11-01", "2022-06-01"): "2021 peak aftermath",
# Add more stress periods as needed
}
def is_stress_period(test_date):
"""Check if a test date falls in any stress period"""
test_date = pd.Timestamp(test_date)
for (start, end), _ in stress_periods.items():
if pd.Timestamp(start) <= test_date <= pd.Timestamp(end):
return True
return False
# Categorize results
normal_periods = []
stress_periods_results = []
for result in successful_tests:
if is_stress_period(result["params"]["backtest_date"]):
stress_periods_results.append(result)
else:
normal_periods.append(result)
# Calculate metrics for each category
def calculate_category_metrics(results):
if not results:
return None
return {
"count": len(results),
"mape": np.mean([r["metrics"]["mape"] for r in results]),
"rmse": np.mean([r["metrics"]["rmse"] for r in results]),
"max_error": np.mean([r["metrics"]["max_error"] for r in results]),
"coverage_95": np.mean([r["metrics"]["coverage_95"] for r in results]),
"coverage_68": np.mean([r["metrics"]["coverage_68"] for r in results]),
}
normal_metrics = calculate_category_metrics(normal_periods)
stress_metrics = calculate_category_metrics(stress_periods_results)
# Write detailed results
with output.create("bitcoin_backtest_results_summary.txt") as f:
f.write("Systematic Backtest Results\n")
f.write("==========================\n\n")
f.write("Configuration:\n")
f.write(f"- Minimum training period: {min_training_years} years\n")
f.write(f"- Validation period: {validation_years} years\n")
f.write(
f"- Start dates range: {unique_periods[0]['start_date']} to {unique_periods[-1]['start_date']}\n"
)
f.write(
f"- Backtest dates range: {unique_periods[0]['backtest_date']} to {unique_periods[-1]['backtest_date']}\n"
)
f.write(f"- Number of test periods: {len(unique_periods)}\n\n")
# Normal Periods
f.write("Normal Market Periods\n")
f.write("====================\n")
f.write(f"Number of periods: {len(normal_periods)}\n\n")
for result in normal_periods:
f.write("\n" + "=" * 50 + "\n")
f.write(f"Period: {result['params']['description']}\n")
f.write(
f"Training: {result['params']['start_date']} to {result['params']['backtest_date']}\n"
)
f.write(
f"Validation: {result['params']['backtest_date']} to {pd.Timestamp(result['params']['backtest_date']) + pd.Timedelta(days=validation_years*365):%Y-%m-%d}\n"
)
f.write("\nMetrics:\n")
for metric, value in result["metrics"].items():
if metric in ["mape", "coverage_95", "coverage_68"]:
f.write(f"- {metric}: {value:.1f}%\n")
else:
f.write(f"- {metric}: ${value:,.0f}\n")
f.write("\n")
if normal_metrics:
f.write("\nNormal Periods Aggregate Metrics:\n")
f.write(f"MAPE: {normal_metrics['mape']:.1f}%\n")
f.write(f"RMSE: ${normal_metrics['rmse']:,.0f}\n")
f.write(f"Average Max Error: ${normal_metrics['max_error']:,.0f}\n")
f.write(f"95% CI Coverage: {normal_metrics['coverage_95']:.1f}%\n")
f.write(f"68% CI Coverage: {normal_metrics['coverage_68']:.1f}%\n")
# Stress Periods
f.write("\n\nStress Periods\n")
f.write("=============\n")
f.write(f"Number of periods: {len(stress_periods_results)}\n\n")
for result in stress_periods_results:
f.write("\n" + "=" * 50 + "\n")
f.write(f"Period: {result['params']['description']}\n")
f.write(
f"Training: {result['params']['start_date']} to {result['params']['backtest_date']}\n"
)
f.write(
f"Validation: {result['params']['backtest_date']} to {pd.Timestamp(result['params']['backtest_date']) + pd.Timedelta(days=validation_years*365):%Y-%m-%d}\n"
)
f.write("\nMetrics:\n")
for metric, value in result["metrics"].items():
if metric in ["mape", "coverage_95", "coverage_68"]:
f.write(f"- {metric}: {value:.1f}%\n")
else:
f.write(f"- {metric}: ${value:,.0f}\n")
f.write("\n")
if stress_metrics:
f.write("\nStress Periods Aggregate Metrics:\n")
f.write(f"MAPE: {stress_metrics['mape']:.1f}%\n")
f.write(f"RMSE: ${stress_metrics['rmse']:,.0f}\n")
f.write(f"Average Max Error: ${stress_metrics['max_error']:,.0f}\n")
f.write(f"95% CI Coverage: {stress_metrics['coverage_95']:.1f}%\n")
f.write(f"68% CI Coverage: {stress_metrics['coverage_68']:.1f}%\n")
return (
normal_metrics,
stress_metrics,
normal_periods,
stress_periods_results,
failed_tests,
)
# CLI
def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="model",
description="Bitcoin price model",
)
parser.add_argument(
"-o",
"--output",
help="output base directory",
default="./output",
)
parser.add_argument(
"-n",
"--name",
help="subdir of output base directory",
default="baseline",
)
return parser.parse_args()
def main():
args = get_args()
    output = Output(args.output, args.name)
analysis, df = analyze_bitcoin_prices("prices.csv")
run_projections(df, output)
normal_metrics, stress_metrics, normal_results, stress_results, failed_tests = (
run_systematic_backtests(df, output)
)
print("\nAggregate Metrics:")
print(f"Total backtests run: {normal_metrics['count']}")
print(f"Successful tests: {len(normal_results)}")
print(f"Failed tests: {len(failed_tests)}")
print("\nAverage Performance:")
print(f"MAPE: {normal_metrics['mape']:.1f}%")
print(f"RMSE: ${normal_metrics['rmse']:,.0f}")
print(f"95% CI Coverage: {normal_metrics['coverage_95']:.1f}%")
print(f"68% CI Coverage: {normal_metrics['coverage_68']:.1f}%")
print("\nFailed Tests:")
for test in failed_tests:
print(f"Period: {test['params']['description']}")
print(f"Error: {test['error']}\n")
if __name__ == "__main__":
main()