Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

内容分享1个月前发布
1 5 0

Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

在金融数据分析领域,策略回测是每位量化投资者的必修课。今天分享的这套三策略信号K线图系统,经过实测可处理688360、603300等股票数据,生成可视化回测报告。本文将通过真实案例,手把手教你从基础配置到高级应用。

Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

一、系统架构解析:三把利器的协同作战

这套系统整合了移动平均线、RSI相对强弱指标、布林带三大经典策略,每个策略都有明确的交易逻辑:

移动平均线策略采用5日与20日均线组合,当短期均线上穿长期均线时触发买入信号,反之则卖出。这种策略在趋势行情中表现突出,尤其适合中长期持有。

def generate_signals(self, df):
        """生成MA信号"""
        df = df.copy()
        df['short_ma'] = df['close'].rolling(window=self.params['short_ma']).mean()
        df['long_ma'] = df['close'].rolling(window=self.params['long_ma']).mean()
        
        # 生成交易信号
        df['signal'] = 0
        # 金叉买入,死叉卖出
        df.loc[df['short_ma'] > df['long_ma'], 'signal'] = 1
        df.loc[df['short_ma'] < df['long_ma'], 'signal'] = -1
        
        # 生成持仓状态
        df['position'] = df['signal'].diff()
        
        return df

RSI策略设置30超卖、70超买阈值,通过14日周期计算相对强弱。当RSI从超卖区回升时建仓,从超买区回落时平仓,有效捕捉震荡行情中的反转机会。

def generate_signals(self, df):
        """生成RSI信号"""
        df = df.copy()
        df['rsi'] = self.calculate_rsi(df, self.params['rsi_period'])
        
        df['signal'] = 0
        df['position'] = 0
        
        # 使用状态机方式生成信号
        in_position = False
        
        for i in range(1, len(df)):
            rsi = df['rsi'].iloc[i]
            prev_rsi = df['rsi'].iloc[i-1]
            
            if not in_position and rsi < self.params['oversold'] and prev_rsi >= self.params['oversold']:
                df.loc[df.index[i], 'signal'] = 1
                in_position = True
            elif in_position and rsi > self.params['overbought'] and prev_rsi <= self.params['overbought']:
                df.loc[df.index[i], 'signal'] = -1
                in_position = False
        
        return df

布林带策略基于20日均线构建上下轨,价格突破下轨后反弹买入,触及上轨后回落卖出。这种策略对波动率变化敏感,适合波段操作。

def generate_signals(self, df):
        """生成布林带信号"""
        df = self.calculate_bollinger_bands(df)
        
        df['signal'] = 0
        
        # 使用状态机方式生成信号
        in_position = False
        
        for i in range(1, len(df)):
            close = df['close'].iloc[i]
            prev_close = df['close'].iloc[i-1]
            bb_lower = df['bb_lower'].iloc[i]
            bb_upper = df['bb_upper'].iloc[i]
            prev_bb_lower = df['bb_lower'].iloc[i-1]
            prev_bb_upper = df['bb_upper'].iloc[i-1]
            
            # 价格从下向上突破下轨买入
            if not in_position and prev_close <= prev_bb_lower and close > bb_lower:
                df.loc[df.index[i], 'signal'] = 1
                in_position = True
            # 价格从上向下突破上轨卖出
            elif in_position and prev_close >= prev_bb_upper and close < bb_upper:
                df.loc[df.index[i], 'signal'] = -1
                in_position = False
        
        return df

二、环境搭建:30分钟完成配置

系统依赖三个核心库:akshare负责获取实时股票数据,pandas处理数据清洗,pyecharts生成交互式图表。安装命令如下:

pip install akshare pandas pyecharts

首次运行时,系统会自动下载688360(德马科技)等示例数据,测试环境是否正常。提议先运行单只股票验证,再扩展到多股票分析。

三、实战案例:603300海南华铁完整分析

以海南华铁为例,演示完整分析流程。设置2020-2025年回测周期,初始资金10万元,系统会输出三策略对比结果:

analyzer = MultiStrategyAnalyzer()
results = analyzer.run_all_strategies("603300", "2020-01-01", "2025-12-31")

运行后生成四个关键文件:603300_ma_策略信号图.html展示移动平均线买卖点,603300_rsi_策略信号图.html标注RSI反转位置,603300_bollinger_策略信号图.html显示布林带突破时机,以及策略对比报告。

四、深度应用:参数优化与风险控制

系统支持自定义参数,例如将RSI周期调整为21日,超买超卖阈值设为25/75,可适应不同市场环境。

通过修改策略类的__init__方法实现

# 自定义RSI参数
rsi_strategy = RSIStrategy(rsi_period=21, oversold=25, overbought=75)

风险控制方面,提议设置单笔交易不超过总资金的20%,连续亏损3次暂停交易。可在backtest方法中添加止损逻辑,当单日跌幅超过5%时强制平仓。

五、数据验证:真实回测结果对比

Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

以688360德马科技为例,2020-2025年回测显示:MA策略产生72次交易,最终收益率6.41%;RSI策略交易30次,收益率139.97%;布林带策略交易24次,收益率50.53%。数据证明趋势策略在成长股中优势明显。

================================================================================
688360 - 三策略回测结果对比
================================================================================
策略名称           最终资金        收益率         交易次数
--------------------------------------------------------------------------------
MA策略           106408.81   6.41%       72
RSI策略          239972.33   139.97%     30
布林带策略          150525.21   50.53%      24

 最佳策略: rsi
 最终收益: 239972.33
 收益率: 139.97%
 德马科技 分析完成!

系统生成的HTML图表支持鼠标悬停查看详情,绿色三角形标记买入点,红色三角形标记卖出点,成交量柱状图辅助判断信号有效性。

Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

六、常见问题解决方案

数据获取失败:检查股票代码是否正确,部分科创板股票需使用完整代码。网络异常时可先用本地CSV文件测试。

信号过于频繁:调整参数阈值,如将MA策略的均线周期改为10/30组合,减少交易次数。

图表显示异常:确认pyecharts版本≥1.9,旧版本可能出现坐标轴错位。

七、进阶技巧:多股票批量分析

通过循环调用可实现批量分析,提议分批处理避免接口限制:

stock_list = ["601918", "603300", "688360"]
for code in stock_list:
    analyzer.run_all_strategies(code, "2023-01-01", "2024-12-31")

系统会自动为每只股票生成独立报告,便于横向对比不同策略在各股票上的表现差异。

Python量化策略回测系统MA、RSI、布林策略(文末附完整代码)

八、使用提议与风险提示

本系统适用于技术分析学习者、量化入门者,提议先用模拟盘验证3个月以上再实盘应用。历史回测不代表未来收益,实际交易需思考滑点、手续费等因素。

免责声明:本文提供的工具仅用于教育目的,不构成投资提议。股市有风险,投资需谨慎。使用者应充分了解相关风险,自主决策并承担相应后果。

这套系统已协助多位用户完成策略验证,你准备从哪个股票开始测试?欢迎在实践中分享使用心得。

完整代码(有兴趣的可以关注我,共同研究)

# 修正三策略信号K线图系统.py
import pandas as pd
import akshare as ak
from pyecharts.charts import Kline, Line, Scatter
from pyecharts import options as opts
import warnings
import os
warnings.filterwarnings('ignore')

class StockDataLoader:
    """股票数据获取器"""
    def __init__(self):
        pass
    
    def get_stock_data(self, stock_code, start_date, end_date):
        """获取股票历史数据"""
        try:
            df = ak.stock_zh_a_hist(
                symbol=stock_code,
                period="daily",
                start_date=start_date.replace('-', ''),
                end_date=end_date.replace('-', ''),
                adjust="qfq"
            )
            
            if df.empty:
                raise ValueError("获取数据为空")
                
            # 重命名列
            df = df.rename(columns={
                '日期': 'date',
                '开盘': 'open',
                '收盘': 'close',
                '最高': 'high',
                '最低': 'low',
                '成交量': 'volume'
            })
            
            df['date'] = pd.to_datetime(df['date'])
            df = df.sort_values('date')
            df.set_index('date', inplace=True)
            
            return df
        except Exception as e:
            print(f"获取股票数据失败: {e}")
            return None

class BaseStrategy:
    """基础策略类"""
    def __init__(self, name, params):
        self.name = name
        self.params = params
        
    def generate_signals(self, df):
        """生成交易信号"""
        pass
        
    def backtest(self, df, initial_capital=100000):
        """回测策略"""
        pass

class MaStrategy(BaseStrategy):
    """移动平均线策略"""
    def __init__(self, short_ma=5, long_ma=20):
        super().__init__('MA策略', {'short_ma': short_ma, 'long_ma': long_ma})
        
    def generate_signals(self, df):
        """生成MA信号"""
        df = df.copy()
        df['short_ma'] = df['close'].rolling(window=self.params['short_ma']).mean()
        df['long_ma'] = df['close'].rolling(window=self.params['long_ma']).mean()
        
        # 生成交易信号
        df['signal'] = 0
        # 金叉买入,死叉卖出
        df.loc[df['short_ma'] > df['long_ma'], 'signal'] = 1
        df.loc[df['short_ma'] < df['long_ma'], 'signal'] = -1
        
        # 生成持仓状态
        df['position'] = df['signal'].diff()
        
        return df
    
    def backtest(self, df, initial_capital=100000):
        """回测MA策略"""
        df = self.generate_signals(df)
        
        capital = initial_capital
        position = 0
        trades = []
        
        for i in range(1, len(df)):
            date = df.index[i]
            row = df.iloc[i]
            prev_row = df.iloc[i-1]
            
            # 金叉买入
            if prev_row['short_ma'] <= prev_row['long_ma'] and row['short_ma'] > row['long_ma']:
                if position == 0:
                    shares = int(capital / row['close'])
                    if shares > 0:
                        cost = shares * row['close']
                        capital -= cost
                        position = shares
                        trades.append({
                            'date': date,
                            'type': 'buy',
                            'price': row['close'],
                            'shares': shares,
                            'value': cost,
                            'capital': capital
                        })
                        
            # 死叉卖出
            elif prev_row['short_ma'] >= prev_row['long_ma'] and row['short_ma'] < row['long_ma']:
                if position > 0:
                    value = position * row['close']
                    capital += value
                    trades.append({
                        'date': date,
                        'type': 'sell',
                        'price': row['close'],
                        'shares': position,
                        'value': value,
                        'capital': capital
                    })
                    position = 0
        
        final_value = capital + position * df['close'].iloc[-1]
        
        return {
            'final_value': final_value,
            'return_rate': (final_value - initial_capital) / initial_capital,
            'trades': trades,
            'df': df
        }

class RSIStrategy(BaseStrategy):
    """RSI相对强弱指标策略"""
    def __init__(self, rsi_period=14, oversold=30, overbought=70):
        super().__init__('RSI策略', {
            'rsi_period': rsi_period,
            'oversold': oversold,
            'overbought': overbought
        })
    
    def calculate_rsi(self, df, period=14):
        """计算RSI指标"""
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi
    
    def generate_signals(self, df):
        """生成RSI信号"""
        df = df.copy()
        df['rsi'] = self.calculate_rsi(df, self.params['rsi_period'])
        
        df['signal'] = 0
        df['position'] = 0
        
        # 使用状态机方式生成信号
        in_position = False
        
        for i in range(1, len(df)):
            rsi = df['rsi'].iloc[i]
            prev_rsi = df['rsi'].iloc[i-1]
            
            if not in_position and rsi < self.params['oversold'] and prev_rsi >= self.params['oversold']:
                df.loc[df.index[i], 'signal'] = 1
                in_position = True
            elif in_position and rsi > self.params['overbought'] and prev_rsi <= self.params['overbought']:
                df.loc[df.index[i], 'signal'] = -1
                in_position = False
        
        return df
    
    def backtest(self, df, initial_capital=100000):
        """回测RSI策略"""
        df = self.generate_signals(df)
        
        capital = initial_capital
        position = 0
        trades = []
        
        for i in range(1, len(df)):
            date = df.index[i]
            row = df.iloc[i]
            prev_row = df.iloc[i-1]
            
            # RSI从超卖区域回升买入
            if prev_row['signal'] != 1 and row['signal'] == 1:
                if position == 0:
                    shares = int(capital / row['close'])
                    if shares > 0:
                        cost = shares * row['close']
                        capital -= cost
                        position = shares
                        trades.append({
                            'date': date,
                            'type': 'buy',
                            'price': row['close'],
                            'shares': shares,
                            'value': cost,
                            'capital': capital,
                            'rsi': row.get('rsi', 0)
                        })
                        
            # RSI从超买区域回落卖出
            elif prev_row['signal'] != -1 and row['signal'] == -1:
                if position > 0:
                    value = position * row['close']
                    capital += value
                    trades.append({
                        'date': date,
                        'type': 'sell',
                        'price': row['close'],
                        'shares': position,
                        'value': value,
                        'capital': capital,
                        'rsi': row.get('rsi', 0)
                    })
                    position = 0
        
        final_value = capital + position * df['close'].iloc[-1]
        
        return {
            'final_value': final_value,
            'return_rate': (final_value - initial_capital) / initial_capital,
            'trades': trades,
            'df': df
        }

class BollingerStrategy(BaseStrategy):
    """布林带策略"""
    def __init__(self, bb_period=20, bb_std=2):
        super().__init__('布林带策略', {
            'bb_period': bb_period,
            'bb_std': bb_std
        })
    
    def calculate_bollinger_bands(self, df):
        """计算布林带"""
        df = df.copy()
        df['bb_middle'] = df['close'].rolling(window=self.params['bb_period']).mean()
        df['bb_std'] = df['close'].rolling(window=self.params['bb_period']).std()
        df['bb_upper'] = df['bb_middle'] + (df['bb_std'] * self.params['bb_std'])
        df['bb_lower'] = df['bb_middle'] - (df['bb_std'] * self.params['bb_std'])
        return df
    
    def generate_signals(self, df):
        """生成布林带信号"""
        df = self.calculate_bollinger_bands(df)
        
        df['signal'] = 0
        
        # 使用状态机方式生成信号
        in_position = False
        
        for i in range(1, len(df)):
            close = df['close'].iloc[i]
            prev_close = df['close'].iloc[i-1]
            bb_lower = df['bb_lower'].iloc[i]
            bb_upper = df['bb_upper'].iloc[i]
            prev_bb_lower = df['bb_lower'].iloc[i-1]
            prev_bb_upper = df['bb_upper'].iloc[i-1]
            
            # 价格从下向上突破下轨买入
            if not in_position and prev_close <= prev_bb_lower and close > bb_lower:
                df.loc[df.index[i], 'signal'] = 1
                in_position = True
            # 价格从上向下突破上轨卖出
            elif in_position and prev_close >= prev_bb_upper and close < bb_upper:
                df.loc[df.index[i], 'signal'] = -1
                in_position = False
        
        return df
    
    def backtest(self, df, initial_capital=100000):
        """回测布林带策略"""
        df = self.generate_signals(df)
        
        capital = initial_capital
        position = 0
        trades = []
        
        for i in range(1, len(df)):
            date = df.index[i]
            row = df.iloc[i]
            prev_row = df.iloc[i-1]
            
            # 布林带突破买入信号
            if prev_row['signal'] != 1 and row['signal'] == 1:
                if position == 0:
                    shares = int(capital / row['close'])
                    if shares > 0:
                        cost = shares * row['close']
                        capital -= cost
                        position = shares
                        trades.append({
                            'date': date,
                            'type': 'buy',
                            'price': row['close'],
                            'shares': shares,
                            'value': cost,
                            'capital': capital
                        })
                        
            # 布林带突破卖出信号
            elif prev_row['signal'] != -1 and row['signal'] == -1:
                if position > 0:
                    value = position * row['close']
                    capital += value
                    trades.append({
                        'date': date,
                        'type': 'sell',
                        'price': row['close'],
                        'shares': position,
                        'value': value,
                        'capital': capital
                    })
                    position = 0
        
        final_value = capital + position * df['close'].iloc[-1]
        
        return {
            'final_value': final_value,
            'return_rate': (final_value - initial_capital) / initial_capital,
            'trades': trades,
            'df': df
        }

class MultiStrategyAnalyzer:
    """多策略分析器"""
    def __init__(self):
        self.data_loader = StockDataLoader()
        self.strategies = {
            'ma': MaStrategy(),
            'rsi': RSIStrategy(),
            'bollinger': BollingerStrategy()
        }
        
    def run_all_strategies(self, stock_code, start_date, end_date):
        """运行所有策略"""
        print(f"正在获取{stock_code}的历史数据...")
        df = self.data_loader.get_stock_data(stock_code, start_date, end_date)
        if df is None:
            return None
        
        results = {}
        
        for strategy_name, strategy in self.strategies.items():
            print(f"
正在运行{strategy.name}策略...")
            result = strategy.backtest(df)
            results[strategy_name] = result
            
            # 生成策略信号图
            self.create_strategy_chart(df, result['trades'], stock_code, strategy.name, strategy_name)
        
        # 输出对比结果
        self.print_comparison_results(stock_code, results)
        
        return results
    
    def create_strategy_chart(self, df, trades, stock_code, strategy_name, strategy_key):
        """创建策略信号图"""
        kline_data = df[['open', 'close', 'low', 'high']].values.tolist()
        dates = df.index.strftime('%Y-%m-%d').tolist()
        
        # 创建K线图
        kline = (
            Kline()
            .add_xaxis(dates)
            .add_yaxis(
                series_name="K线",
                y_axis=kline_data,
                itemstyle_opts=opts.ItemStyleOpts(
                    color="#ef232a",
                    color0="#14b143"
                )
            )
            .set_global_opts(
                title_opts=opts.TitleOpts(
                    title=f"{stock_code} - {strategy_name}信号图",
                    subtitle=f"回测期间: {dates[0]}{dates[-1]}"
                ),
                xaxis_opts=opts.AxisOpts(type_="category"),
                yaxis_opts=opts.AxisOpts(is_scale=True),
                tooltip_opts=opts.TooltipOpts(trigger="axis"),
                datazoom_opts=[opts.DataZoomOpts(), opts.DataZoomOpts(type_="slider")]
            )
        )
        
        # 添加技术指标
        if strategy_key == 'ma':
            df['ma5'] = df['close'].rolling(5).mean()
            df['ma20'] = df['close'].rolling(20).mean()
            
            line = Line().add_xaxis(dates)
            line.add_yaxis("MA5", df['ma5'].round(2).tolist(), is_smooth=True)
            line.add_yaxis("MA20", df['ma20'].round(2).tolist(), is_smooth=True)
            kline = kline.overlap(line)
            
        elif strategy_key == 'rsi':
            df['rsi'] = self.strategies['rsi'].calculate_rsi(df)
            line = Line().add_xaxis(dates)
            line.add_yaxis("RSI", df['rsi'].round(2).tolist(), is_smooth=True)
            kline = kline.overlap(line)
            
        elif strategy_key == 'bollinger':
            df = self.strategies['bollinger'].calculate_bollinger_bands(df)
            line = Line().add_xaxis(dates)
            line.add_yaxis("BB Upper", df['bb_upper'].round(2).tolist(), is_smooth=True)
            line.add_yaxis("BB Middle", df['bb_middle'].round(2).tolist(), is_smooth=True)
            line.add_yaxis("BB Lower", df['bb_lower'].round(2).tolist(), is_smooth=True)
            kline = kline.overlap(line)
        
        # 添加买卖标记
        buy_dates = [t['date'].strftime('%Y-%m-%d') for t in trades if t['type'] == 'buy']
        buy_prices = [t['price'] for t in trades if t['type'] == 'buy']
        sell_dates = [t['date'].strftime('%Y-%m-%d') for t in trades if t['type'] == 'sell']
        sell_prices = [t['price'] for t in trades if t['type'] == 'sell']
        
        if buy_dates:
            buy_scatter = (
                Scatter()
                .add_xaxis(buy_dates)
                .add_yaxis("买入", buy_prices, symbol_size=10, symbol="triangle",
                          itemstyle_opts=opts.ItemStyleOpts(color="#00FF00"))
            )
            kline = kline.overlap(buy_scatter)
        
        if sell_dates:
            sell_scatter = (
                Scatter()
                .add_xaxis(sell_dates)
                .add_yaxis("卖出", sell_prices, symbol_size=10, symbol="triangle",
                          itemstyle_opts=opts.ItemStyleOpts(color="#FF0000"),
                          symbol_rotate=180)
            )
            kline = kline.overlap(sell_scatter)
        
        # 保存图表
        filename = f"{stock_code}_{strategy_key}_策略信号图.html"
        kline.render(filename)
        print(f"✅ {strategy_name}信号图已保存: {filename}")
    
    def print_comparison_results(self, stock_code, results):
        """打印策略对比结果"""
        print("
" + "=" * 80)
        print(f"{stock_code} - 三策略回测结果对比")
        print("=" * 80)
        
        print(f"{'策略名称':<15}{'最终资金':<12}{'收益率':<12}{'交易次数':<10}")
        print("-" * 80)
        
        for strategy_name, result in results.items():
            strategy_map = {
                'ma': 'MA策略',
                'rsi': 'RSI策略',
                'bollinger': '布林带策略'
            }
            
            final_value = result['final_value']
            return_rate = result['return_rate']
            trades = result['trades']
            
            print(f"{strategy_map[strategy_name]:<15}{final_value:<12.2f}{return_rate:<12.2%}{len(trades):<10}")
        
        # 找出最佳策略
        best_strategy = max(results.items(), key=lambda x: x[1]['final_value'])
        print(f"
 最佳策略: {best_strategy[0]}")
        print(f" 最终收益: {best_strategy[1]['final_value']:.2f}元")
        print(f" 收益率: {best_strategy[1]['return_rate']:.2%}")

# 一键运行脚本
if __name__ == "__main__":
    analyzer = MultiStrategyAnalyzer()
    
    # 示例股票
    stocks = [
        {"code": "688360", "name": "德马科技"},
        {"code": "603300", "name": "海南华铁"},
        {"code": "601918", "name": "新集能源"}
    ]
    
    start_date = "2020-01-01"
    end_date = "2025-12-31"
    
    print(" 开始三策略回测分析...")
    
    for stock in stocks:
        print(f"
 分析 {stock['name']}({stock['code']})...")
        results = analyzer.run_all_strategies(
            stock['code'], 
            start_date, 
            end_date
        )
        
        if results:
            print(f"✅ {stock['name']} 分析完成!")
    
    print("
" + "=" * 80)
    print(" 所有分析完成!")
    print(" 生成的文件:")
    for file in os.listdir('.'):
        if file.endswith('_策略信号图.html'):
            print(f"   - {file}")
    
    print("
 使用说明:")
    print("   1. 双击HTML文件在浏览器中打开")
    print("   2. 绿色三角形:买入信号")
    print("   3. 红色三角形:卖出信号")
    print("   4. 鼠标悬停查看详细信息")
© 版权声明

相关文章

5 条评论

您必须登录才能参与评论!
立即登录
  • 头像
    糊涂蛋不迷糊_ 投稿者

    收藏学习转发

    无记录
  • 头像
    天天开心呀a 投稿者

    感谢收藏转发。

    无记录
  • 头像
    鱼日丘山 读者

    收藏了,感谢分享

    无记录
  • 头像
    小紫 读者

    感谢转发。

    无记录
  • 头像
    努力努力再努力 读者

    感谢转发

    无记录