Kaggle Trading at the Close: A PB Gold-Medal Solution!
Competition Background
A stock exchange is a fast-paced, high-stakes environment where every second counts. As the trading day draws to a close, the intensity ramps up, and the final ten minutes matter most. These moments are often characterized by high volatility and rapid price swings, and they play a critical role in shaping the day's global economic narrative.
Each trading day on the Nasdaq Stock Exchange ends with the Nasdaq Closing Cross auction. This process establishes the official closing prices for securities listed on the exchange. Those closing prices serve as key indicators for investors, analysts, and other market participants in evaluating the performance of individual securities and the market as a whole.
Operating within this complex financial landscape is Optiver, a leading global electronic market maker. Driven by technological innovation, Optiver trades a wide range of financial instruments such as derivatives, cash equities, ETFs, bonds, and foreign currencies, quoting competitive two-sided prices for thousands of these instruments on major exchanges worldwide.
In the final ten minutes of the Nasdaq trading session, market makers like Optiver merge traditional order-book data with auction-book data. The ability to consolidate information from both sources is critical to providing the best prices to all market participants.
In this competition, participants develop models that use each stock's order-book and closing-auction data to predict the closing-price movements of hundreds of Nasdaq-listed stocks. Information from the auction can be used to adjust prices, assess supply-demand dynamics, and identify trading opportunities.
Solution Breakdown
The author documents the feature construction in considerable detail; the finer points are left for readers to work through on their own, and only the overall framework is outlined here. Features are built at several granularities: within a single row's order-book snapshot, along each stock's intraday time series (grouped by stock_id and date_id), across all stocks at the same timestamp (index-level aggregates, grouped by date_id and seconds_in_bucket), and across days (previous-day lookups).
The operators and statistical operations used in the construction logic include: pairwise and triplet imbalances of the form (a - b) / (a + b), groupwise shift / diff / pct_change over multiple windows, rolling means and standard deviations (z-scores), cross-sectional quantiles, and row-wise mean / std / skew / kurtosis aggregations over the price and size columns.
weights = [
0.004, 0.001, 0.002, 0.006, 0.004, 0.004, 0.002, 0.006, 0.006, 0.002, 0.002, 0.008,
0.006, 0.002, 0.008, 0.006, 0.002, 0.006, 0.004, 0.002, 0.004, 0.001, 0.006, 0.004,
0.002, 0.002, 0.004, 0.002, 0.004, 0.004, 0.001, 0.001, 0.002, 0.002, 0.006, 0.004,
0.004, 0.004, 0.006, 0.002, 0.002, 0.04 , 0.002, 0.002, 0.004, 0.04 , 0.002, 0.001,
0.006, 0.004, 0.004, 0.006, 0.001, 0.004, 0.004, 0.002, 0.006, 0.004, 0.006, 0.004,
0.006, 0.004, 0.002, 0.001, 0.002, 0.004, 0.002, 0.008, 0.004, 0.004, 0.002, 0.004,
0.006, 0.002, 0.004, 0.004, 0.002, 0.004, 0.004, 0.004, 0.001, 0.002, 0.002, 0.008,
0.02 , 0.004, 0.006, 0.002, 0.02 , 0.002, 0.002, 0.006, 0.004, 0.002, 0.001, 0.02,
0.006, 0.001, 0.002, 0.004, 0.001, 0.002, 0.006, 0.006, 0.004, 0.006, 0.001, 0.002,
0.004, 0.006, 0.006, 0.001, 0.04 , 0.006, 0.002, 0.004, 0.002, 0.002, 0.006, 0.002,
0.002, 0.004, 0.006, 0.006, 0.002, 0.002, 0.008, 0.006, 0.004, 0.002, 0.006, 0.002,
0.004, 0.006, 0.002, 0.004, 0.001, 0.004, 0.002, 0.004, 0.008, 0.006, 0.008, 0.002,
0.004, 0.002, 0.001, 0.004, 0.004, 0.004, 0.006, 0.008, 0.004, 0.001, 0.001, 0.002,
0.006, 0.004, 0.001, 0.002, 0.006, 0.004, 0.006, 0.008, 0.002, 0.002, 0.004, 0.002,
0.04 , 0.002, 0.002, 0.004, 0.002, 0.002, 0.006, 0.02 , 0.004, 0.002, 0.006, 0.02,
0.001, 0.002, 0.006, 0.004, 0.006, 0.004, 0.004, 0.004, 0.004, 0.002, 0.004, 0.04,
0.002, 0.008, 0.002, 0.004, 0.001, 0.004, 0.006, 0.004,
]
# Map stock_id -> index weight
weights = {stock_id: w for stock_id, w in enumerate(weights)}
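These appear to be per-stock weights for the synthetic index used in the target definition (the target is a stock's 60-second WAP move minus the index's 60-second WAP move, in basis points). As background, weights like these can be reverse-engineered from the training data: the index return implied by any single stock is its WAP return minus target / 10^4, and regressing that implied index return on the matrix of per-stock WAP returns recovers the weights. A minimal sketch under those assumptions (recover_index_weights is a hypothetical helper, not part of the author's pipeline):

import numpy as np
import pandas as pd

def recover_index_weights(train: pd.DataFrame) -> np.ndarray:
    # Hypothetical sketch: solve rets @ w ≈ index_ret for the index weights w
    # 60-second-ahead WAP return per stock (six 10-second buckets ahead)
    fwd = train.groupby(["stock_id", "date_id"])["wap"].shift(-6)
    train = train.assign(wap_ret=fwd / train["wap"] - 1.0)
    # Index return implied by the target definition; identical across stocks
    train["index_ret"] = train["wap_ret"] - train["target"] / 10_000
    # (timestamp x stock) matrix of returns, one implied index return per timestamp
    rets = train.pivot_table(index=["date_id", "seconds_in_bucket"],
                             columns="stock_id", values="wap_ret")
    idx = train.groupby(["date_id", "seconds_in_bucket"])["index_ret"].median()
    mask = rets.notna().all(axis=1) & idx.notna()
    w, *_ = np.linalg.lstsq(rets[mask].values, idx[mask].values, rcond=None)
    return w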
import gc
from itertools import combinations

import numpy as np
import pandas as pd
from numba import njit, prange
# 📊 Function to compute triplet imbalance in parallel using Numba
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
num_rows = df_values.shape[0]
num_combinations = len(comb_indices)
imbalance_features = np.empty((num_rows, num_combinations))
# 🔁 Loop through all combinations of triplets
for i in prange(num_combinations):
a, b, c = comb_indices[i]
# 🔁 Loop through rows of the DataFrame
for j in range(num_rows):
max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
# 🚫 Prevent division by zero
if mid_val == min_val:
imbalance_features[j, i] = np.nan
else:
imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)
return imbalance_features
# 📊 Optimized rewrite of the same function: a single comparison tree extracts
# min/mid/max per row instead of repeated max()/min() calls (this definition
# supersedes the one above)
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
num_rows = df_values.shape[0]
num_combinations = len(comb_indices)
imbalance_features = np.empty((num_rows, num_combinations))
# 🔁 Loop through all combinations of triplets
for i in prange(num_combinations):
a, b, c = comb_indices[i]
# 🔁 Loop through rows of the DataFrame
for j in range(num_rows):
if df_values[j, a] < df_values[j, b]:
min_val = df_values[j, a]
max_val = df_values[j, b]
else:
max_val = df_values[j, a]
min_val = df_values[j, b]
if min_val < df_values[j, c]:
if df_values[j, c] < max_val:
mid_val = df_values[j, c]
else:
mid_val = max_val
max_val = df_values[j, c]
else:
mid_val = min_val
min_val = df_values[j, c]
# 🚫 Prevent division by zero
if max_val == min_val:
imbalance_features[j, i] = np.nan
elif mid_val == min_val:
imbalance_features[j, i] = np.nan
else:
imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)
return imbalance_features
# 📈 Function to calculate triplet imbalance for given price data and a DataFrame
def calculate_triplet_imbalance_numba(price, df):
# Convert DataFrame to numpy array for Numba compatibility
df_values = df[price].values
comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]
# Calculate the triplet imbalance using the Numba-optimized function
features_array = compute_triplet_imbalance(df_values, comb_indices)
# Create a DataFrame from the results
columns = [f"{a}_{b}_{c}_imb2" for a, b, c in combinations(price, 3)]
features = pd.DataFrame(features_array, columns=columns)
return features
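A quick self-contained sanity check of the helper above on synthetic prices (the values are random and purely illustrative):

rng = np.random.default_rng(0)
demo = pd.DataFrame(rng.uniform(0.99, 1.01, size=(5, 3)),
                    columns=["ask_price", "bid_price", "wap"])
# One triplet -> one column "ask_price_bid_price_wap_imb2" = (max - mid) / (mid - min)
print(calculate_triplet_imbalance_numba(["ask_price", "bid_price", "wap"], demo))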
def imbalance_features(df):
stock_groups = df.groupby(["date_id", "seconds_in_bucket"])
    # Index WAP: weight each stock's wap by its index weight, then sum across
    # all stocks at the same (date_id, seconds_in_bucket)
df["wwap"] = df.stock_id.map(weights) * df.wap
df["iwap"] = stock_groups["wwap"].transform(lambda x: x.sum())
del df["wwap"]
# Define lists of price and size-related column names
prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]
# V1 features
# Calculate various features using Pandas eval function
df["volume"] = df.eval("ask_size + bid_size")
df["mid_price"] = df.eval("(ask_price + bid_price) / 2")
df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")
df["matched_imbalance"] = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")
df["all_size"] = df.eval("matched_size + imbalance_size") # add
df["imbalance_size_for_buy_sell"] = df.eval("imbalance_size * imbalance_buy_sell_flag") # add
cols = ['wap', 'imbalance_size_for_buy_sell', "bid_size", "ask_size"]
for q in [0.25, 0.5, 0.75]: # Try more/different q
df[[f'{col}_quantile_{q}' for col in cols]] = stock_groups[cols].transform(lambda x: x.quantile(q)).astype(np.float32)
# Create features for pairwise price imbalances
for c in combinations(prices, 2):
df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})").astype(np.float32)
for c in combinations(sizes, 2):
df[f"{c[0]}/{c[1]}"] = df.eval(f"({c[0]})/({c[1]})").astype(np.float32)
# Calculate triplet imbalance features using the Numba-optimized function
for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
triplet_feature = calculate_triplet_imbalance_numba(c, df)
df[triplet_feature.columns] = triplet_feature.values.astype(np.float32)
# V2 features
# Calculate additional features
stock_groups = df.groupby(['stock_id', 'date_id'])
df["imbalance_momentum"] = stock_groups['imbalance_size'].diff(periods=1) / df['matched_size']
df["price_spread"] = df["ask_price"] - df["bid_price"]
df["spread_intensity"] = stock_groups['price_spread'].diff()
df['price_pressure'] = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])
df['market_urgency'] = df['price_spread'] * df['liquidity_imbalance']
df['depth_pressure'] = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])
df['wap_advantage'] = df.wap - df.iwap # add
# Calculate various statistical aggregation features
df_prices = df[prices]
df_sizes = df[sizes]
for func in ["mean", "std", "skew", "kurt"]:
df[f"all_prices_{func}"] = df_prices.agg(func, axis=1)
df[f"all_sizes_{func}"] = df_sizes.agg(func, axis=1)
# V3 features
# Calculate shifted and return features for specific columns
cols = ['matched_size', 'imbalance_size', 'reference_price', 'imbalance_buy_sell_flag', "wap", "iwap"]
stock_groups_cols = stock_groups[cols]
for window in [1, 2, 3, 6, 10]:
df[[f"{col}_shift_{window}" for col in cols]] = stock_groups_cols.shift(window)
cols = ['matched_size', 'imbalance_size', 'reference_price', "iwap"] #wap
stock_groups_cols = stock_groups[cols]
for window in [1, 2, 3, 6, 10]:
df[[f"{col}_ret_{window}" for col in cols]] = stock_groups_cols.pct_change(window).astype(np.float32)
# Calculate diff features for specific columns
cols = ['ask_price', 'bid_price', 'ask_size', 'bid_size', 'wap', 'near_price', 'far_price', 'imbalance_size_for_buy_sell']
stock_groups_cols = stock_groups[cols]
for window in [1, 2, 3, 6, 10]:
df[[f"{col}_diff_{window}" for col in cols]] = stock_groups_cols.diff(window).astype(np.float32)
# V4 features
# Construct `time_since_last_imbalance_change`
    # When `imbalance_buy_sell_flag` changes, set 'flag_change' to 1 for that row
df['flag_change'] = stock_groups['imbalance_buy_sell_flag'].diff().ne(0).astype(int)
    # Use cumsum to build a group id that increments each time the flag changes
df['group'] = df.groupby(['stock_id', 'date_id'])['flag_change'].cumsum()
    # Time elapsed within each group since the last flag change
group_min = df.groupby(['stock_id', 'date_id', 'group'])['seconds_in_bucket'].transform('min')
df['time_since_last_imbalance_change'] = df['seconds_in_bucket'] - group_min
    # Reset rows where `flag_change` is 1 to 0
df['time_since_last_imbalance_change'] *= (1 - df['flag_change'])
df.drop(columns=['flag_change', 'group'], inplace=True)
cols = ['imbalance_size_for_buy_sell']
stock_groups_cols = stock_groups[cols]
for window in [5, 10]:
mean_col = stock_groups_cols.transform(lambda x: x.rolling(window=window).mean())
std_col = stock_groups_cols.transform(lambda x: x.rolling(window=window).std())
df[[f'z_score_{col}_{window}' for col in cols]] = (df[cols] - mean_col) / std_col
# Replace infinite values with 0
return df.replace([np.inf, -np.inf], 0)
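imbalance_features only needs the competition's raw column schema, so it can be smoke-tested on a small synthetic frame. A minimal sketch (random values, three stocks, five 10-second buckets; not the author's test code):

rng = np.random.default_rng(0)
n_stocks, n_buckets = 3, 5
demo = pd.DataFrame({
    "stock_id": np.repeat(np.arange(n_stocks), n_buckets),
    "date_id": 0,
    "seconds_in_bucket": np.tile(np.arange(0, 10 * n_buckets, 10), n_stocks),
    "imbalance_buy_sell_flag": rng.choice([-1, 0, 1], n_stocks * n_buckets),
})
for col in ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]:
    demo[col] = rng.uniform(0.99, 1.01, len(demo))
for col in ["matched_size", "bid_size", "ask_size", "imbalance_size"]:
    demo[col] = rng.uniform(1e4, 1e6, len(demo))
print(imbalance_features(demo).shape)  # rows unchanged, many new feature columns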
# 📅 Function to generate time and stock-related features
def other_features(df):
df["seconds"] = df["seconds_in_bucket"] % 60 # Seconds
df["minute"] = df["seconds_in_bucket"] // 60 # Minutes
# Map global features to the DataFrame
for key, value in global_stock_id_feats.items():
df[f"global_{key}"] = df["stock_id"].map(value.to_dict())
for key, value in global_seconds_feats.items():
df[f"global_seconds_{key}"] = df["seconds_in_bucket"].map(value.to_dict())
return df
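other_features relies on two module-level dicts, global_stock_id_feats and global_seconds_feats, that are not shown in this excerpt; each maps a feature name to a pandas Series indexed by stock_id or seconds_in_bucket, precomputed on the training set. One plausible construction (hypothetical; df_train stands for the full training frame):

# Hypothetical global lookups computed once on the training set (df_train assumed)
global_stock_id_feats = {
    "median_size": df_train.groupby("stock_id")["bid_size"].median()
                   + df_train.groupby("stock_id")["ask_size"].median(),
    "std_size": df_train.groupby("stock_id")["bid_size"].std()
                + df_train.groupby("stock_id")["ask_size"].std(),
    "median_price": df_train.groupby("stock_id")["bid_price"].median()
                    + df_train.groupby("stock_id")["ask_price"].median(),
}
global_seconds_feats = {
    "target_mean": df_train.groupby("seconds_in_bucket")["target"].mean(),
    "target_std": df_train.groupby("seconds_in_bucket")["target"].std(),
}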
def last_days_features(df: pd.DataFrame, feat_last=None, target_last=None):
size = None
if feat_last is not None and len(feat_last) > 0:
cols = [col for col in df.columns if col in set(feat_last.columns)]
if target_last is not None:
cols.append("target")
feat_last["target"] = target_last
df["target"] = 0
        # Pad the tail of the day: copy the last bucket into six synthetic future
        # buckets (seconds +10 .. +60) so the shift(-window) "future" features
        # below have rows to pull from; the padding is sliced off before returning
        paddings = []
second_start = df.seconds_in_bucket.max()
padding_src = df[df.seconds_in_bucket == second_start]
size = len(df)
size_pad = len(padding_src) * 6
for second in range(second_start + 10, second_start + 70, 10):
padding = padding_src.copy()
padding["seconds_in_bucket"] = second
paddings.append(padding)
        # Prepend the previous day's features so the (stock_id, seconds_in_bucket)
        # shift below can see yesterday's values
        df = pd.concat([feat_last[cols], df] + paddings)
# Add Last days features
# TODO: Try more features
cols = ['near_price', 'far_price', 'depth_pressure']
if 'target' in df.columns:
cols.append('target')
stock_groups = df.groupby(['stock_id', 'seconds_in_bucket'])
stock_groups_cols = stock_groups[cols]
for window in [1]:
df[[f"{col}_last_{window}day" for col in cols]] = stock_groups_cols.shift(window)
if cols[-1] == "target":
cols.pop()
cols = [f"{col}_last_{window}day" for col in cols]
stock_groups = df.groupby(['stock_id', 'date_id'])
stock_groups_cols = stock_groups[cols]
for window in [1, 2, 3, 6]:
df[[f"{col}_future_{window}" for col in cols]] = stock_groups_cols.shift(-window)
    if size:
        # Drop the prepended previous-day rows and the trailing padding rows,
        # returning exactly the current day's rows
        return df[-(size + size_pad):-size_pad]
return df
# 🚀 Function to generate all features by combining imbalance and other features
def generate_all_features(df, feat_last=None, target_last=None):
# Select relevant columns for feature generation
cols = [c for c in df.columns if c not in {"row_id", "time_id", "currently_scored"}]
df = df[cols]
# Generate imbalance features
df = imbalance_features(df)
# Generate last days features
df = last_days_features(df, feat_last, target_last)
# Generate time and stock-related features
df = other_features(df)
gc.collect() # Perform garbage collection to free up memory
# Select and return the generated features
feature_name = [i for i in df.columns if i not in {"row_id", "target", "time_id"}]
return df[feature_name]
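Finally, a sketch of how generate_all_features might be wired into the competition's time-series inference API (the optiver2023 environment). The fitted model and the handling of the previous-day cache are placeholders, not the author's pipeline:

import optiver2023

env = optiver2023.make_env()
feat_last, target_last = None, None  # previous day's features / revealed targets
for test, revealed_targets, sample_prediction in env.iter_test():
    feats = generate_all_features(test, feat_last=feat_last, target_last=target_last)
    sample_prediction["target"] = model.predict(feats)  # `model`: hypothetical fitted regressor
    env.predict(sample_prediction)
    # Updating feat_last / target_last at each day boundary is omitted here.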