import numpy as np
import pandas as pd
import eventstudy
import multiprocessing
from tqdm import tqdm
# Load the per-stock daily return history (keep ticker codes as strings
# so leading zeros are not lost).
daily_stock_returns = pd.read_csv(
    "./result/日个股回报率.csv", dtype={"ticker": str}
)

# Load the Fama-French three-factor data.
famafrench_factors = pd.read_csv("./result/famafrench_factors.csv")

# Load the event-sample data (ticker codes kept as strings as above).
sample_famafrench = pd.read_csv(
    "./result/sample_famafrench.csv", dtype={"ticker": str}
)
def get_batches(df: pd.DataFrame, num_splits: int) -> list:
    """Split a DataFrame into a given number of sub-DataFrames.

    Splitting ``df.values`` (the previous approach) upcasts a mixed-dtype
    frame to a single object ndarray, so every batch loses its column
    dtypes. Splitting by positional index with ``.iloc`` preserves each
    column's original dtype.

    Args:
        df (pd.DataFrame): DataFrame to split.
        num_splits (int): Number of sub-DataFrames to produce.

    Returns:
        list: List of sub-DataFrames, each re-indexed from 0
        (matching the fresh default index the old implementation produced).
    """
    # np.array_split tolerates uneven division (unlike np.split), so the
    # last batches may be one row shorter.
    position_chunks = np.array_split(np.arange(len(df)), num_splits)
    return [df.iloc[chunk].reset_index(drop=True) for chunk in position_chunks]
# Build one EventStudy object per batch; the 200k-row event sample is cut
# into 1000 batches so they can be processed in parallel below.
es_famafrench_list = [
    eventstudy.EventStudy(
        sample,
        "FamaFrench",
        famafrench_factors,
        daily_stock_returns,
        0,
        2,
        20,
        220,
    )
    for sample in get_batches(sample_famafrench, 1000)
]
def process_es(es, progress_queue):
    """Run one event-study batch and report completion on the queue.

    Args:
        es: An EventStudy instance whose ``get_result`` returns a mapping
            containing a ``"sample_cars"`` entry.
        progress_queue: Inter-process queue; one token is put per finished
            task so the tracker process can advance the progress bar.

    Returns:
        The ``"sample_cars"`` portion of the event-study result.
    """
    outcome = es.get_result(progress=False)
    # Signal the progress tracker that this task is done.
    progress_queue.put(1)
    return outcome["sample_cars"]
def track_progress(progress_queue, total_tasks):
    """Consume completion tokens and drive a tqdm progress bar.

    Runs in its own process; blocks on the queue until every one of the
    ``total_tasks`` workers has reported in, advancing the bar one step
    per token.

    Args:
        progress_queue: Inter-process queue fed by the worker tasks.
        total_tasks: Number of tokens to wait for.
    """
    with tqdm(total=total_tasks) as bar:
        for _ in range(total_tasks):
            # Block until a worker reports completion, then step the bar.
            progress_queue.get()
            bar.update(1)
if __name__ == "__main__":
    # A Manager-backed queue can be shared safely between the pool workers
    # and the dedicated progress-tracking process.
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()

    total_tasks = len(es_famafrench_list)

    # Run the progress bar in its own process so it updates while the
    # pool workers are busy.
    progress_process = multiprocessing.Process(
        target=track_progress, args=(progress_queue, total_tasks)
    )
    progress_process.start()

    with multiprocessing.Pool(processes=4) as pool:
        # Submit every batch asynchronously, then collect the results.
        async_results = [
            pool.apply_async(process_es, (es, progress_queue))
            for es in es_famafrench_list
        ]
        results = [r.get() for r in async_results]

    # The tracker exits once it has consumed one token per task.
    progress_process.join()

    final = pd.concat(results, axis=0)
    final.to_csv("./result/car_famafrench_model.csv")
# 计算 20 万个事件样本的 CAR,思路是将 20 万个事件样本切分成 1000 个 batch,然后采用多进程并行计算。计算过程使用到了之前博客介绍过的 eventstudy 脚本。完整代码如上。