计算20万个样本的CAR

Python
Author

Tom

Published

October 2, 2024

计算 20 万个事件样本的 CAR,思路是将 20 万个事件样本切分成 1000 个 batch,然后采用多进程并行计算。计算过程使用到了之前博客介绍过的 eventstudy 脚本。完整代码如下:

import numpy as np
import pandas as pd
import eventstudy
import multiprocessing
from tqdm import tqdm

# Daily per-stock return series; keep ticker codes as strings so that
# leading zeros in the codes are not lost.
daily_stock_returns = pd.read_csv("./result/日个股回报率.csv", dtype={"ticker": str})

# Fama-French three-factor daily series.
famafrench_factors = pd.read_csv("./result/famafrench_factors.csv")

# Event sample data (one row per event); ticker kept as string as above.
sample_famafrench = pd.read_csv("./result/sample_famafrench.csv", dtype={"ticker": str})

# Helper: split a DataFrame into batches for parallel processing.
def get_batches(df: pd.DataFrame, num_splits: int) -> list:
    """Split a DataFrame into a given number of sub-DataFrames.

    Fix over the previous version: splitting ``df.values`` funnels every
    column through one NumPy array, which collapses all columns to
    ``object`` dtype in the resulting sub-DataFrames.  Passing the
    DataFrame itself to ``np.array_split`` preserves per-column dtypes.

    Args:
        df (pd.DataFrame): DataFrame to split.
        num_splits (int): Number of sub-DataFrames to produce (batches may
            differ in length by at most one row, as with np.array_split).

    Returns:
        list: List of sub-DataFrames, each with a fresh 0..n-1 index —
            matching the index behaviour of the previous implementation.
    """
    # reset_index(drop=True) mirrors the default RangeIndex the old
    # version produced when rebuilding DataFrames from raw arrays.
    return [
        batch.reset_index(drop=True)
        for batch in np.array_split(df, num_splits)
    ]

# Build one EventStudy per batch of events (1000 batches in total).
# The positional arguments after the model name are the factor data, the
# stock-return data, and four integer window parameters (0, 2, 20, 220) —
# presumably the event-window bounds and the estimation-window
# length/offset; confirm against the eventstudy script this is based on.
# NOTE(review): this runs at import time, so under the "spawn" start
# method every worker process re-reads the CSVs and rebuilds this list.
es_famafrench_list = [
    eventstudy.EventStudy(
        sample,
        "FamaFrench",
        famafrench_factors,
        daily_stock_returns,
        0,
        2,
        20,
        220,)
    for sample in get_batches(sample_famafrench, 1000)
]

def process_es(es, progress_queue):
    """Run one EventStudy batch and report completion to the tracker.

    Args:
        es: EventStudy instance covering one batch of events.
        progress_queue: Queue the progress-tracker process listens on;
            exactly one message is sent per task.

    Returns:
        The ``"sample_cars"`` entry of the EventStudy result.
    """
    try:
        result = es.get_result(progress=False)
    finally:
        # Always signal the tracker, even when get_result raises:
        # track_progress expects exactly total_tasks messages, and a
        # missing one would make it — and the final join() — hang forever.
        progress_queue.put(1)
    return result["sample_cars"]

def track_progress(progress_queue, total_tasks):
    """Drive a progress bar: advance one step per message on the queue.

    Blocks until exactly ``total_tasks`` messages have been consumed.
    """
    pbar = tqdm(total=total_tasks)
    try:
        done = 0
        while done < total_tasks:
            # Blocks until a worker reports one finished task.
            progress_queue.get()
            pbar.update(1)
            done += 1
    finally:
        pbar.close()

if __name__ == "__main__":
    # A Manager().Queue() is picklable and shared across processes, so the
    # pool workers and the tracker process can communicate through it.
    manager = multiprocessing.Manager()
    progress_queue = manager.Queue()
    
    # One task per EventStudy batch.
    total_tasks = len(es_famafrench_list)
    # Dedicated process that drains the queue and drives the progress bar;
    # started before the pool so no progress message is ever missed.
    progress_process = multiprocessing.Process(
        target=track_progress, args=(progress_queue, total_tasks)
    )
    progress_process.start()
    
    with multiprocessing.Pool(processes=4) as pool:
        # Submit every batch asynchronously via pool.apply_async.
        results = [
            pool.apply_async(process_es, (es, progress_queue)) for es in es_famafrench_list
        ]
        # Block until each worker finishes and collect its CARs.
        # NOTE(review): if a worker raises, r.get() re-raises here and the
        # tracker never receives that task's message, so join() below
        # would hang — confirm process_es signals the queue on failure.
        results = [r.get() for r in results]
    
    # Wait for the tracker to consume all total_tasks messages.
    progress_process.join()
    
    # Stack the per-batch CARs and persist them.
    final = pd.concat(results, axis=0)
    final.to_csv("./result/car_famafrench_model.csv")