To compute CARs for 200,000 event samples, the idea is to split the samples into 1,000 batches and run the computation in parallel across multiple processes. The computation uses the eventstudy script introduced in an earlier post. The complete code is as follows:

import numpy as np
import pandas as pd
import eventstudy
import multiprocessing
from tqdm import tqdm
# Load the daily individual-stock return data
daily_stock_returns = pd.read_csv(
    "./result/日个股回报率.csv", dtype={"ticker": str}
)

# Load the Fama-French three-factor data
famafrench_factors = pd.read_csv(
    "./result/famafrench_factors.csv"
)

# Load the event-sample data
sample_famafrench = pd.read_csv(
    "./result/sample_famafrench.csv", dtype={"ticker": str}
)
# Helper to split a DataFrame into batches
def get_batches(df: pd.DataFrame, num_splits: int) -> list:
    """Split a DataFrame into a given number of sub-DataFrames.

    Args:
        df (pd.DataFrame): the DataFrame to split
        num_splits (int): number of sub-DataFrames to produce

    Returns:
        list: a list of sub-DataFrames
    """
    # Split by row position rather than by df.values, so each batch keeps
    # the original column dtypes (df.values coerces everything to object)
    index_batches = np.array_split(np.arange(len(df)), num_splits)
    return [df.iloc[idx] for idx in index_batches]
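# Quick example (hypothetical data): get_batches(pd.DataFrame({"x": range(10)}), 3)
# returns frames of 4, 3, and 3 rows, since np.array_split spreads any
# remainder across the leading batches.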
# Build one EventStudy per batch. The positional arguments follow the
# eventstudy script from the earlier post (presumably the event-window and
# estimation-window settings). Note that this list is built at import time,
# so under the "spawn" start method (Windows/macOS) each worker process
# re-executes the module-level code above.
es_famafrench_list = [
    eventstudy.EventStudy(
        sample,
        "FamaFrench",
        famafrench_factors,
        daily_stock_returns,
        0,
        2,
        20,
        220,
    )
    for sample in get_batches(sample_famafrench, 1000)
]
def process_es(es, progress_queue):
    """Run one event study and report completion to the progress queue."""
    result = es.get_result(progress=False)
    # Signal the tracker process that one task has finished
    progress_queue.put(1)
    return result["sample_cars"]
def track_progress(progress_queue, total_tasks):
    """Consume one message per finished task and advance the progress bar."""
    with tqdm(total=total_tasks) as pbar:
        for _ in range(total_tasks):
            progress_queue.get()
            # Advance the bar manually, one task per message
            pbar.update(1)
if __name__ == "__main__":
    # A multiprocessing.Manager().Queue() proxy is picklable, so it can be
    # passed to Pool workers for cross-process communication
    # (a plain multiprocessing.Queue() cannot be used with Pool)
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()

    # Number of tasks to track
    total_tasks = len(es_famafrench_list)

    # Start the progress-tracking process
    progress_process = multiprocessing.Process(
        target=track_progress, args=(progress_queue, total_tasks)
    )
    progress_process.start()

    with multiprocessing.Pool(processes=4) as pool:
        # Submit each EventStudy to the pool asynchronously
        async_results = [
            pool.apply_async(process_es, (es, progress_queue))
            for es in es_famafrench_list
        ]
        # Collect each task's result (blocks until all tasks finish)
        results = [r.get() for r in async_results]

    # Wait for the progress tracker to drain the queue and exit
    progress_process.join()

    final = pd.concat(results, axis=0)
    final.to_csv("./result/car_famafrench_model.csv")
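
If you do not need a separate tracker process, a simpler variant is to let the parent process drive tqdm directly via pool.imap_unordered, which yields results as tasks finish. Below is a minimal sketch under the same assumptions as above (the same es_famafrench_list and picklable eventstudy objects); process_car is a hypothetical helper that just wraps get_result:

import multiprocessing
import pandas as pd
from tqdm import tqdm

def process_car(es):
    # Same work as process_es, minus the queue plumbing
    return es.get_result(progress=False)["sample_cars"]

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # imap_unordered yields each result as soon as its task completes,
        # so tqdm can advance in the parent process without a Manager queue
        results = list(
            tqdm(
                pool.imap_unordered(process_car, es_famafrench_list),
                total=len(es_famafrench_list),
            )
        )
    final = pd.concat(results, axis=0)

The trade-off is that imap_unordered returns results in completion order rather than submission order, which is fine here because the batches are concatenated and written out as one file anyway.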