Python Code for Portfolio Optimization Chapter 11 – Risk Parity Portfolios
Daniel P. Palomar (2025). Portfolio Optimization: Theory and Application. Cambridge University Press.
Last update: March 27, 2025
Libraries
The following libraries are used in the examples:
# Core numerical computing
import numpy as np
import pandas as pd
from typing import Dict, Tuple, List, Callable
import warnings
warnings.filterwarnings('ignore')
import time
from statistics import median
# For financial data
import yfinance as yf # Loading financial data
import empyrical as emp # Performance metrics
# Book data (pip install "git+https://github.com/dppalomar/pob.git#subdirectory=python")
from pob_python import SP500_stocks_2015to2020, SP500_index_2015to2020
# Plotting
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.gridspec as gridspec
import seaborn as sns
sns.set_theme(style="darkgrid")
# Optimization
import riskparityportfolio as rpp
import cvxpy as cp # interface for convex optimization solvers
from qpsolvers import solve_qp
import quadprog
from scipy import optimize, linalg
Some math definitions for the equations:
$\def\bm#1{\boldsymbol{#1}}$ $\def\textm#1{\textsf{#1}}$ $\def\T{{\mkern-2mu\raise-1mu\mathsf{T}}}$ $\def\R{\mathbb{R}}$ $\def\E{\rm I\kern-.2em E}$ $\def\w{\bm{w}}$ $\def\bmu{\bm{\mu}}$ $\def\bSigma{\bm{\Sigma}}$
From dollar to risk diversification
Portfolio allocation and risk allocation for the $1/N$ portfolio and the risk parity portfolio (based on the riskparityportfolio package, available on PyPI; see the riskparityportfolio documentation):
# Get data
stock_prices = SP500_stocks_2015to2020.loc["2020":"2020-09", ["AAPL", "NFLX", "TSCO", "MGM", "MSFT", "FB", "AMZN", "GOOGL"]]
T = stock_prices.shape[0]
T_trn = round(0.70*T)
# Calculate returns
X = np.diff(np.log(stock_prices.iloc[:T_trn].values), axis=0)
Sigma = np.cov(X.T)
N = X.shape[1]
# Portfolios
w_EWP = np.ones(N) / N
w_RPP = rpp.RiskParityPortfolio(covariance=Sigma).weights
# Helper function to compute risk contribution
def risk_contribution(w, Sigma):
portfolio_risk = np.sqrt(w.T @ Sigma @ w)
marginal_risk_contribution = Sigma @ w / portfolio_risk
risk_contribution = w * marginal_risk_contribution
return risk_contribution
# Helper function to compute relative risk contribution
def relative_risk_contribution(w, Sigma):
rc = risk_contribution(w, Sigma)
return rc / np.sum(rc)
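# Quick sanity check (illustrative addition, not in the original notebook): the risk
# contributions should sum to the portfolio volatility, and the RPP should equalize them
rc_check = risk_contribution(w_RPP, Sigma)
print(np.isclose(np.sum(rc_check), np.sqrt(w_RPP @ Sigma @ w_RPP)))  # expect True
print(relative_risk_contribution(w_RPP, Sigma))  # expect approx. 1/N for each asset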
# Helper function to plot barplots of portfolio weights and risk contribution
def barplot_w_and_RCC(weights_dict, Sigma):
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    # Extract stock names from the 'stocks' entry
stocks = weights_dict['stocks']
# Get all portfolio keys except 'stocks'
portfolio_keys = [key for key in weights_dict.keys() if key != 'stocks']
# Extract portfolios and their names
portfolios = [weights_dict[key] for key in portfolio_keys]
portfolio_names = portfolio_keys
x = np.arange(len(stocks))
width = 0.8 / len(portfolios) # Adjust width based on number of portfolios
# Plot weights
for i, (portfolio, name) in enumerate(zip(portfolios, portfolio_names)):
position = x + i*width - (len(portfolios)-1)*width/2
ax1.bar(position, portfolio, width, label=name, edgecolor='black')
# Plot relative risk contributions
for i, (portfolio, name) in enumerate(zip(portfolios, portfolio_names)):
position = x + i*width - (len(portfolios)-1)*width/2
rrc = relative_risk_contribution(portfolio, Sigma)
ax2.bar(position, rrc, width, label=name, edgecolor='black')
# Set labels and titles
ax1.set_title('Portfolio weights')
ax1.set_ylabel('Weight')
ax1.set_xticks(x)
ax1.set_xticklabels(stocks)
ax1.legend()
ax2.set_title('Relative risk contribution')
ax2.set_ylabel('Risk')
ax2.set_xticks(x)
ax2.set_xticklabels(stocks)
ax2.legend()
plt.tight_layout()
# Plot weights and risk contributions
barplot_w_and_RCC({
'stocks': stock_prices.columns,
'1/N': w_EWP,
'RPP': w_RPP,
}, Sigma)
Naive diagonal formulation
Portfolio allocation and risk contribution of the $1/N$ portfolio and the naive diagonal RPP.
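The naive diagonal solution ignores correlations and reduces to the inverse-volatility portfolio $$w_i = \frac{1/\sqrt{\Sigma_{ii}}}{\sum_{j=1}^N 1/\sqrt{\Sigma_{jj}}}, \qquad i = 1, \dots, N,$$ as implemented next: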
# Naive diagonal risk parity portfolio
def naive_risk_parity(Sigma):
w = 1 / np.sqrt(np.diag(Sigma))
return w / np.sum(w)
w_naive_RPP = naive_risk_parity(Sigma)
w_naive_RPP_package = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
# Plot weights and risk contributions
barplot_w_and_RCC({
'stocks': stock_prices.columns,
'1/N': w_EWP,
'Naive diagonal RPP': w_naive_RPP,
'Naive diagonal RPP (package)': w_naive_RPP_package,
}, Sigma)
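As an illustrative check (not in the original notebook), the naive diagonal portfolio achieves exact risk parity when the covariance matrix really is diagonal:
# With a diagonal covariance matrix, inverse-volatility weights give exact risk parity
Sigma_diag = np.diag(np.diag(Sigma))
print(relative_risk_contribution(naive_risk_parity(Sigma_diag), Sigma_diag))  # 1/N each (up to floating-point error)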
Vanilla convex formulations
Portfolio allocation and risk contribution of the vanilla convex RPP compared to benchmarks:
# Portfolios
w_EWP = np.ones(N) / N
w_naive_RPP = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
w_convex_RPP = rpp.RiskParityPortfolio(Sigma).weights
# Plot weights and risk contributions
barplot_w_and_RCC({
'stocks': stock_prices.columns,
'1/N': w_EWP,
'Naive diagonal RPP': w_naive_RPP,
'Vanilla convex RPP': w_convex_RPP,
}, Sigma)
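Since cvxpy is already imported, the same vanilla convex formulation can also be solved directly with a generic conic solver. The following is a minimal sketch, not part of the original notebook (variable names are illustrative):
# Vanilla convex RPP via CVXPY: minimize 0.5*x'Sigma*x - b'*log(x) over x > 0, then normalize
b = np.ones(N) / N
x_var = cp.Variable(N, pos=True)
problem = cp.Problem(cp.Minimize(0.5 * cp.quad_form(x_var, Sigma) - cp.sum(cp.multiply(b, cp.log(x_var)))))
problem.solve()  # requires a solver supporting the exponential cone (e.g., ECOS or Clarabel)
w_cvxpy_RPP = x_var.value / np.sum(x_var.value)  # should essentially match w_convex_RPP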
Algorithms
General-purpose nonlinear solver
We can solve the convex problem formulation with SciPy's general-purpose nonlinear solver scipy.optimize.minimize (see the SciPy documentation):
# Initial point
w = np.ones(N) / N
x0 = w / np.sqrt(w.T @ Sigma @ w)
# Function definition
def fn_convex(x, Sigma):
N = Sigma.shape[0]
return 0.5 * x.T @ Sigma @ x - (1/N) * np.sum(np.log(x))
# Optimize with general-purpose solver
result = optimize.minimize(
fn_convex,
x0,
args=(Sigma,),
method='BFGS'
)
x_general_solver = result.x
w_general_solver_RPP = x_general_solver / np.sum(x_general_solver)
# Sanity check of the solution
b = np.ones(N) / N
residual = Sigma @ x_general_solver - b / x_general_solver
print("Optimality condition residual:", np.linalg.norm(residual))
Optimality condition residual: 1.4157366298264829e-05
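As an extra check (not in the original notebook), the recovered weights can be compared with the riskparityportfolio package solution:
# The difference from the package solution should be tiny
print(np.linalg.norm(w_general_solver_RPP - rpp.RiskParityPortfolio(Sigma).weights))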
Newton's method
Newton's method for Spinu's RPP formulation.
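For reference, the objective $f(\bm{x}) = \frac{1}{2}\bm{x}^\T\bSigma\bm{x} - \bm{b}^\T\log(\bm{x})$ has gradient and Hessian $$\nabla f(\bm{x}) = \bSigma\bm{x} - \bm{b}/\bm{x}, \qquad \nabla^2 f(\bm{x}) = \bSigma + \textm{Diag}\left(\bm{b}/\bm{x}^2\right)$$ (with element-wise division), so each Newton iteration solves $\nabla^2 f(\bm{x}^k)\,\bm{d}^k = -\nabla f(\bm{x}^k)$ and sets $\bm{x}^{k+1} = \bm{x}^k + \bm{d}^k$, as implemented below: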
def newton_method_risk_parity(Sigma, num_iter=5):
N = Sigma.shape[0]
b = np.ones(N) / N
# Initial point
w = np.ones(N) / N
x = w / np.sqrt(w.T @ Sigma @ w)
# For tracking progress
results = []
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': None # Will be filled later
})
for k in range(1, num_iter + 1):
start_time = time.time()
# Newton step
grad = Sigma @ x - b / x
Hessian = Sigma + np.diag(b / x**2)
x_new = x - np.linalg.solve(Hessian, grad)
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': None # Will be filled later
})
# Calculate optimal value (assuming the last iteration is close to optimal)
opt_value = results[-1]['obj_value']
# Fill in the gaps
for result in results:
result['gap'] = result['obj_value'] - opt_value
return pd.DataFrame(results), x
# Run Newton's method
df_newton, x_newton = newton_method_risk_parity(Sigma)
df_newton['CPU time [ms]'] = df_newton['cpu_time_k'].cumsum()
# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
ax1.semilogy(df_newton['k'], df_newton['gap'], 'b-', linewidth=1.5)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, df_newton['k'].max())
ax1.set_xticks(range(0, df_newton['k'].max() + 1))
ax2.semilogy(df_newton['CPU time [ms]'], df_newton['gap'], 'b-', linewidth=1.5)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 0.25)
plt.tight_layout()
Cyclical coordinate descent algorithm
Cyclical coordinate descent algorithm for Spinu's RPP formulation.
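Fixing all coordinates except $x_i$, the optimality condition $\left(\bSigma\bm{x}\right)_i = b_i/x_i$ becomes the scalar quadratic $\Sigma_{ii}x_i^2 + \big(\sum_{j\neq i}\Sigma_{ij}x_j\big)x_i - b_i = 0$, whose positive root $$x_i^+ = \frac{-\sum_{j\neq i}\Sigma_{ij}x_j + \sqrt{\big(\sum_{j\neq i}\Sigma_{ij}x_j\big)^2 + 4\,\Sigma_{ii}\,b_i}}{2\,\Sigma_{ii}}$$ is the per-coordinate update implemented in the code below: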
def cyclical_coordinate_descent(Sigma, num_iter=10):
N = Sigma.shape[0]
b = np.ones(N) / N
# Initial point
w = np.ones(N) / N
x = w / np.sqrt(w.T @ Sigma @ w)
# For tracking progress
results = []
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': None # Will be filled later
})
for k in range(1, num_iter + 1):
start_time = time.time()
x_new = x.copy()
for i in range(N):
# Update each coordinate
Sigma_xk_i = np.dot(x_new[:i], Sigma[:i, i]) + np.dot(x_new[i+1:], Sigma[i+1:, i])
x_new[i] = (-Sigma_xk_i + np.sqrt(Sigma_xk_i**2 + 4 * Sigma[i, i] * b[i])) / (2 * Sigma[i, i])
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': None # Will be filled later
})
# Calculate optimal value (assuming the last iteration is close to optimal)
opt_value = results[-1]['obj_value']
# Fill in the gaps
for result in results:
result['gap'] = result['obj_value'] - opt_value
return pd.DataFrame(results), x
# Run cyclical coordinate descent
df_ccd, x_ccd = cyclical_coordinate_descent(Sigma)
df_ccd['CPU time [ms]'] = df_ccd['cpu_time_k'].cumsum()
# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
ax1.semilogy(df_ccd['k'], df_ccd['gap'], 'b-', linewidth=1.5)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, df_ccd['k'].max())
ax1.set_xticks(range(0, df_ccd['k'].max() + 1))
ax2.semilogy(df_ccd['CPU time [ms]'], df_ccd['gap'], 'b-', linewidth=1.5)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 50)
plt.tight_layout()
Numerical comparison
Convergence of the Newton, MM, and SCA methods for Spinu's RPP problem, comparing the original versions with the improved ones (i.e., using the correlation matrix instead of the covariance matrix, together with a linear normalization step). See Chapter 11 of the book for details. The improved SCA appears to perform best.
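Recall the idea behind the improved variants: writing $\bSigma = \bm{D}\bm{C}\bm{D}$ with $\bm{D} = \textm{Diag}(\bm{\sigma})$, the change of variable $\tilde{\bm{x}} = \bm{D}\bm{x}$ lets each algorithm iterate on the better-conditioned correlation matrix $\bm{C}$, with the original variable recovered element-wise as $\bm{x} = \tilde{\bm{x}}/\bm{\sigma}$ (hence the objective is evaluated at x/sigma in the code). The linear normalization step rescales the iterate by $t = \sqrt{\bm{1}^\T(\bm{b}/\tilde{\bm{x}}) \,/\, \bm{1}^\T\bm{C}\tilde{\bm{x}}}$ so that the optimality condition $\bm{C}\tilde{\bm{x}} = \bm{b}/\tilde{\bm{x}}$ holds in the aggregate.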
num_iter = 25
N = Sigma.shape[0]
b = np.ones(N) / N
# Prepare correlation matrix
sigma = np.sqrt(np.diag(Sigma))
C = np.corrcoef(X.T)
# Compute optimal value (using vanilla convex RPP)
w_opt = rpp.RiskParityPortfolio(Sigma).weights
x_opt = w_opt / np.sqrt(w_opt.T @ Sigma @ w_opt)
opt_value = 0.5 * x_opt.T @ Sigma @ x_opt - np.sum(b * np.log(x_opt))
results = []
#
# Newton's method
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'Newton'
})
for k in range(1, num_iter + 1):
start_time = time.time()
# Newton step
grad = Sigma @ x - b / x
Hessian = Sigma + np.diag(b / x**2)
x_new = x - np.linalg.solve(Hessian, grad)
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'Newton'
})
#
# Newton's method (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'Newton - improved'
})
for k in range(1, num_iter + 1):
start_time = time.time()
# Newton step with correlation matrix
grad = C @ x - b / x
Hessian = C + np.diag(b / x**2)
x_new = x - np.linalg.solve(Hessian, grad)
x_new = x_new * np.sqrt(np.sum(b/x_new) / np.sum(rowsum_C * x_new))
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'Newton - improved'
})
#
# MM algorithm
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'MM'
})
lmd_max = np.linalg.eigvalsh(Sigma).max()  # largest eigenvalue (Sigma is symmetric)
Sigma_lmd_max = Sigma - lmd_max * np.eye(N)
for k in range(1, num_iter + 1):
start_time = time.time()
# MM update
Sigma_lmd_max_xk = Sigma_lmd_max @ x
x_new = (-Sigma_lmd_max_xk + np.sqrt(Sigma_lmd_max_xk**2 + 4 * lmd_max * b)) / (2 * lmd_max)
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'MM'
})
#
# MM algorithm (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'MM - improved'
})
lmd_max = np.linalg.eigvalsh(C).max()  # largest eigenvalue (C is symmetric)
C_lmd_max = C - lmd_max * np.eye(N)
for k in range(1, num_iter + 1):
start_time = time.time()
# MM update with correlation matrix
C_lmd_max_xk = C_lmd_max @ x
x_new = (-C_lmd_max_xk + np.sqrt(C_lmd_max_xk**2 + 4 * lmd_max * b)) / (2 * lmd_max)
x_new = x_new * np.sqrt(np.sum(b/x_new) / np.sum(rowsum_C * x_new))
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'MM - improved'
})
#
# SCA algorithm
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
gamma = 1
eps = 0.1
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'SCA'
})
Sigma_Diag_Sigma = Sigma - np.diag(np.diag(Sigma))
for k in range(1, num_iter + 1):
start_time = time.time()
# SCA update
Sigma_Diag_Sigma_xk = Sigma_Diag_Sigma @ x
x_hat = (-Sigma_Diag_Sigma_xk + np.sqrt(Sigma_Diag_Sigma_xk**2 + 4 * np.diag(Sigma) * b)) / (2 * np.diag(Sigma))
x_new = gamma * x_hat + (1 - gamma) * x
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
gamma = gamma * (1 - eps * gamma)
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
'method': 'SCA'
})
#
# SCA algorithm (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
gamma = 1
eps = 0.1
results.append({
'k': 0,
'cpu_time_k': 0,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'SCA - improved'
})
C_minus_I = C - np.eye(N)
for k in range(1, num_iter + 1):
start_time = time.time()
# SCA update with correlation matrix
C_minus_I_xk = C_minus_I @ x
x_hat = (-C_minus_I_xk + np.sqrt(C_minus_I_xk**2 + 4 * b)) / 2
x_hat = x_hat * np.sqrt(np.sum(b/x_hat) / np.sum(rowsum_C * x_hat))
x_new = gamma * x_hat + (1 - gamma) * x
cpu_time = (time.time() - start_time) * 1000 # in milliseconds
x = x_new
gamma = gamma * (1 - eps * gamma)
results.append({
'k': k,
'cpu_time_k': cpu_time,
'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
'method': 'SCA - improved'
})
# Convert results to DataFrame
df = pd.DataFrame(results)
# Calculate cumulative CPU time
df['CPU time [ms]'] = df.groupby('method')['cpu_time_k'].cumsum()
# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Plot gap vs iterations
for method in df['method'].unique():
method_data = df[df['method'] == method]
ax1.semilogy(method_data['k'], method_data['gap'], linewidth=1.5, label=method)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, num_iter)
ax1.set_xticks(range(0, num_iter+1, 5))
ax1.legend()
ax1.grid(True, which="both", ls="--", alpha=0.3)
# Plot gap vs CPU time
for method in df['method'].unique():
method_data = df[df['method'] == method]
ax2.semilogy(method_data['CPU time [ms]'], method_data['gap'], linewidth=1.5, label=method)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 0.25)
ax2.legend()
ax2.grid(True, which="both", ls="--", alpha=0.3)
plt.tight_layout()
plt.show()
General nonconvex formulations
Portfolio allocation and risk contribution of the general nonconvex RPP (with upper bound $w_i \leq 0.14$ on the weights) compared to the benchmarks ($1/N$ portfolio, naive diagonal RPP, and vanilla convex RPP):
# Portfolios
w_EWP = np.ones(N) / N
w_naive_RPP = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
w_convex_RPP = rpp.RiskParityPortfolio(Sigma).weights
# General nonconvex formulation with upper bound on the weights (via Cmat @ w = cvec and Dmat @ w <= dvec)
ub = 0.14
my_portfolio = rpp.RiskParityPortfolio(Sigma)
my_portfolio.design(Cmat=np.ones((1, N)), cvec=np.array([1.0]),  # sum(w) = 1
                    Dmat=np.vstack([-np.eye(N), np.eye(N)]),
                    dvec=np.concatenate([np.zeros(N), ub*np.ones(N)]))  # -w <= 0 and w <= ub
w_nonconvex_RPP = my_portfolio.weights
# Plot weights and risk contributions
barplot_w_and_RCC({
'stocks': stock_prices.columns,
'1/N': w_EWP,
'Naive diagonal RPP': w_naive_RPP,
'Vanilla convex RPP': w_convex_RPP,
'General nonconvex RPP': w_nonconvex_RPP,
}, Sigma)
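As a quick illustrative check (not in the original notebook), the nonconvex RPP should respect the upper bound, whereas the unconstrained portfolios may exceed it:
# Maximum weights: w_nonconvex_RPP should satisfy max(w) <= ub
print("max vanilla convex RPP weight:   ", w_convex_RPP.max())
print("max general nonconvex RPP weight:", w_nonconvex_RPP.max())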
Algorithms
Convergence comparison of different algorithms for solving the nonconvex RPP formulation: $$ \begin{array}{ll} \underset{\w}{\textm{minimize}} & \begin{aligned}\sum_{i=1}^N \left(\frac{w_i\left(\bSigma\w\right)_i}{\w^\T\bSigma\w} - b_i\right)^2\end{aligned}\\ \textm{subject to} & \w \in \mathcal{W}. \end{array} $$
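At iteration $k$, the SCA method linearizes the risk-concentration vector $\bm{g}(\w)$ (with Jacobian $\bm{J}_k = \bm{J}(\w^k)$) around the current iterate and solves the convex QP $$\begin{array}{ll} \underset{\w\in\mathcal{W}}{\textm{minimize}} & \frac{1}{2}\w^\T\bm{Q}_k\w + \bm{q}_k^\T\w, \end{array}$$ with $\bm{Q}_k = 2\bm{J}_k^\T\bm{J}_k + \tau\bm{I}$ and $\bm{q}_k = 2\bm{J}_k^\T\bm{g}(\w^k) - \bm{Q}_k\w^k$, followed by the smoothing step $\w^{k+1} = \w^k + \gamma_k(\hat{\w} - \w^k)$ with a decreasing $\gamma_k$; this is the update implemented in the explicit SCA code further below.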
# Prep data
N = 200
stock_prices = SP500_stocks_2015to2020.iloc[:, :N]
X = np.diff(np.log(stock_prices.values), axis=0) # Calculate returns from stock prices
Sigma = np.cov(X.T)
b = np.ones(N) / N # Risk budget
w_ub = 0.008 # RPP with upper bound
w_init = np.ones(N) / N
num_iter = 20
num_times = 10 # to compute the CPU time
#
# General nonlinear solver (using scipy.optimize)
#
def R(w):
Sigma_w = Sigma @ w
r = w * Sigma_w
sum_r = np.sum(r)
r_sumr_b = r/sum_r - b
return np.sum(np.square(r_sumr_b))
def R_grad(w):
Sigma_w = Sigma @ w
r = w * Sigma_w
sum_r = np.sum(r)
r_sumr_b = r/sum_r - b
v = (2/sum_r) * (r_sumr_b - np.sum(r_sumr_b * r)/sum_r)
return (Sigma @ (w * v) + Sigma_w * v)
def heq(w, *args):
"""Equality constraint: sum(w) = 1"""
return np.sum(w) - 1
def heq_jac(w, *args):
"""Jacobian of equality constraint"""
return np.ones((1, len(w)))
def hin(w, *args):
"""Inequality constraints: w >= 0 and w <= w_ub"""
return np.concatenate([w, w_ub - w])
def hin_jac(w, *args):
"""Jacobian of inequality constraints"""
N = len(w)
return np.vstack([np.eye(N), -np.eye(N)])
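# Illustrative gradient check (not in the original notebook): R_grad should agree
# with a finite-difference approximation of R at the initial point
print("gradient check:", optimize.check_grad(R, R_grad, w_init))  # expect a small value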
# Initialize dataframe for results
w = w_init.copy()
df = pd.DataFrame({
"k": [0],
"cpu_time_per_iter": [np.nan],
"CPU time [ms]": [0],
"obj_value": [R(w)],
"method": ["nonlinear solver (scipy)"]
})
# Run optimization for different iterations
for k in range(1, num_iter + 1):
cpu_times = []
for _ in range(num_times):
start_time = time.time()
# Define constraints for scipy.optimize
constraints = [
{'type': 'eq', 'fun': heq, 'jac': heq_jac},
{'type': 'ineq', 'fun': hin, 'jac': hin_jac}
]
# Run optimization with same initial point and with limited iterations
res = optimize.minimize(
R,
w_init,
jac=R_grad,
constraints=constraints,
method='SLSQP',
options={'maxiter': k, 'disp': False}
)
w = res.x
end_time = time.time()
        cpu_times.append((end_time - start_time) * 1e3)  # milliseconds
    cpu_time = median(cpu_times)
# Append results to dataframe
df = pd.concat([df, pd.DataFrame({
"k": [k],
"cpu_time_per_iter": [np.nan],
"CPU time [ms]": [1e3 * cpu_time], # convert to milliseconds
"obj_value": [R(w)],
"method": ["nonlinear solver (scipy)"]
})], ignore_index=True)
#
# SCA via explicit implementation
#
gamma = 0.9
zeta = 1e-7
# Note: quadprog uses the convention C^T w >= b, with the first meq rows treated as equalities;
# the constraint matrices for the QP subproblem are built inside the loop below
# Intermediate variables
tau = 0.05 * np.sum(np.diag(Sigma)) / (2*N)
tauI = tau * np.eye(N)
# Define functions for risk concentration
my_portfolio = rpp.RiskParityPortfolio(Sigma)
risk = rpp.RiskContribOverVarianceMinusBudget(my_portfolio)
g = risk.risk_concentration_vector # gvec
A = risk.jacobian_risk_concentration_vector() # Amat
# # Numerical check
# g(np.ones(N) / N), A(np.ones(N) / N)
# Initialize
wk = w_init.copy()
df = pd.concat([df, pd.DataFrame({
"k": [0],
"cpu_time_per_iter": [0],
"CPU time [ms]": [np.nan],
"obj_value": [R(wk)],
"method": ["SCA"]
})], ignore_index=True)
# Loop
for k in range(1, num_iter + 1):
cpu_times = []
for _ in range(num_times):
start_time = time.time()
# Auxiliary quantities
gk = np.asarray(g(wk))
Jk = np.asarray(A(wk))
Qk = 2 * Jk.T @ Jk + tauI
qk = 2 * Jk.T @ gk - Qk @ wk
# Prepare constraints for QP solver
# Equality constraint: sum(w) = 1
A_eq = np.ones((1, N))
b_eq = np.array([1.0])
# Inequality constraints: 0 <= w <= w_ub
G = np.vstack([-np.eye(N), np.eye(N)]) # -w <= 0 and w <= w_ub
h = np.concatenate([np.zeros(N), np.ones(N) * w_ub])
        # Combine them into quadprog's convention (C^T w >= b, first meq rows are equalities)
Cmat = np.vstack([A_eq, -G]).T
bvec = np.concatenate([b_eq, -h])
# Solve QP
#w_hat = solve_qp(P=Qk, q=qk, A=A_eq, b=b_eq, lb=np.zeros(N), ub=np.ones(N) * w_ub, solver="quadprog")
#w_hat = solve_qp(P=Qk, q=qk, G=G, h=h, A=A_eq, b=b_eq, solver="quadprog")
w_hat = quadprog.solve_qp(Qk, -qk, C=Cmat, b=bvec, meq=1)[0]
w_new = wk + gamma * (w_hat - wk)
end_time = time.time()
        cpu_times.append((end_time - start_time) * 1e3)  # milliseconds
cpu_time_k = median(cpu_times)
wk = w_new
gamma = gamma * (1 - zeta*gamma)
# Append results to dataframe
df = pd.concat([df, pd.DataFrame({
"k": [k],
"cpu_time_per_iter": [1e3 * cpu_time_k],
"CPU time [ms]": [np.nan],
"obj_value": [R(wk)],
"method": ["SCA"]
})], ignore_index=True)
df.loc[df['method'] == 'SCA', 'CPU time [ms]'] = df.loc[df['method'] == 'SCA', 'cpu_time_per_iter'].cumsum()
# Process dataframe
#df['method'] = pd.Categorical(df['method'], categories=df['method'].unique())
# Add optimality gap using the actual minimum value
min_obj_value = df['obj_value'].min()
df['gap'] = df['obj_value'] - min_obj_value
#
# Plotting
#
fig = plt.figure(figsize=(12, 10))
gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1])
# Plot 1: Gap vs iterations
ax1 = plt.subplot(gs[0])
for method in df['method'].unique():
method_data = df[df['method'] == method]
ax1.semilogy(method_data['k'], method_data['gap'], linewidth=1.5, label=method)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, num_iter)
ax1.set_xticks(range(0, num_iter+1, 1))
ax1.set_ylim(1e-7, None)
ax1.legend()
ax1.grid(True, which="both", ls="--", alpha=0.3)
# Plot 2: Gap vs CPU time
ax2 = plt.subplot(gs[1])
for method in df['method'].unique():
method_data = df[df['method'] == method]
ax2.semilogy(method_data['CPU time [ms]'], method_data['gap'], linewidth=1.5, label=method)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_ylim(1e-7, None)
ax2.legend()
ax2.grid(True, which="both", ls="--", alpha=0.3)
plt.tight_layout()
plt.show()