import pandas as pd import numpy as np from datetime import datetime, timedelta import matplotlib.pyplot as plt import seaborn as sns from scipy.stats import norm # Utility functions def get_halving_dates(): """Return known and projected Bitcoin halving dates""" return pd.to_datetime( [ "2008-01-03", # Bitcoin genesis block (treat as cycle start) "2012-11-28", # First halving "2016-07-09", # Second halving "2020-05-11", # Third halving "2024-04-19", # Fourth halving "2028-04-20", # Fifth halving (projected) ] ) def get_cycle_position(date, halving_dates): """ Calculate position in halving cycle (0 to 1) for a given date. 0 represents a halving event, 1 represents just before the next halving. """ # Convert date to datetime if it's not already date = pd.to_datetime(date) # Find the most recent halving before this date prev_halving = halving_dates[halving_dates <= date].max() if pd.isna(prev_halving): return 0.0 # For dates before first halving # Find next halving future_halvings = halving_dates[halving_dates > date] if len(future_halvings) == 0: # For dates after last known halving, use same cycle length as last known cycle last_cycle_length = (halving_dates[-1] - halving_dates[-2]).days days_since_halving = (date - halving_dates[-1]).days return min(days_since_halving / last_cycle_length, 1.0) next_halving = future_halvings.min() # Calculate position as fraction between halvings days_since_halving = (date - prev_halving).days cycle_length = (next_halving - prev_halving).days return min(days_since_halving / cycle_length, 1.0) def format_price(x, p): """Format large numbers in K, M, B format with appropriate precision""" if abs(x) >= 1e9: return f"${x/1e9:.1f}B" if abs(x) >= 1e6: return f"${x/1e6:.1f}M" if abs(x) >= 1e3: return f"${x/1e3:.1f}K" if abs(x) >= 1: return f"${x:.0f}" return f"${x:.2f}" # For values less than $1, show cents def get_nice_price_points(min_price, max_price): """ Generate a reasonable set of price points for the y-axis that look clean and cover the range without cluttering the chart. """ log_min = np.floor(np.log10(min_price)) log_max = np.ceil(np.log10(max_price)) price_points = [] # For very large ranges (spanning more than 4 orders of magnitude), # only use powers of 10 and mid-points if log_max - log_min > 4: for exp in range(int(log_min), int(log_max + 1)): base = 10**exp # Add main power of 10 if min_price <= base <= max_price: price_points.append(base) # Add mid-point if range is large enough if min_price <= base * 5 <= max_price and exp > log_min: price_points.append(base * 5) else: # For smaller ranges, use 1, 2, 5 sequence for exp in range(int(log_min), int(log_max + 1)): for mult in [1, 2, 5]: point = mult * 10**exp if min_price <= point <= max_price: price_points.append(point) return np.array(price_points) # Analysis functions def analyze_cycles_with_halvings(df): """Analyze Bitcoin market cycles aligned with halving events""" df = df.copy() # Get halving dates halving_dates = get_halving_dates() # Calculate cycle position for each date df["Cycle_Position"] = df["Date"].apply( lambda x: get_cycle_position(x, halving_dates) ) # Convert to days within cycle (0 to ~1460 days) df["Cycle_Days"] = (df["Cycle_Position"] * 4 * 365).round().astype(int) # Calculate returns at different scales df["Returns_30d"] = df["Close"].pct_change(periods=30) df["Returns_90d"] = df["Close"].pct_change(periods=90) df["Returns_365d"] = df["Close"].pct_change(periods=365) # Group by position in cycle and calculate average returns cycle_returns = df.groupby(df["Cycle_Days"])["Daily_Return"].mean() cycle_volatility = df.groupby(df["Cycle_Days"])["Daily_Return"].std() # Smooth the cycle returns to reduce noise from scipy.signal import savgol_filter window = 91 # About 3 months if len(cycle_returns) > window: cycle_returns = pd.Series( savgol_filter(cycle_returns, window, 3), index=cycle_returns.index ) return cycle_returns, cycle_volatility def project_prices_with_cycles( df, days_forward=365, simulations=1000, confidence_levels=[0.95, 0.68] ): """ Project future Bitcoin prices using Monte Carlo simulation with halving-aligned cycles. """ # Analyze historical cycles cycle_returns, cycle_volatility = analyze_cycles_with_halvings(df) # Get current position in halving cycle halving_dates = get_halving_dates() current_date = df["Date"].max() cycle_position = get_cycle_position(current_date, halving_dates) current_cycle_days = int(cycle_position * 4 * 365) # Current price (last known price) last_price = df["Close"].iloc[-1] last_date = df["Date"].iloc[-1] # Generate dates for projection future_dates = pd.date_range( start=last_date + timedelta(days=1), periods=days_forward, freq="D" ) # Calculate expected returns for future dates based on cycle position future_cycle_days = [ (current_cycle_days + i) % (4 * 365) for i in range(days_forward) ] expected_returns = np.array( [cycle_returns.get(day, cycle_returns.mean()) for day in future_cycle_days] ) # Calculate base volatility (recent) recent_volatility = df["Daily_Return"].tail(90).std() # Add long-term trend component (very gentle decay) long_term_decay = 0.9 ** (np.arange(days_forward) / 365) # 10% reduction per year expected_returns = expected_returns * long_term_decay # Run Monte Carlo simulation np.random.seed(42) # For reproducibility simulated_paths = np.zeros((days_forward, simulations)) for sim in range(simulations): # Generate random returns using cycle-aware expected returns returns = np.random.normal( loc=expected_returns, scale=recent_volatility, size=days_forward ) # Calculate price path price_path = last_price * np.exp(np.cumsum(returns)) simulated_paths[:, sim] = price_path # Calculate percentiles for confidence intervals results = pd.DataFrame(index=future_dates) results["Median"] = np.percentile(simulated_paths, 50, axis=1) for level in confidence_levels: lower_percentile = (1 - level) * 100 / 2 upper_percentile = 100 - lower_percentile results[f"Lower_{int(level*100)}"] = np.percentile( simulated_paths, lower_percentile, axis=1 ) results[f"Upper_{int(level*100)}"] = np.percentile( simulated_paths, upper_percentile, axis=1 ) # Add expected trend line (without randomness) results["Expected_Trend"] = last_price * np.exp(np.cumsum(expected_returns)) return results def analyze_bitcoin_prices(csv_path): """ Analyze Bitcoin price data to calculate volatility and growth rates. """ # Read CSV with proper data types df = pd.read_csv(csv_path, parse_dates=[0]) # Print first few rows of raw data to inspect print("\nFirst few rows of raw data:") print(df.head()) # Print data info to see types and non-null counts print("\nDataset Info:") print(df.info()) # Convert price columns to float and handle any potential formatting issues price_columns = ["Price", "Open", "High", "Low"] for col in price_columns: # Remove any commas in numbers df[col] = df[col].astype(str).str.replace(",", "") df[col] = pd.to_numeric(df[col], errors="coerce") # Rename columns for clarity df.columns = ["Date", "Close", "Open", "High", "Low", "Volume", "Change"] # Sort by date in ascending order df = df.sort_values("Date") # Print summary statistics after conversion print("\nPrice Summary After Conversion:") print(df[["Close", "Open", "High", "Low"]].describe()) # Calculate daily returns df["Daily_Return"] = df["Close"].pct_change() # Print first few daily returns to verify calculation print("\nFirst few daily returns:") print(df[["Date", "Close", "Daily_Return"]].head()) # Check for any infinite or NaN values print("\nInfinite or NaN value counts:") print(df.isna().sum()) # Calculate metrics using 365 days for annualization analysis = { "period_start": df["Date"].min().strftime("%Y-%m-%d"), "period_end": df["Date"].max().strftime("%Y-%m-%d"), "total_days": len(df), "daily_volatility": df["Daily_Return"].std(), "annualized_volatility": df["Daily_Return"].std() * np.sqrt(365), "total_return": (df["Close"].iloc[-1] / df["Close"].iloc[0] - 1) * 100, "average_daily_return": df["Daily_Return"].mean() * 100, "average_annual_return": ((1 + df["Daily_Return"].mean()) ** 365 - 1) * 100, "min_price": df["Low"].min(), "max_price": df["High"].max(), "avg_price": df["Close"].mean(), "start_price": df["Close"].iloc[0], "end_price": df["Close"].iloc[-1], } # Calculate rolling metrics df["Rolling_Volatility_30d"] = df["Daily_Return"].rolling( window=30 ).std() * np.sqrt(365) df["Rolling_Return_30d"] = df["Close"].pct_change(periods=30) * 100 return analysis, df # Main plotting functions def create_plots(df, start=None, end=None, project_days=365): """ Create plots including historical data and future projections. """ # Filter data based on date range mask = pd.Series(True, index=df.index) if start: mask &= df["Date"] >= pd.to_datetime(start) if end: mask &= df["Date"] <= pd.to_datetime(end) plot_df = df[mask].copy() if len(plot_df) == 0: raise ValueError("No data found for the specified date range") # Generate projections cycle_returns, cycle_volatility = analyze_cycles_with_halvings(plot_df) projections = project_prices_with_cycles(plot_df, days_forward=project_days) # Create cycle visualization visualize_cycle_patterns(plot_df, cycle_returns, cycle_volatility) # Set up the style plt.style.use("seaborn-v0_8") # Create figure fig = plt.figure(figsize=(15, 15)) # Date range for titles hist_date_range = f" ({plot_df['Date'].min().strftime('%Y-%m-%d')} to {plot_df['Date'].max().strftime('%Y-%m-%d')})" # 1. Price history and projections (log scale) ax1 = plt.subplot(4, 1, 1) # Plot historical prices ax1.semilogy(plot_df["Date"], plot_df["Close"], "b-", label="Historical Price") # Plot projections ax1.semilogy( projections.index, projections["Expected_Trend"], "--", color="purple", label="Expected Trend", ) ax1.semilogy( projections.index, projections["Median"], ":", color="green", label="Simulated Median", ) ax1.fill_between( projections.index, projections["Lower_95"], projections["Upper_95"], alpha=0.2, color="orange", label="95% Confidence Interval", ) ax1.fill_between( projections.index, projections["Lower_68"], projections["Upper_68"], alpha=0.3, color="green", label="68% Confidence Interval", ) # Customize y-axis ax1.yaxis.set_major_formatter(plt.FuncFormatter(format_price)) # Set custom y-axis ticks at meaningful price points min_price = min(plot_df["Low"].min(), projections["Lower_95"].min()) max_price = max(plot_df["High"].max(), projections["Upper_95"].max()) price_points = get_nice_price_points(min_price, max_price) ax1.set_yticks(price_points) # Adjust y-axis label properties ax1.tick_params(axis="y", labelsize=8) # Smaller font size # Add some padding to prevent label cutoff ax1.margins(y=0.02) # Adjust label padding to prevent overlap ax1.yaxis.set_tick_params(pad=1) # Add grid lines with adjusted opacity ax1.grid(True, which="major", linestyle="-", alpha=0.5) ax1.grid(True, which="minor", linestyle=":", alpha=0.2) ax1.set_title("Bitcoin Price History and Projections (Log Scale)" + hist_date_range) # Make legend font size smaller too for consistency ax1.legend(fontsize=8) # 2. Rolling volatility ax2 = plt.subplot(4, 1, 2) ax2.plot( plot_df["Date"], plot_df["Rolling_Volatility_30d"], "r-", label="30-Day Rolling Volatility", ) ax2.set_title("30-Day Rolling Volatility (Annualized)" + hist_date_range) ax2.set_xlabel("Date") ax2.set_ylabel("Volatility") ax2.grid(True) ax2.yaxis.set_major_formatter(plt.FuncFormatter(lambda y, _: "{:.0%}".format(y))) ax2.legend() # 3. Returns distribution ax3 = plt.subplot(4, 1, 3) returns_mean = plot_df["Daily_Return"].mean() returns_std = plot_df["Daily_Return"].std() filtered_returns = plot_df["Daily_Return"][ (plot_df["Daily_Return"] > returns_mean - 5 * returns_std) & (plot_df["Daily_Return"] < returns_mean + 5 * returns_std) ] sns.histplot(filtered_returns, bins=100, ax=ax3) ax3.set_title( "Distribution of Daily Returns (Excluding Extreme Outliers)" + hist_date_range ) ax3.set_xlabel("Daily Return") ax3.set_ylabel("Count") ax3.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, _: "{:.0%}".format(x))) # Add a vertical line for mean return ax3.axvline(filtered_returns.mean(), color="r", linestyle="dashed", linewidth=1) ax3.text( filtered_returns.mean(), ax3.get_ylim()[1], "Mean", rotation=90, va="top", ha="right", ) # 4. Projection ranges ax4 = plt.subplot(4, 1, 4) # Calculate and plot price ranges at different future points timepoints = np.array(range(30, 365, 30)) timepoints = timepoints[timepoints <= project_days] ranges = [] labels = [] positions = [] for t in timepoints: idx = t - 1 # Convert to 0-based index ranges.extend( [ projections["Lower_95"].iloc[idx], projections["Lower_68"].iloc[idx], projections["Median"].iloc[idx], projections["Upper_68"].iloc[idx], projections["Upper_95"].iloc[idx], ] ) labels.extend(["95% Lower", "68% Lower", "Median", "68% Upper", "95% Upper"]) positions.extend([t] * 5) # Plot ranges (removed violin plot) ax4.scatter(positions, ranges, alpha=0.6) # Add lines connecting the ranges for t in timepoints: idx = positions.index(t) ax4.plot([t] * 5, ranges[idx : idx + 5], "k-", alpha=0.3) # Set log scale first ax4.set_yscale("log") # Get the current order of magnitude for setting appropriate ticks min_price = min(ranges) max_price = max(ranges) # Create price points at regular intervals on log scale log_min = np.floor(np.log10(min_price)) log_max = np.ceil(np.log10(max_price)) price_points = [] for exp in range(int(log_min), int(log_max + 1)): for mult in [1, 2, 5]: point = mult * 10**exp if min_price <= point <= max_price: price_points.append(point) ax4.set_yticks(price_points) def price_formatter(x, p): if x >= 1e6: return f"${x/1e6:.1f}M" if x >= 1e3: return f"${x/1e3:.0f}K" return f"${x:.0f}" # Apply formatter to major ticks ax4.yaxis.set_major_formatter(plt.FuncFormatter(price_formatter)) # Customize the plot ax4.set_title("Projected Price Ranges at Future Timepoints") ax4.set_xlabel("Days Forward") ax4.set_ylabel("Price (USD)") ax4.grid(True, alpha=0.3) # Set x-axis to show only our timepoints ax4.set_xticks(timepoints) # Adjust layout plt.tight_layout() # Save the plot start_str = start if start else plot_df["Date"].min().strftime("%Y-%m-%d") end_str = end if end else plot_df["Date"].max().strftime("%Y-%m-%d") filename = f"bitcoin_analysis_{start_str}_to_{end_str}_with_projections.png" plt.savefig(filename, dpi=300, bbox_inches="tight") plt.close() return projections def visualize_cycle_patterns(df, cycle_returns, cycle_volatility): """ Create enhanced visualization of Bitcoin's behavior across halving cycles. """ plt.style.use("seaborn-v0_8") fig = plt.figure(figsize=(15, 15)) # Create a 3x1 subplot grid with different heights gs = plt.GridSpec(3, 1, height_ratios=[2, 1, 2], hspace=0.3) # Plot 1: Returns across cycle with confidence bands ax1 = plt.subplot(gs[0]) # Convert days to percentage through cycle x_points = np.array(cycle_returns.index) / (4 * 365) * 100 # Calculate rolling mean and standard deviation for confidence bands window = 30 # 30-day window rolling_mean = pd.Series(cycle_returns.values).rolling(window=window).mean() rolling_std = pd.Series(cycle_returns.values).rolling(window=window).std() # Plot confidence bands ax1.fill_between( x_points, (rolling_mean - 2 * rolling_std) * 100, (rolling_mean + 2 * rolling_std) * 100, alpha=0.2, color="blue", label="95% Confidence", ) ax1.fill_between( x_points, (rolling_mean - rolling_std) * 100, (rolling_mean + rolling_std) * 100, alpha=0.3, color="blue", label="68% Confidence", ) # Plot average returns ax1.plot( x_points, cycle_returns.values * 100, "b-", label="Average Daily Return", linewidth=2, ) ax1.axhline(y=0, color="gray", linestyle="--", alpha=0.5) # Add vertical lines for each year in cycle for year in range(1, 4): ax1.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3) ax1.text( year * 25, ax1.get_ylim()[1], f"Year {year}", rotation=90, va="top", ha="right", alpha=0.7, ) # Highlight halving points ax1.axvline(x=0, color="red", linestyle="--", alpha=0.5, label="Halving Event") ax1.axvline(x=100, color="red", linestyle="--", alpha=0.5) ax1.set_title("Bitcoin Return Patterns Across Halving Cycle", pad=20) ax1.set_xlabel("Position in Cycle (%)") ax1.set_ylabel("Average Daily Return (%)") ax1.grid(True, alpha=0.3) ax1.legend(loc="upper right") # Plot 2: Volatility across cycle ax2 = plt.subplot(gs[1]) # Calculate rolling volatility confidence bands vol_mean = pd.Series(cycle_volatility.values).rolling(window=window).mean() vol_std = pd.Series(cycle_volatility.values).rolling(window=window).std() # Plot volatility with confidence bands annualized_factor = np.sqrt(365) * 100 ax2.fill_between( x_points, (vol_mean - 2 * vol_std) * annualized_factor, (vol_mean + 2 * vol_std) * annualized_factor, alpha=0.2, color="red", label="95% Confidence", ) ax2.plot( x_points, cycle_volatility.values * annualized_factor, "r-", label="Annualized Volatility", linewidth=2, ) # Add year markers for year in range(1, 4): ax2.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3) ax2.axvline(x=0, color="red", linestyle="--", alpha=0.5) ax2.axvline(x=100, color="red", linestyle="--", alpha=0.5) ax2.set_xlabel("Position in Cycle (%)") ax2.set_ylabel("Volatility (%)") ax2.grid(True, alpha=0.3) ax2.legend(loc="upper right") # Plot 3: Average price trajectory within cycles ax3 = plt.subplot(gs[2]) # Define a color scheme for cycles cycle_colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd"] # Calculate average price path for each cycle halving_dates = get_halving_dates() cycles = [] for i in range(len(halving_dates) - 1): cycle_start = halving_dates[i] cycle_end = halving_dates[i + 1] cycle_data = df[(df["Date"] >= cycle_start) & (df["Date"] < cycle_end)].copy() if len(cycle_data) > 0: cycle_data["Cycle_Pct"] = ( (cycle_data["Date"] - cycle_start).dt.total_seconds() / (cycle_end - cycle_start).total_seconds() * 100 ) cycle_data["Normalized_Price"] = ( cycle_data["Close"] / cycle_data["Close"].iloc[0] ) cycles.append(cycle_data) # Plot each historical cycle with distinct colors for i, cycle in enumerate(cycles): ax3.semilogy( cycle["Cycle_Pct"], cycle["Normalized_Price"], color=cycle_colors[i], alpha=0.7, label=f'Cycle {i+1} ({cycle["Date"].iloc[0].strftime("%Y")}-{cycle["Date"].iloc[-1].strftime("%Y")})', ) # Calculate and plot average cycle if cycles: avg_cycle = pd.concat( [c.set_index("Cycle_Pct")["Normalized_Price"] for c in cycles], axis=1 ) avg_cycle_mean = avg_cycle.mean(axis=1) avg_cycle_std = avg_cycle.std(axis=1) ax3.semilogy( avg_cycle_mean.index, avg_cycle_mean.values, "k-", linewidth=2, label="Average Cycle", ) ax3.fill_between( avg_cycle_mean.index, avg_cycle_mean * np.exp(-2 * avg_cycle_std), avg_cycle_mean * np.exp(2 * avg_cycle_std), alpha=0.2, color="gray", ) # Add year markers for year in range(1, 4): ax3.axvline(x=year * 25, color="gray", linestyle=":", alpha=0.3) ax3.axvline(x=0, color="red", linestyle="--", alpha=0.5) ax3.axvline(x=100, color="red", linestyle="--", alpha=0.5) ax3.set_title("Price Performance Across Cycles (Normalized)", pad=20) ax3.set_xlabel("Position in Cycle (%)") ax3.set_ylabel("Price (Relative to Cycle Start)") ax3.grid(True, alpha=0.3) ax3.legend(loc="center left", bbox_to_anchor=(1.02, 0.5)) # Add current cycle position marker on all plots current_position = get_cycle_position(df["Date"].max(), halving_dates) * 100 for ax in [ax1, ax2, ax3]: ax.axvline( x=current_position, color="green", linestyle="-", alpha=0.5, label="Current Position", ) # Main title for the figure fig.suptitle("Bitcoin Halving Cycle Analysis", fontsize=16, y=0.95) # Adjust layout to prevent legend cutoff plt.tight_layout() # Save the plot plt.savefig("bitcoin_cycle_patterns.png", dpi=300, bbox_inches="tight") plt.close() if __name__ == "__main__": analysis, df = analyze_bitcoin_prices("prices.csv") # create_plots(df) # Full history # create_plots(df, start='2022-01-01') # From 2022 onwards # create_plots(df, start='2023-01-01', end='2023-12-31') # Just 2023 # Create plots with different time ranges and projections projections = create_plots(df, start="2011-01-01", project_days=365 * 4) print("\nProjected Prices at Key Points:") print(projections.iloc[[29, 89, 179, 364]].round(2)) # 30, 90, 180, 365 days