Building a Batch Server Health-Check and Auto-Repair System with Python and Ansible

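Introduction

As cloud computing matures and IT infrastructure keeps growing, traditional manual operations can no longer keep up with modern business needs. Automated operations uses tooling to manage and maintain servers, which improves efficiency, reduces human error, and makes services more stable and reliable.

Automated operations is now moving toward intelligent management. Integrating artificial intelligence and machine learning enables smarter fault prediction and self-healing, and automation tools continue to evolve as container technology and microservice architectures become widespread.

This article shows how to build a practical batch health-check and auto-repair system for servers with Python and Ansible, covering automated monitoring, problem diagnosis, and self-healing across multiple servers.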

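Project background and requirements

In production environments, operations teams routinely face the following challenges:

- Large server fleets: with hundreds or even thousands of servers to manage, manual checks are inefficient
- Late fault detection: problems are noticed only after the business is affected, with no proactive detection
- Repetitive work: daily inspections and configuration checks are repetitive and error-prone
- Slow repair: fixes require human intervention, so response times are long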

System architecture

Overall architecture


┌────────────────────────────────────────────────────┐
│               Control node (Master)                │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────┐  │
│  │ Python       │  │  Ansible     │  │ Alerting │  │
│  │ scheduler    │──│  Playbook    │──│ (email)  │  │
│  └──────────────┘  └──────────────┘  └──────────┘  │
└──────────────────────┬─────────────────────────────┘
                       │ SSH
        ┌──────────────┼──────────────┬───────────────┐
        ▼              ▼              ▼               ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│   Node 1     │ │   Node 2     │ │   Node 3     │ │   Node N     │
│  (managed)   │ │  (managed)   │ │  (managed)   │ │  (managed)   │
└──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘

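Functional modules

- Health check module: checks CPU, memory, disk, network, and service status
- Auto-repair module: cleans up logs, restarts services, frees memory, and so on
- Alerting module: notifications by email, DingTalk, and WeCom
- Reporting module: generates inspection reports in HTML
- Scheduling module: runs checks on a schedule, with support for different frequencies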

Environment setup

1. Install Ansible


# CentOS/RHEL
sudo yum install epel-release -y
sudo yum install ansible python3 python3-pip -y

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install ansible python3 python3-pip -y

2. Install Python dependencies


pip3 install pyyaml jinja2 paramiko psutil requests pandas schedule
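
Before going further, it can help to confirm that the control node's interpreter can actually see the packages the scripts import. The following is a minimal sketch using only the standard library; the `REQUIRED_MODULES` list and the `find_missing` helper are illustrative names, not part of the project.

```python
import importlib.util

# Import names used by the scripts in this article; adjust to your own list.
REQUIRED_MODULES = ["yaml", "jinja2", "psutil", "schedule"]


def find_missing(modules):
    """Return the top-level module names that this interpreter cannot locate."""
    return [name for name in modules if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = find_missing(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```

Note that the import name can differ from the package name: `pyyaml` is imported as `yaml`.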

3. Configure passwordless SSH login


# Generate an SSH key pair
ssh-keygen -t rsa -b 2048

# Copy the public key to every managed node
ssh-copy-id user@node1
ssh-copy-id user@node2
# ... remaining nodes
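
Once the keys are distributed, a quick probe can confirm that key-based login works without prompting. This is a sketch under the assumption that the OpenSSH client is installed on the control node; `build_ssh_probe` and `can_login` are hypothetical helper names, and `runner` is injectable only so the logic can be exercised without a real host.

```python
import subprocess


def build_ssh_probe(host, user, timeout=5):
    """Build an ssh command that fails fast instead of prompting for a password."""
    return [
        "ssh",
        "-o", "BatchMode=yes",              # never fall back to interactive prompts
        "-o", f"ConnectTimeout={timeout}",  # give up quickly on unreachable hosts
        f"{user}@{host}",
        "true",                             # harmless remote command
    ]


def can_login(host, user, timeout=5, runner=subprocess.run):
    """Return True if key-based login to user@host succeeds."""
    try:
        result = runner(
            build_ssh_probe(host, user, timeout),
            capture_output=True,
            timeout=timeout + 5,
        )
    except (subprocess.TimeoutExpired, OSError):
        return False
    return result.returncode == 0
```

For example, `can_login("node1", "user")` should return True once `ssh-copy-id` has been run for that node.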

Core implementation

1. Ansible inventory configuration

Create inventory/hosts.yml:


all:
  children:
    web_servers:
      hosts:
        web01:
          ansible_host: 192.168.1.10
          ansible_user: root
        web02:
          ansible_host: 192.168.1.11
          ansible_user: root
    db_servers:
      hosts:
        db01:
          ansible_host: 192.168.1.20
          ansible_user: root
        db02:
          ansible_host: 192.168.1.21
          ansible_user: root
    app_servers:
      hosts:
        app01:
          ansible_host: 192.168.1.30
          ansible_user: root
  vars:
    ansible_python_interpreter: /usr/bin/python3
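
To check how Ansible resolves this inventory, one option is to run `ansible-inventory -i inventory/hosts.yml --list` and inspect the JSON it prints. The sketch below groups hosts from that JSON; the `sample` document is hand-written in the shape that command normally produces (an assumption, not captured output), and `hosts_by_group` is an illustrative helper.

```python
import json


def hosts_by_group(inventory):
    """Map each group name to its sorted host list, skipping the '_meta' entry."""
    groups = {}
    for name, body in inventory.items():
        if name == "_meta" or not isinstance(body, dict):
            continue
        hosts = body.get("hosts")
        if hosts:
            groups[name] = sorted(hosts)
    return groups


# Hand-written sample in the usual shape of `ansible-inventory --list` output.
sample = json.loads("""
{
  "_meta": {"hostvars": {"web01": {"ansible_host": "192.168.1.10"}}},
  "all": {"children": ["ungrouped", "web_servers", "db_servers", "app_servers"]},
  "web_servers": {"hosts": ["web01", "web02"]},
  "db_servers": {"hosts": ["db01", "db02"]},
  "app_servers": {"hosts": ["app01"]}
}
""")

for group, hosts in hosts_by_group(sample).items():
    print(f"{group}: {', '.join(hosts)}")
```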

2. Health check script

Create scripts/health_check.py:


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import json
import psutil
import socket
import datetime
import subprocess
from pathlib import Path

class ServerHealthChecker:
    """Collects health metrics for the local server."""
    
    def __init__(self):
        self.hostname = socket.gethostname()
        self.check_time = datetime.datetime.now()
        self.results = {
            'hostname': self.hostname,
            'check_time': self.check_time.strftime('%Y-%m-%d %H:%M:%S'),
            'checks': {}
        }
    
    def check_cpu(self):
        """Check CPU usage."""
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()

            # Test the higher threshold first; otherwise CRITICAL is unreachable.
            status = 'OK'
            if cpu_percent > 90:
                status = 'CRITICAL'
            elif cpu_percent > 80:
                status = 'WARNING'
            
            self.results['checks']['cpu'] = {
                'usage_percent': cpu_percent,
                'cpu_count': cpu_count,
                'status': status,
                'threshold': {'warning': 80, 'critical': 90}
            }
            
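            # Collect the top CPU consumers. psutil reports None when access is
            # denied and 0.0 the first time a process is sampled, so treat a
            # missing value as 0.
            processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
                try:
                    proc_info = proc.info
                    if (proc_info['cpu_percent'] or 0) > 5: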
                        processes.append({
                            'pid': proc_info['pid'],
                            'name': proc_info['name'],
                            'cpu_percent': proc_info['cpu_percent']
                        })
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
            
            self.results['checks']['cpu']['top_processes'] = sorted(
                processes, key=lambda x: x['cpu_percent'], reverse=True
            )[:5]
            
        except Exception as e:
            self.results['checks']['cpu'] = {
                'status': 'ERROR',
                'error': str(e)
            }
    
    def check_memory(self):
        """Check memory and swap usage."""
        try:
            memory = psutil.virtual_memory()
            swap = psutil.swap_memory()

            # Test the higher threshold first; otherwise CRITICAL is unreachable.
            mem_status = 'OK'
            if memory.percent > 90:
                mem_status = 'CRITICAL'
            elif memory.percent > 80:
                mem_status = 'WARNING'
            
            self.results['checks']['memory'] = {
                'total': f"{memory.total / (1024**3):.2f} GB",
                'used': f"{memory.used / (1024**3):.2f} GB",
                'available': f"{memory.available / (1024**3):.2f} GB",
                'percent': memory.percent,
                'swap_percent': swap.percent,
                'status': mem_status,
                'threshold': {'warning': 80, 'critical': 90}
            }
            
        except Exception as e:
            self.results['checks']['memory'] = {
                'status': 'ERROR',
                'error': str(e)
            }
    
    def check_disk(self):
        """Check disk usage on the main mount points."""
        try:
            disk_usage = {}

            for partition in psutil.disk_partitions():
                if partition.mountpoint in ['/', '/home', '/var', '/data']:
                    usage = psutil.disk_usage(partition.mountpoint)

                    # Test the higher threshold first; otherwise CRITICAL is unreachable.
                    status = 'OK'
                    if usage.percent > 90:
                        status = 'CRITICAL'
                    elif usage.percent > 80:
                        status = 'WARNING'
                    
                    disk_usage[partition.mountpoint] = {
                        'total': f"{usage.total / (1024**3):.2f} GB",
                        'used': f"{usage.used / (1024**3):.2f} GB",
                        'free': f"{usage.free / (1024**3):.2f} GB",
                        'percent': usage.percent,
                        'status': status,
                        'device': partition.device,
                        'fstype': partition.fstype
                    }
            
            self.results['checks']['disk'] = disk_usage
            
        except Exception as e:
            self.results['checks']['disk'] = {
                'status': 'ERROR',
                'error': str(e)
            }
    
    def check_network(self):
        """Summarize network connections and interfaces."""
        try:
            connections = psutil.net_connections(kind='inet')
            conn_summary = {
                'ESTABLISHED': 0,
                'TIME_WAIT': 0,
                'CLOSE_WAIT': 0,
                'LISTEN': 0
            }
            
            for conn in connections:
                if conn.status in conn_summary:
                    conn_summary[conn.status] += 1
            
            # Collect the IPv4 addresses of each network interface
            interfaces = {}
            for name, addrs in psutil.net_if_addrs().items():
                interfaces[name] = []
                for addr in addrs:
                    if addr.family == socket.AF_INET:
                        interfaces[name].append(addr.address)
            
            self.results['checks']['network'] = {
                'connections': conn_summary,
                'interfaces': interfaces,
                'status': 'OK'
            }
            
        except Exception as e:
            self.results['checks']['network'] = {
                'status': 'ERROR',
                'error': str(e)
            }
    
    def check_services(self, services=('nginx', 'mysql', 'redis')):
        """Check whether key systemd services are active."""
        service_status = {}
        
        for service in services:
            try:
                result = subprocess.run(
                    ['systemctl', 'is-active', service],
                    capture_output=True,
                    text=True,
                    timeout=5
                )
                
                is_active = result.stdout.strip() == 'active'
                service_status[service] = {
                    'status': 'RUNNING' if is_active else 'STOPPED',
                    'active': is_active
                }
                
            except subprocess.TimeoutExpired:
                service_status[service] = {
                    'status': 'TIMEOUT',
                    'active': False
                }
            except Exception as e:
                service_status[service] = {
                    'status': 'ERROR',
                    'error': str(e),
                    'active': False
                }
        
        self.results['checks']['services'] = service_status
    
    def run_all_checks(self):
        """Run every check and compute the overall status."""
        self.check_cpu()
        self.check_memory()
        self.check_disk()
        self.check_network()
        self.check_services()

        # The overall status is the most severe status found. Per-mount disk
        # results sit one level down, so inspect nested dicts as well.
        severity = {'OK': 0, 'ERROR': 1, 'WARNING': 2, 'CRITICAL': 3}
        overall_status = 'OK'
        for check_result in self.results['checks'].values():
            if not isinstance(check_result, dict):
                continue
            candidates = [check_result] + [
                v for v in check_result.values() if isinstance(v, dict)
            ]
            for item in candidates:
                status = item.get('status')
                if severity.get(status, 0) > severity[overall_status]:
                    overall_status = status
        
        self.results['overall_status'] = overall_status
        
        return self.results
    
    def save_results(self, output_path='/tmp/health_check_result.json'):
        """Write the results to a JSON file and return its path."""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        
        return output_path

if __name__ == '__main__':
    checker = ServerHealthChecker()
    results = checker.run_all_checks()
    output_file = checker.save_results()
    
    # Print the full result, then where it was saved
    print(json.dumps(results, indent=2, ensure_ascii=False))
    print(f"\nCheck complete. Results saved to: {output_file}")
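
The CPU, memory, and disk checks all map a usage percentage onto a status using the same 80/90 thresholds. A small helper makes the ordering of the comparisons explicit; `classify` is a hypothetical function for illustration and is not part of the script above.

```python
def classify(value, warning=80, critical=90):
    """Map a usage percentage to OK / WARNING / CRITICAL.

    The critical threshold is tested first; otherwise any value above
    `critical` would already have matched the lower `warning` branch.
    """
    if value > critical:
        return "CRITICAL"
    if value > warning:
        return "WARNING"
    return "OK"


for sample in (42.0, 85.5, 97.3):
    print(sample, classify(sample))
```

Keeping the thresholds as parameters would also make it straightforward to feed in values from a configuration file instead of hard-coding them.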

3. Ansible Playbook: batch health check

Create playbooks/health_check.yml:


---
- name: Batch server health check
  hosts: all
  gather_facts: yes
  become: yes

  vars:
    # Anchor paths to the playbook's own directory so the play behaves the
    # same regardless of where ansible-playbook is started from.
    script_path: "{{ playbook_dir }}/../scripts/health_check.py"
    result_path: "/tmp/health_check_result.json"
    report_dir: "{{ playbook_dir }}/../reports"

  tasks:
    - name: Ensure Python 3 and pip are installed
      package:
        name:
          - python3
          - python3-pip
        state: present

    - name: Install Python dependencies
      pip:
        name:
          - psutil
        executable: pip3
        state: present

    - name: Copy the health check script to the target host
      copy:
        src: "{{ script_path }}"
        dest: "/tmp/health_check.py"
        mode: '0755'

    - name: Run the health check
      command: python3 /tmp/health_check.py
      register: check_output
      changed_when: false

    - name: Fetch the result file
      fetch:
        src: "{{ result_path }}"
        dest: "{{ report_dir }}/{{ inventory_hostname }}_health_check.json"
        flat: yes

    - name: Load the result
      set_fact:
        health_result: "{{ lookup('file', report_dir + '/' + inventory_hostname + '_health_check.json') | from_json }}"

    - name: Show a health summary
      debug:
        msg: |
          Host: {{ inventory_hostname }}
          Status: {{ health_result.overall_status }}
          CPU usage: {{ health_result.checks.cpu.usage_percent }}%
          Memory usage: {{ health_result.checks.memory.percent }}%

    - name: Trigger auto-repair if needed
      include_tasks: auto_repair.yml
      when: health_result.overall_status in ['WARNING', 'CRITICAL']

4. Auto-repair tasks

Create playbooks/auto_repair.yml:


---
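# Auto-repair tasks

- name: Remediate high memory usage
  when: health_result.checks.memory.percent > 85
  block:
    - name: Drop the page cache
      shell: |
        sync
        echo 1 > /proc/sys/vm/drop_caches
      become: yes

    # Caution: this sends SIGKILL to any of the top five processes using more
    # than 10% of memory, which can include the database or the application.
    - name: Find and kill processes with excessive memory usage
      shell: |
        ps aux | sort -k4 -rn | head -5 | awk '{if($4>10) print $2}' | xargs -r kill -9
      become: yes
      ignore_errors: yes

    - name: Log the repair action
      lineinfile:
        path: /var/log/auto_repair.log
        line: "[{{ ansible_date_time.iso8601 }}] Memory cleanup executed - memory usage: {{ health_result.checks.memory.percent }}%"
        create: yes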

- name: Remediate low disk space
  when: health_result.checks.disk['/'].percent | default(0) > 85
  block:
    - name: Find old log files
      find:
        paths:
          - /var/log
          - /tmp
        patterns:
          - "*.log"
          - "*.gz"
          - "*.old"
        age: "7d"
        recurse: yes
      register: old_files

    - name: Delete old files
      file:
        path: "{{ item.path }}"
        state: absent
      loop: "{{ old_files.files }}"
      when: old_files.files | length > 0

    # The shell module runs /bin/sh, which is not bash on every distribution,
    # so use the portable redirection form instead of the bash-only `&>`.
    - name: Clean the package manager cache
      shell: |
        if command -v yum > /dev/null 2>&1; then
          yum clean all
        elif command -v apt-get > /dev/null 2>&1; then
          apt-get clean
          apt-get autoclean
        fi
      become: yes

- name: Remediate stopped services
  block:
    - name: Restart stopped services
      systemd:
        name: "{{ item.key }}"
        state: restarted
      loop: "{{ health_result.checks.services | dict2items }}"
      when:
        - not item.value.active
        - item.key in ['nginx', 'mysql', 'redis']  # only restart key services
      ignore_errors: yes

    - name: Log service restarts
      lineinfile:
        path: /var/log/auto_repair.log
        line: "[{{ ansible_date_time.iso8601 }}] Service {{ item.key }} restarted"
        create: yes
      loop: "{{ health_result.checks.services | dict2items }}"
      when:
        - not item.value.active
        - item.key in ['nginx', 'mysql', 'redis']  # log only what was restarted

- name: Handle high CPU usage
  when: health_result.checks.cpu.usage_percent > 85
  block:
    - name: Get the processes using the most CPU
      shell: |
        ps aux | sort -k3 -rn | head -5 | awk '{print $2, $3, $11}'
      register: top_cpu_processes

    - name: Log the high-CPU processes
      lineinfile:
        path: /var/log/auto_repair.log
        line: "[{{ ansible_date_time.iso8601 }}] High-CPU processes: {{ top_cpu_processes.stdout_lines | join('; ') }}"
        create: yes

    - name: Send an alert
      debug:
        msg: "CPU usage is too high; manual investigation required"
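
Because these tasks act on live hosts, it can be useful to preview which repairs a given result file would trigger before running the playbook. The sketch below mirrors the `when` conditions above (the 85% limits and the three restartable services) as plain Python; `plan_repairs` and its action labels are illustrative and are not something the playbook itself uses.

```python
def plan_repairs(result, limit=85, restartable=("nginx", "mysql", "redis")):
    """Return the repair actions the tasks above would select for one host."""
    checks = result.get("checks", {})
    actions = []

    if checks.get("memory", {}).get("percent", 0) > limit:
        actions.append("free memory")

    root = checks.get("disk", {}).get("/", {})
    if isinstance(root, dict) and root.get("percent", 0) > limit:
        actions.append("clean disk")

    for name, info in checks.get("services", {}).items():
        if name in restartable and not info.get("active", False):
            actions.append(f"restart {name}")

    if checks.get("cpu", {}).get("usage_percent", 0) > limit:
        actions.append("report high CPU")

    return actions


example = {
    "checks": {
        "cpu": {"usage_percent": 12.0},
        "memory": {"percent": 91.2},
        "disk": {"/": {"percent": 40.0}},
        "services": {
            "nginx": {"active": False},
            "cron": {"active": False},   # not in the restartable list
            "redis": {"active": True},
        },
    }
}
print(plan_repairs(example))
```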

5. Main controller script

Create main_controller.py:


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import json
import yaml
import time
import smtplib
import schedule
import argparse
import subprocess
from pathlib import Path
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Template

class AutoOpsController:
    """Main controller for the automated operations workflow."""

    def __init__(self, config_file='config.yml'):
        self.config = self.load_config(config_file)
        self.ansible_path = self.config.get('ansible_path', '/usr/bin/ansible-playbook')
        self.playbook_dir = Path(self.config.get('playbook_dir', './playbooks'))
        self.report_dir = Path(self.config.get('report_dir', './reports'))
        self.report_dir.mkdir(parents=True, exist_ok=True)

    def load_config(self, config_file):
        """Load the YAML configuration file (an empty file yields an empty dict)."""
        with open(config_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f) or {}
    
    def run_ansible_playbook(self, playbook_name, extra_vars=None):
        """Run an Ansible playbook and return (success, output)."""
        playbook_path = self.playbook_dir / playbook_name

        cmd = [
            self.ansible_path,
            str(playbook_path),
            '-i', 'inventory/hosts.yml'
        ]

        if extra_vars:
            cmd.extend(['-e', json.dumps(extra_vars)])

        print(f"[{datetime.now()}] Running playbook: {playbook_name}")

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=300
            )

            if result.returncode == 0:
                print(f"[{datetime.now()}] Playbook succeeded")
                return True, result.stdout
            else:
                # ansible-playbook prints task failures to stdout, so return both streams
                print(f"[{datetime.now()}] Playbook failed")
                return False, result.stdout + result.stderr

        except subprocess.TimeoutExpired:
            print(f"[{datetime.now()}] Playbook timed out")
            return False, "Execution timeout"
        except Exception as e:
            print(f"[{datetime.now()}] Execution error: {str(e)}")
            return False, str(e)
    
    def collect_results(self):
        """Collect the check results fetched from every host."""
        results = {}

        for json_file in self.report_dir.glob('*_health_check.json'):
            hostname = json_file.stem.replace('_health_check', '')

            with open(json_file, 'r', encoding='utf-8') as f:
                results[hostname] = json.load(f)

        return results

    def generate_html_report(self, results):
        """Generate the HTML report."""
        template_str = '''
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Server Health Check Report</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
            background-color: #f5f5f5;
        }
        h1 {
            color: #333;
            text-align: center;
        }
        .summary {
            background: white;
            padding: 20px;
            border-radius: 8px;
            margin-bottom: 20px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        table {
            width: 100%;
            border-collapse: collapse;
            background: white;
            border-radius: 8px;
            overflow: hidden;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        th, td {
            padding: 12px;
            text-align: left;
            border-bottom: 1px solid #ddd;
        }
        th {
            background-color: #4CAF50;
            color: white;
        }
        tr:hover {
            background-color: #f5f5f5;
        }
        .status-ok {
            color: green;
            font-weight: bold;
        }
        .status-warning {
            color: orange;
            font-weight: bold;
        }
        .status-critical {
            color: red;
            font-weight: bold;
        }
        .metric {
            display: inline-block;
            margin: 10px;
            padding: 10px;
            background: #f0f0f0;
            border-radius: 4px;
        }
    </style>
</head>
<body>
    <h1>Server Health Check Report</h1>
    <div class="summary">
        <h2>Check time: {{ check_time }}</h2>
        <h3>Overview</h3>
        <div>
            <span class="metric">Total servers: {{ total_servers }}</span>
            <span class="metric">OK: {{ ok_count }}</span>
            <span class="metric">Warning: {{ warning_count }}</span>
            <span class="metric">Critical: {{ critical_count }}</span>
        </div>
    </div>

    <h2>Details</h2>
    <table>
        <tr>
            <th>Host</th>
            <th>Status</th>
            <th>CPU usage</th>
            <th>Memory usage</th>
            <th>Disk usage</th>
            <th>Check time</th>
        </tr>
        {% for hostname, data in results.items() %}
        <tr>
            <td>{{ hostname }}</td>
            <td class="status-{{ data.overall_status.lower() }}">{{ data.overall_status }}</td>
            <td>{{ data.checks.cpu.usage_percent }}%</td>
            <td>{{ data.checks.memory.percent }}%</td>
            <td>
                {% for mount, disk in data.checks.disk.items() %}
                    {{ mount }}: {{ disk.percent }}%<br>
                {% endfor %}
            </td>
            <td>{{ data.check_time }}</td>
        </tr>
        {% endfor %}
    </table>

    <h2>Issues requiring attention</h2>
    <ul>
        {% for hostname, data in results.items() %}
            {% if data.overall_status != 'OK' %}
            <li>
                <strong>{{ hostname }}</strong>:
                {# default(0) keeps the comparison valid when a check errored and the value is missing #}
                {% if data.checks.cpu.usage_percent | default(0) > 80 %}
                    High CPU usage ({{ data.checks.cpu.usage_percent }}%)
                {% endif %}
                {% if data.checks.memory.percent | default(0) > 80 %}
                    High memory usage ({{ data.checks.memory.percent }}%)
                {% endif %}
            </li>
            {% endif %}
        {% endfor %}
    </ul>
</body>
</html>
        '''
        
        template = Template(template_str)
        
        # Summary statistics
        stats = {
            'check_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'total_servers': len(results),
            'ok_count': sum(1 for r in results.values() if r['overall_status'] == 'OK'),
            'warning_count': sum(1 for r in results.values() if r['overall_status'] == 'WARNING'),
            'critical_count': sum(1 for r in results.values() if r['overall_status'] == 'CRITICAL'),
            'results': results
        }

        html_content = template.render(**stats)

        # Save the report; the explicit encoding matches the UTF-8 charset declared in the template
        report_file = self.report_dir / f"health_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return report_file, stats
    
    def send_email_alert(self, report_file, stats):
        """Send the report by email."""
        if not self.config.get('email', {}).get('enabled'):
            return

        email_config = self.config['email']

        msg = MIMEMultipart()
        msg['From'] = email_config['from']
        msg['To'] = ', '.join(email_config['to'])
        msg['Subject'] = f"Server health check report - {stats['check_time']}"

        # Message body
        body = f"""
Server health check complete

Check time: {stats['check_time']}
Total servers: {stats['total_servers']}
OK: {stats['ok_count']}
Warning: {stats['warning_count']}
Critical: {stats['critical_count']}

See the attached report for details.
        """

        msg.attach(MIMEText(body, 'plain'))

        # Attach the HTML report
        with open(report_file, 'r', encoding='utf-8') as f:
            attachment = MIMEText(f.read(), 'html')
            attachment.add_header('Content-Disposition', 'attachment',
                                filename=os.path.basename(report_file))
            msg.attach(attachment)

        try:
            # The context manager closes the connection even if sending fails
            with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'],
                              timeout=30) as server:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)

            print(f"[{datetime.now()}] Alert email sent")

        except Exception as e:
            print(f"[{datetime.now()}] Failed to send email: {str(e)}")
    
    def run_health_check(self):
        """Run the full health check workflow."""
        print(f"[{datetime.now()}] Starting health check...")

        # Run the Ansible playbook
        success, output = self.run_ansible_playbook('health_check.yml')

        if success:
            # Collect the results
            results = self.collect_results()

            # Generate the report
            report_file, stats = self.generate_html_report(results)
            print(f"[{datetime.now()}] Report generated: {report_file}")

            # Send an alert if any host has a problem
            if stats['warning_count'] > 0 or stats['critical_count'] > 0:
                self.send_email_alert(report_file, stats)

            return True
        else:
            print(f"[{datetime.now()}] Health check failed")
            return False

    def schedule_tasks(self):
        """Set up the scheduled jobs and run them forever."""
        # Run a health check every hour
        schedule.every(1).hours.do(self.run_health_check)

        # Run a deep check every day at 02:00
        # (deep_check.yml is not included in this article and must be provided separately)
        schedule.every().day.at("02:00").do(
            lambda: self.run_ansible_playbook('deep_check.yml')
        )

        print(f"[{datetime.now()}] Scheduled jobs registered, waiting to run...")

        while True:
            schedule.run_pending()
            time.sleep(60)

def main():
    parser = argparse.ArgumentParser(description='Automated operations controller')
    parser.add_argument('--once', action='store_true', help='run a single check')
    parser.add_argument('--schedule', action='store_true', help='start the scheduler')
    parser.add_argument('--config', default='config.yml', help='path to the configuration file')

    args = parser.parse_args()

    controller = AutoOpsController(args.config)

    if args.once:
        controller.run_health_check()
    elif args.schedule:
        controller.schedule_tasks()
    else:
        print("Please choose a run mode: --once or --schedule")

if __name__ == '__main__':
    main()
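
The controller treats a playbook run as a single success or failure, but the captured output also says which hosts had problems. The sketch below extracts per-host counters from the PLAY RECAP section that `ansible-playbook` prints at the end of a run. The regular expression assumes the usual `host : ok=N changed=N ...` layout without colour codes, and `parse_recap` and `failed_hosts` are hypothetical helpers rather than part of the script above.

```python
import re

# One recap line looks like: "web01 : ok=8 changed=1 unreachable=0 failed=0 ..."
RECAP_LINE = re.compile(r"^(?P<host>\S+)\s*:\s*(?P<stats>(?:\w+=\d+\s*)+)$")


def parse_recap(output):
    """Return {host: {counter: value}} from the PLAY RECAP section of the output."""
    recap = {}
    in_recap = False
    for line in output.splitlines():
        if line.startswith("PLAY RECAP"):
            in_recap = True
            continue
        if not in_recap:
            continue
        match = RECAP_LINE.match(line.strip())
        if match:
            pairs = (item.split("=") for item in match.group("stats").split())
            recap[match.group("host")] = {key: int(value) for key, value in pairs}
    return recap


def failed_hosts(recap):
    """Hosts that were unreachable or had at least one failed task."""
    return sorted(
        host for host, stats in recap.items()
        if stats.get("failed", 0) > 0 or stats.get("unreachable", 0) > 0
    )


sample_output = """\
PLAY RECAP *********************************************************************
web01                      : ok=8    changed=1    unreachable=0    failed=0    skipped=1    rescued=0    ignored=0
db01                       : ok=0    changed=0    unreachable=1    failed=0    skipped=0    rescued=0    ignored=0
"""
print(failed_hosts(parse_recap(sample_output)))
```

A controller could use such a list to generate a report for the healthy hosts while flagging the ones that failed, rather than skipping the report entirely.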

6. Configuration file

Create config.yml:


# Automated operations configuration

# Ansible settings
ansible_path: /usr/bin/ansible-playbook
playbook_dir: ./playbooks
report_dir: ./reports

# Email alert settings
email:
  enabled: true
  smtp_server: smtp.example.com
  smtp_port: 587
  from: ops-alert@example.com
  to:
    - admin@example.com
    - ops-team@example.com
  username: ops-alert@example.com
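  password: your_password  # placeholder; avoid committing real credentials

# DingTalk alert settings (not read by main_controller.py as shown)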
dingtalk:
  enabled: false
  webhook: https://oapi.dingtalk.com/robot/send?access_token=xxx
  secret: xxx

# Check thresholds (health_check.py as shown hard-codes 80/90 and does not read these values)
thresholds:
  cpu:
    warning: 80
    critical: 90
  memory:
    warning: 80
    critical: 90
  disk:
    warning: 80
    critical: 90

# Auto-repair policy
auto_repair:
  enabled: true
  max_retries: 3
  actions:
    high_memory:
      - clear_cache
      - restart_services
    high_disk:
      - clean_logs
      - remove_old_files
    service_down:
      - restart_service
      - send_alert

# Monitored services
monitored_services:
  - nginx
  - mysql
  - redis
  - docker
  - kubelet
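
The configuration has a `dingtalk` section with a `webhook` and a `secret`, but the controller only implements email. As a rough sketch of how the signed webhook URL could be built, the code below follows my understanding of the signing scheme DingTalk documents for custom robots (HMAC-SHA256 over the millisecond timestamp and the secret, Base64-encoded and URL-quoted). Treat it as an assumption to verify against the current official documentation; `signed_webhook_url` and `text_payload` are illustrative names.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def signed_webhook_url(webhook, secret, timestamp_ms=None):
    """Append the timestamp and HMAC-SHA256 signature to a robot webhook URL."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    # Assumed scheme: sign "<timestamp>\n<secret>" with the secret as the key.
    string_to_sign = f"{timestamp_ms}\n{secret}".encode("utf-8")
    digest = hmac.new(secret.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))
    return f"{webhook}&timestamp={timestamp_ms}&sign={sign}"


def text_payload(content):
    """Body of a plain-text robot message."""
    return {"msgtype": "text", "text": {"content": content}}


url = signed_webhook_url(
    "https://oapi.dingtalk.com/robot/send?access_token=xxx", "xxx"
)
payload = text_payload("Health check: 1 host in CRITICAL state")
```

Sending the message would then be a single JSON POST of `payload` to `url`, for instance with `requests.post(url, json=payload, timeout=10)`.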

Deployment and usage

1. Project layout


auto-ops-system/
├── inventory/
│   └── hosts.yml
├── playbooks/
│   ├── health_check.yml
│   └── auto_repair.yml
├── scripts/
│   └── health_check.py
├── reports/
├── config.yml
├── main_controller.py
└── requirements.txt

2. Install dependencies


pip3 install -r requirements.txt

3. Configure the host inventory

Edit inventory/hosts.yml and add the servers you want to manage.

4. Run the system

Run once:


python3 main_controller.py --once

Start the scheduler:


nohup python3 main_controller.py --schedule > ops.log 2>&1 &

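Advantages and benefits

With automated operations in place, teams can greatly improve efficiency, cut repetitive manual work, and reduce human error. For fault detection and repair in particular, an automated system can restart services, redeploy containers, or adjust resources as soon as it detects an anomaly.

Quantified benefits

- Efficiency: daily inspection time reduced from 2 hours to 5 minutes
- Fault response: average time to detect a fault shortened from 30 minutes to 1 minute
- Labor cost: 70% less repetitive operations work
- Reliability: service availability raised from 99.9% to 99.99%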
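
To put the availability figures in concrete terms, the short calculation below converts an availability percentage into the downtime it allows per year. It is plain arithmetic over a 365-day year; `yearly_downtime_minutes` is just an illustrative helper.

```python
MINUTES_PER_YEAR = 365 * 24 * 60  # 525,600 minutes, ignoring leap years


def yearly_downtime_minutes(availability_percent):
    """Maximum downtime per year that still meets the given availability."""
    return MINUTES_PER_YEAR * (1 - availability_percent / 100)


for level in (99.9, 99.99):
    print(f"{level}% -> {yearly_downtime_minutes(level):.1f} minutes/year")
```

In other words, 99.9% allows roughly 8.8 hours of downtime per year, while 99.99% allows under an hour.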

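Future improvements

- AIOps integration: introduce machine learning for anomaly detection and trend prediction
- Container support: stronger monitoring and management of Kubernetes environments
- Multi-cloud management: unified management across AWS, Alibaba Cloud, and other platforms
- Better visualization: a web management interface with real-time monitoring dashboards
- Intelligent decisions: automated decision tuning based on historical data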
