Python疯狂练习60天——第十三天

今日练习主题:网络编程

今天我们将学习Python的网络编程,包括socket编程、HTTP客户端和服务端开发,以及常用的网络协议。

练习1:基础Socket编程

import socket
import threading
import time

# TCP服务器
def tcp_server():
    """简单的TCP服务器"""
    print("=== TCP服务器 ===")
    
    # 创建socket对象
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 绑定地址和端口
    server_address = ('localhost', 8888)
    server_socket.bind(server_address)
    
    # 开始监听
    server_socket.listen(5)
    print(f"服务器启动在 {server_address}")
    
    def handle_client(client_socket, client_address):
        """处理客户端连接"""
        print(f"接收到来自 {client_address} 的连接")
        
        try:
            while True:
                # 接收数据
                data = client_socket.recv(1024)
                if not data:
                    break
                
                message = data.decode('utf-8')
                print(f"收到来自 {client_address} 的消息: {message}")
                
                # 发送响应
                response = f"服务器已收到: {message}"
                client_socket.send(response.encode('utf-8'))
                
        except ConnectionResetError:
            print(f"客户端 {client_address} 断开连接")
        finally:
            client_socket.close()
    
    try:
        while True:
            # 接受客户端连接
            client_socket, client_address = server_socket.accept()
            
            # 为每个客户端创建新线程
            client_thread = threading.Thread(
                target=handle_client, 
                args=(client_socket, client_address)
            )
            client_thread.daemon = True
            client_thread.start()
            
    except KeyboardInterrupt:
        print("服务器关闭")
    finally:
        server_socket.close()

# TCP客户端
def tcp_client():
    """简单的TCP客户端"""
    print("=== TCP客户端 ===")
    
    # 创建socket对象
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 连接服务器
        server_address = ('localhost', 8888)
        client_socket.connect(server_address)
        print(f"连接到服务器 {server_address}")
        
        # 发送消息
        messages = ["Hello", "World", "Python", "Network"]
        
        for message in messages:
            print(f"发送: {message}")
            client_socket.send(message.encode('utf-8'))
            
            # 接收响应
            response = client_socket.recv(1024)
            print(f"收到: {response.decode('utf-8')}")
            
            time.sleep(1)
            
    except ConnectionRefusedError:
        print("无法连接到服务器")
    except Exception as e:
        print(f"客户端错误: {e}")
    finally:
        client_socket.close()

# 运行TCP服务器和客户端
def run_tcp_example():
    """运行TCP示例"""
    # 在后台启动服务器
    server_thread = threading.Thread(target=tcp_server, daemon=True)
    server_thread.start()
    
    # 给服务器时间启动
    time.sleep(1)
    
    # 运行客户端
    tcp_client()

# run_tcp_example()  # 撤销注释运行

print("
" + "="*50 + "
")

# UDP通信
def udp_example():
    """UDP通信示例"""
    print("=== UDP通信 ===")
    
    def udp_server():
        """UDP服务器"""
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        server_socket.bind(('localhost', 9999))
        print("UDP服务器启动")
        
        try:
            while True:
                data, client_address = server_socket.recvfrom(1024)
                message = data.decode('utf-8')
                print(f"收到来自 {client_address} 的UDP消息: {message}")
                
                response = f"UDP响应: {message}"
                server_socket.sendto(response.encode('utf-8'), client_address)
                
        except KeyboardInterrupt:
            print("UDP服务器关闭")
        finally:
            server_socket.close()
    
    def udp_client():
        """UDP客户端"""
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        
        try:
            server_address = ('localhost', 9999)
            messages = ["UDP测试1", "UDP测试2", "UDP测试3"]
            
            for message in messages:
                print(f"发送UDP: {message}")
                client_socket.sendto(message.encode('utf-8'), server_address)
                
                # 接收响应
                response, _ = client_socket.recvfrom(1024)
                print(f"收到UDP响应: {response.decode('utf-8')}")
                
                time.sleep(1)
                
        except Exception as e:
            print(f"UDP客户端错误: {e}")
        finally:
            client_socket.close()
    
    # 运行UDP示例
    server_thread = threading.Thread(target=udp_server, daemon=True)
    server_thread.start()
    
    time.sleep(1)
    udp_client()

# udp_example()  # 撤销注释运行

练习2:HTTP客户端

import requests
import json
from urllib.parse import urlencode

def http_client_examples():
    """HTTP客户端示例"""
    print("=== HTTP客户端 ===")
    
    # 1. 基本的GET请求
    print("1. 基本GET请求:")
    response = requests.get('https://httpbin.org/get')
    print(f"状态码: {response.status_code}")
    print(f"响应头: {dict(response.headers)}")
    print(f"响应内容: {response.json()}")
    
    print("
" + "-"*30 + "
")
    
    # 2. 带参数的GET请求
    print("2. 带参数GET请求:")
    params = {'key1': 'value1', 'key2': 'value2'}
    response = requests.get('https://httpbin.org/get', params=params)
    print(f"URL: {response.url}")
    print(f"参数: {response.json()['args']}")
    
    print("
" + "-"*30 + "
")
    
    # 3. POST请求(表单数据)
    print("3. POST表单数据:")
    data = {'username': 'testuser', 'password': 'testpass'}
    response = requests.post('https://httpbin.org/post', data=data)
    print(f"表单数据: {response.json()['form']}")
    
    print("
" + "-"*30 + "
")
    
    # 4. POST请求(JSON数据)
    print("4. POST JSON数据:")
    json_data = {'name': 'Alice', 'age': 25, 'city': 'Beijing'}
    response = requests.post(
        'https://httpbin.org/post', 
        json=json_data,
        headers={'Content-Type': 'application/json'}
    )
    print(f"JSON数据: {response.json()['json']}")
    
    print("
" + "-"*30 + "
")
    
    # 5. 设置请求头
    print("5. 自定义请求头:")
    headers = {
        'User-Agent': 'MyPythonClient/1.0',
        'Authorization': 'Bearer token123'
    }
    response = requests.get('https://httpbin.org/headers', headers=headers)
    print(f"请求头: {response.json()['headers']}")
    
    print("
" + "-"*30 + "
")
    
    # 6. 处理超时和错误
    print("6. 错误处理:")
    try:
        response = requests.get('https://httpbin.org/delay/5', timeout=2)
    except requests.exceptions.Timeout:
        print("请求超时")
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
    
    print("
" + "-"*30 + "
")
    
    # 7. 会话保持(Cookies)
    print("7. 使用会话:")
    with requests.Session() as session:
        # 第一次请求设置cookie
        response1 = session.get('https://httpbin.org/cookies/set/sessionid/12345')
        print(f"第一次请求Cookies: {response1.json()['cookies']}")
        
        # 第二次请求会携带cookie
        response2 = session.get('https://httpbin.org/cookies')
        print(f"第二次请求Cookies: {response2.json()['cookies']}")

http_client_examples()

练习3:HTTP服务器

from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse

class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """简单的HTTP请求处理器"""
    
    def do_GET(self):
        """处理GET请求"""
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path
        
        if path == '/':
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()
            self.wfile.write(b'<h1>欢迎来到Python HTTP服务器</h1>')
        
        elif path == '/api/data':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            response = {'message': 'Hello World', 'status': 'success'}
            self.wfile.write(json.dumps(response).encode())
        
        elif path == '/api/users':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            users = [
                {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
                {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
                {'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'}
            ]
            self.wfile.write(json.dumps(users).encode())
        
        else:
            self.send_response(404)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            response = {'error': 'Not Found', 'path': path}
            self.wfile.write(json.dumps(response).encode())
    
    def do_POST(self):
        """处理POST请求"""
        content_length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(content_length)
        
        try:
            data = json.loads(post_data.decode('utf-8'))
        except json.JSONDecodeError:
            data = {}
        
        self.send_response(201)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'message': 'Data received',
            'received_data': data,
            'method': 'POST'
        }
        self.wfile.write(json.dumps(response).encode())
    
    def do_PUT(self):
        """处理PUT请求"""
        content_length = int(self.headers.get('Content-Length', 0))
        put_data = self.rfile.read(content_length)
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'message': 'Data updated',
            'data_length': len(put_data),
            'method': 'PUT'
        }
        self.wfile.write(json.dumps(response).encode())
    
    def do_DELETE(self):
        """处理DELETE请求"""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {'message': 'Resource deleted', 'method': 'DELETE'}
        self.wfile.write(json.dumps(response).encode())

def run_http_server():
    """运行HTTP服务器"""
    print("=== 启动HTTP服务器 ===")
    server_address = ('localhost', 8000)
    httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)
    
    print(f"服务器运行在 http://{server_address[0]}:{server_address[1]}")
    print("按 Ctrl+C 停止服务器")
    
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("
服务器关闭")
    finally:
        httpd.server_close()

# 在后台启动服务器
def start_server_in_background():
    """在后台启动服务器"""
    server_thread = threading.Thread(target=run_http_server, daemon=True)
    server_thread.start()
    time.sleep(1)  # 给服务器时间启动

# start_server_in_background()  # 撤销注释运行

# 测试HTTP服务器
def test_http_server():
    """测试HTTP服务器"""
    print("=== 测试HTTP服务器 ===")
    
    base_url = 'http://localhost:8000'
    
    try:
        # 测试根路径
        response = requests.get(f'{base_url}/')
        print(f"根路径: {response.status_code} - {response.text[:50]}...")
        
        # 测试API数据
        response = requests.get(f'{base_url}/api/data')
        print(f"API数据: {response.status_code} - {response.json()}")
        
        # 测试用户API
        response = requests.get(f'{base_url}/api/users')
        print(f"用户API: {response.status_code} - {response.json()}")
        
        # 测试POST请求
        data = {'name': 'John', 'age': 30}
        response = requests.post(f'{base_url}/api/data', json=data)
        print(f"POST请求: {response.status_code} - {response.json()}")
        
        # 测试404
        response = requests.get(f'{base_url}/nonexistent')
        print(f"404测试: {response.status_code} - {response.json()}")
        
    except requests.exceptions.ConnectionError:
        print("无法连接到服务器,请先启动服务器")

# test_http_server()  # 撤销注释测试

练习4:WebSocket编程

import websockets
import asyncio

async def websocket_server():
    """WebSocket服务器"""
    print("=== WebSocket服务器 ===")
    
    async def handler(websocket, path):
        """处理WebSocket连接"""
        print(f"客户端连接: {websocket.remote_address}")
        
        try:
            async for message in websocket:
                print(f"收到消息: {message}")
                
                # 回应客户端
                response = f"服务器收到: {message}"
                await websocket.send(response)
                
        except websockets.exceptions.ConnectionClosed:
            print("客户端断开连接")
    
    # 启动服务器
    server = await websockets.serve(handler, "localhost", 8765)
    print("WebSocket服务器启动在 ws://localhost:8765")
    
    # 保持服务器运行
    await server.wait_closed()

async def websocket_client():
    """WebSocket客户端"""
    print("=== WebSocket客户端 ===")
    
    try:
        async with websockets.connect("ws://localhost:8765") as websocket:
            messages = ["Hello", "WebSocket", "Python"]
            
            for message in messages:
                # 发送消息
                await websocket.send(message)
                print(f"发送: {message}")
                
                # 接收响应
                response = await websocket.recv()
                print(f"收到: {response}")
                
                await asyncio.sleep(1)
                
    except ConnectionRefusedError:
        print("无法连接到WebSocket服务器")

async def run_websocket_example():
    """运行WebSocket示例"""
    # 启动服务器
    server_task = asyncio.create_task(websocket_server())
    
    # 给服务器时间启动
    await asyncio.sleep(1)
    
    # 运行客户端
    await websocket_client()
    
    # 撤销服务器任务
    server_task.cancel()
    try:
        await server_task
    except asyncio.CancelledError:
        pass

# asyncio.run(run_websocket_example())  # 撤销注释运行

练习5:网络工具函数

import socket
import subprocess
import platform

def network_utilities():
    """网络工具函数"""
    print("=== 网络工具函数 ===")
    
    # 1. 获取本机IP地址
    def get_local_ip():
        """获取本机IP地址"""
        try:
            # 创建一个UDP socket
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # 连接到一个远程地址(不会真正发送数据)
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
            s.close()
            return ip
        except Exception:
            return "127.0.0.1"
    
    print(f"本机IP地址: {get_local_ip()}")
    
    # 2. 端口扫描
    def port_scan(host, start_port, end_port):
        """简单的端口扫描"""
        open_ports = []
        for port in range(start_port, end_port + 1):
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(0.5)
                    result = s.connect_ex((host, port))
                    if result == 0:
                        open_ports.append(port)
                        print(f"端口 {port} 开放")
            except socket.error:
                pass
        return open_ports
    
    print("
端口扫描 localhost:1-100")
    open_ports = port_scan('localhost', 1, 100)
    print(f"开放的端口: {open_ports}")
    
    # 3. DNS查询
    def dns_lookup(hostname):
        """DNS查询"""
        try:
            ip = socket.gethostbyname(hostname)
            print(f"{hostname} -> {ip}")
            return ip
        except socket.gaierror as e:
            print(f"DNS查询失败: {e}")
            return None
    
    print("
DNS查询:")
    dns_lookup("google.com")
    dns_lookup("github.com")
    
    # 4. 网络连通性测试
    def ping_host(host):
        """ping主机测试连通性"""
        param = "-n" if platform.system().lower() == "windows" else "-c"
        command = ["ping", param, "2", host]
        
        try:
            result = subprocess.run(
                command, 
                stdout=subprocess.PIPE, 
                stderr=subprocess.PIPE,
                text=True,
                timeout=5
            )
            if result.returncode == 0:
                print(f"可以ping通 {host}")
                return True
            else:
                print(f"无法ping通 {host}")
                return False
        except subprocess.TimeoutExpired:
            print(f"ping {host} 超时")
            return False
    
    print("
网络连通性测试:")
    ping_host("8.8.8.8")  # Google DNS
    ping_host("1.1.1.1")  # Cloudflare DNS

network_utilities()

练习6:电子邮件发送

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import os

def email_examples():
    """电子邮件示例"""
    print("=== 电子邮件发送 ===")
    
    # 配置(需要替换为真实的邮箱配置)
    smtp_config = {
        'host': 'smtp.example.com',      # 替换为你的SMTP服务器
        'port': 587,                     # 一般587用于TLS
        'username': 'your_email@example.com',  # 替换为你的邮箱
        'password': 'your_password',     # 替换为你的密码或应用密码
        'use_tls': True
    }
    
    def send_simple_email(to_email, subject, body):
        """发送简单文本邮件"""
        try:
            # 创建邮件
            msg = MIMEText(body, 'plain', 'utf-8')
            msg['Subject'] = subject
            msg['From'] = smtp_config['username']
            msg['To'] = to_email
            
            # 连接SMTP服务器
            with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
                if smtp_config['use_tls']:
                    server.starttls()  # 启用TLS加密
                
                server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
            
            print(f"邮件发送成功到 {to_email}")
            return True
            
        except Exception as e:
            print(f"邮件发送失败: {e}")
            return False
    
    def send_html_email(to_email, subject, html_body):
        """发送HTML邮件"""
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = smtp_config['username']
            msg['To'] = to_email
            
            # 添加HTML内容
            html_part = MIMEText(html_body, 'html', 'utf-8')
            msg.attach(html_part)
            
            with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
                if smtp_config['use_tls']:
                    server.starttls()
                server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
            
            print(f"HTML邮件发送成功到 {to_email}")
            return True
            
        except Exception as e:
            print(f"HTML邮件发送失败: {e}")
            return False
    
    def send_email_with_attachment(to_email, subject, body, attachment_path):
        """发送带附件的邮件"""
        try:
            msg = MIMEMultipart()
            msg['Subject'] = subject
            msg['From'] = smtp_config['username']
            msg['To'] = to_email
            
            # 添加文本内容
            text_part = MIMEText(body, 'plain', 'utf-8')
            msg.attach(text_part)
            
            # 添加附件
            if os.path.exists(attachment_path):
                with open(attachment_path, 'rb') as f:
                    attachment = MIMEApplication(f.read())
                    attachment.add_header(
                        'Content-Disposition',
                        'attachment',
                        filename=os.path.basename(attachment_path)
                    )
                    msg.attach(attachment)
            
            with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
                if smtp_config['use_tls']:
                    server.starttls()
                server.login(smtp_config['username'], smtp_config['password'])
                server.send_message(msg)
            
            print(f"带附件邮件发送成功到 {to_email}")
            return True
            
        except Exception as e:
            print(f"带附件邮件发送失败: {e}")
            return False
    
    # 示例使用(需要配置真实的SMTP信息才能运行)
    print("注意:需要配置真实的SMTP信息才能运行以下示例")
    
    # 创建测试文件
    with open('test_attachment.txt', 'w', encoding='utf-8') as f:
        f.write('这是一个测试附件文件')
    
    # 示例调用(撤销注释并配置后运行)
    # send_simple_email('recipient@example.com', '测试邮件', '这是一封测试邮件')
    # send_html_email('recipient@example.com', 'HTML测试', '<h1>标题</h1><p>这是一封HTML邮件</p>')
    # send_email_with_attachment('recipient@example.com', '带附件测试', '请查看附件', 'test_attachment.txt')
    
    # 清理测试文件
    if os.path.exists('test_attachment.txt'):
        os.remove('test_attachment.txt')

email_examples()

练习7:FTP客户端

from ftplib import FTP
import os

def ftp_examples():
    """FTP客户端示例"""
    print("=== FTP客户端 ===")
    
    # FTP配置(需要替换为真实的FTP服务器信息)
    ftp_config = {
        'host': 'ftp.example.com',      # 替换为FTP服务器地址
        'username': 'your_username',    # 替换为用户名
        'password': 'your_password',    # 替换为密码
        'port': 21                      # FTP默认端口
    }
    
    def list_ftp_files():
        """列出FTP服务器上的文件"""
        try:
            with FTP() as ftp:
                ftp.connect(ftp_config['host'], ftp_config['port'])
                ftp.login(ftp_config['username'], ftp_config['password'])
                
                print("FTP服务器文件列表:")
                ftp.retrlines('LIST')
                
        except Exception as e:
            print(f"FTP连接失败: {e}")
    
    def upload_file(local_path, remote_path):
        """上传文件到FTP服务器"""
        try:
            with FTP() as ftp:
                ftp.connect(ftp_config['host'], ftp_config['port'])
                ftp.login(ftp_config['username'], ftp_config['password'])
                
                with open(local_path, 'rb') as f:
                    ftp.storbinary(f'STOR {remote_path}', f)
                
                print(f"文件上传成功: {local_path} -> {remote_path}")
                
        except Exception as e:
            print(f"文件上传失败: {e}")
    
    def download_file(remote_path, local_path):
        """从FTP服务器下载文件"""
        try:
            with FTP() as ftp:
                ftp.connect(ftp_config['host'], ftp_config['port'])
                ftp.login(ftp_config['username'], ftp_config['password'])
                
                with open(local_path, 'wb') as f:
                    ftp.retrbinary(f'RETR {remote_path}', f.write)
                
                print(f"文件下载成功: {remote_path} -> {local_path}")
                
        except Exception as e:
            print(f"文件下载失败: {e}")
    
    # 示例使用(需要配置真实的FTP信息才能运行)
    print("注意:需要配置真实的FTP信息才能运行以下示例")
    
    # 创建测试文件
    with open('test_upload.txt', 'w', encoding='utf-8') as f:
        f.write('这是一个测试上传文件')
    
    # 示例调用(撤销注释并配置后运行)
    # list_ftp_files()
    # upload_file('test_upload.txt', 'test_upload.txt')
    # download_file('test_upload.txt', 'test_download.txt')
    
    # 清理测试文件
    for file in ['test_upload.txt', 'test_download.txt']:
        if os.path.exists(file):
            os.remove(file)

ftp_examples()

今日挑战:

创建一个完整的网络监控工具,可以检查网站状态、端口开放情况和网络连通性。

# 挑战练习:网络监控工具
import requests
import socket
import time
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor
import json

class NetworkMonitor:
    """网络监控工具"""
    
    def __init__(self):
        self.monitoring_tasks = []
        self.results = []
        self.is_monitoring = False
    
    def add_website_monitor(self, url, interval=60, timeout=10):
        """添加网站监控任务"""
        task = {
            'type': 'website',
            'target': url,
            'interval': interval,
            'timeout': timeout,
            'last_check': None,
            'status': 'unknown'
        }
        self.monitoring_tasks.append(task)
        print(f"添加网站监控: {url}, 间隔: {interval}秒")
    
    def add_port_monitor(self, host, port, interval=30, timeout=5):
        """添加端口监控任务"""
        task = {
            'type': 'port',
            'target': f"{host}:{port}",
            'host': host,
            'port': port,
            'interval': interval,
            'timeout': timeout,
            'last_check': None,
            'status': 'unknown'
        }
        self.monitoring_tasks.append(task)
        print(f"添加端口监控: {host}:{port}, 间隔: {interval}秒")
    
    def check_website(self, url, timeout):
        """检查网站状态"""
        try:
            start_time = time.time()
            response = requests.get(url, timeout=timeout, allow_redirects=True)
            end_time = time.time()
            
            return {
                'status': 'up' if response.status_code == 200 else 'down',
                'response_time': round((end_time - start_time) * 1000, 2),  # 毫秒
                'status_code': response.status_code,
                'timestamp': datetime.now().isoformat()
            }
        except requests.exceptions.RequestException as e:
            return {
                'status': 'down',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
    
    def check_port(self, host, port, timeout):
        """检查端口状态"""
        try:
            start_time = time.time()
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                result = sock.connect_ex((host, port))
                end_time = time.time()
                
                return {
                    'status': 'open' if result == 0 else 'closed',
                    'response_time': round((end_time - start_time) * 1000, 2),
                    'timestamp': datetime.now().isoformat()
                }
        except socket.error as e:
            return {
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
    
    def monitor_task(self, task):
        """监控任务循环"""
        while self.is_monitoring:
            try:
                if task['type'] == 'website':
                    result = self.check_website(task['target'], task['timeout'])
                elif task['type'] == 'port':
                    result = self.check_port(task['host'], task['port'], task['timeout'])
                else:
                    continue
                
                # 更新任务状态
                task['last_check'] = datetime.now().isoformat()
                task['status'] = result['status']
                
                # 记录结果
                result['target'] = task['target']
                result['type'] = task['type']
                self.results.append(result)
                
                # 打印状态变化
                status_emoji = "✅" if result['status'] in ['up', 'open'] else "❌"
                print(f"{status_emoji} {task['target']} - {result['status']} "
                      f"(响应时间: {result.get('response_time', 'N/A')}ms)")
                
                # 如果有错误,打印错误信息
                if 'error' in result:
                    print(f"   错误: {result['error']}")
                
            except Exception as e:
                print(f"监控任务错误: {e}")
            
            # 等待间隔时间
            time.sleep(task['interval'])
    
    def start_monitoring(self):
        """开始监控"""
        if not self.monitoring_tasks:
            print("没有监控任务,请先添加监控任务")
            return
        
        print(f"
开始监控 {len(self.monitoring_tasks)} 个目标...")
        print("按 Ctrl+C 停止监控
")
        
        self.is_monitoring = True
        self.results = []
        
        # 使用线程池执行监控任务
        with ThreadPoolExecutor(max_workers=len(self.monitoring_tasks)) as executor:
            try:
                # 提交所有监控任务
                futures = [
                    executor.submit(self.monitor_task, task)
                    for task in self.monitoring_tasks
                ]
                
                # 等待所有任务完成(实际上会一直运行直到中断)
                for future in futures:
                    future.result()
                    
            except KeyboardInterrupt:
                print("
停止监控...")
            finally:
                self.is_monitoring = False
                executor.shutdown(wait=False)
    
    def generate_report(self):
        """生成监控报告"""
        if not self.results:
            print("没有监控数据")
            return
        
        print("
=== 监控报告 ===")
        
        # 统计信息
        total_checks = len(self.results)
        up_checks = len([r for r in self.results if r['status'] in ['up', 'open']])
        downtime_percentage = ((total_checks - up_checks) / total_checks * 100) if total_checks > 0 else 0
        
        print(f"总检查次数: {total_checks}")
        print(f"正常运行次数: {up_checks}")
        print(f"故障率: {downtime_percentage:.2f}%")
        
        # 按目标分组
        targets = {}
        for result in self.results:
            target = result['target']
            if target not in targets:
                targets[target] = []
            targets[target].append(result)
        
        # 每个目标的统计
        print("
各目标统计:")
        for target, results in targets.items():
            total = len(results)
            up = len([r for r in results if r['status'] in ['up', 'open']])
            uptime = (up / total * 100) if total > 0 else 0
            
            # 计算平均响应时间
            response_times = [r.get('response_time', 0) for r in results if 'response_time' in r]
            avg_response = sum(response_times) / len(response_times) if response_times else 0
            
            status = "✅" if uptime > 95 else "⚠️" if uptime > 80 else "❌"
            print(f"{status} {target}: 可用率 {uptime:.1f}%, 平均响应 {avg_response:.2f}ms")
        
        # 保存报告到文件
        report = {
            'generated_at': datetime.now().isoformat(),
            'statistics': {
                'total_checks': total_checks,
                'up_checks': up_checks,
                'downtime_percentage': downtime_percentage
            },
            'targets': targets
        }
        
        with open('network_monitor_report.json', 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        print(f"
详细报告已保存到 network_monitor_report.json")

def demo_network_monitor():
    """演示网络监控工具"""
    print("=== 网络监控工具演示 ===")
    
    monitor = NetworkMonitor()
    
    # 添加监控任务(使用测试网站和端口)
    monitor.add_website_monitor("https://httpbin.org/status/200", interval=10, timeout=5)
    monitor.add_website_monitor("https://httpbin.org/status/404", interval=15, timeout=5)
    monitor.add_port_monitor("google.com", 80, interval=20, timeout=5)
    monitor.add_port_monitor("localhost", 22, interval=25, timeout=3)
    
    # 运行监控一段时间
    print("
开始监控(运行30秒后自动停止)...")
    
    # 在后台运行监控
    monitor_thread = threading.Thread(target=monitor.start_monitoring)
    monitor_thread.daemon = True
    monitor_thread.start()
    
    # 运行30秒后停止
    time.sleep(30)
    monitor.is_monitoring = False
    
    # 等待监控线程结束
    monitor_thread.join(timeout=5)
    
    # 生成报告
    monitor.generate_report()

# 运行演示
demo_network_monitor()

学习提示:

  1. Socket编程:TCP是面向连接的可靠协议,UDP是无连接的快速协议
  2. HTTP客户端:使用requests库简化HTTP请求
  3. HTTP服务器:可以使用内置的http.server或第三方框架如Flask
  4. WebSocket:适合实时双向通信的应用
  5. 电子邮件:使用smtplib和email库发送邮件
  6. FTP客户端:使用ftplib进行文件传输
  7. 并发处理:网络操作一般是I/O密集型的,适合使用多线程或异步
  8. 错误处理:网络操作容易失败,需要妥善处理各种异常

明天我们将学习数据序列化和存储!坚持练习,你的网络编程能力会越来越强!

© 版权声明

相关文章

2 条评论

您必须登录才能参与评论!
立即登录