今日练习主题:网络编程
今天我们将学习Python的网络编程,包括socket编程、HTTP客户端和服务端开发,以及常用的网络协议。
练习1:基础Socket编程
import socket
import threading
import time
# TCP服务器
def tcp_server():
"""简单的TCP服务器"""
print("=== TCP服务器 ===")
# 创建socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定地址和端口
server_address = ('localhost', 8888)
server_socket.bind(server_address)
# 开始监听
server_socket.listen(5)
print(f"服务器启动在 {server_address}")
def handle_client(client_socket, client_address):
"""处理客户端连接"""
print(f"接收到来自 {client_address} 的连接")
try:
while True:
# 接收数据
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"收到来自 {client_address} 的消息: {message}")
# 发送响应
response = f"服务器已收到: {message}"
client_socket.send(response.encode('utf-8'))
except ConnectionResetError:
print(f"客户端 {client_address} 断开连接")
finally:
client_socket.close()
try:
while True:
# 接受客户端连接
client_socket, client_address = server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, client_address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("服务器关闭")
finally:
server_socket.close()
# TCP客户端
def tcp_client():
"""简单的TCP客户端"""
print("=== TCP客户端 ===")
# 创建socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接服务器
server_address = ('localhost', 8888)
client_socket.connect(server_address)
print(f"连接到服务器 {server_address}")
# 发送消息
messages = ["Hello", "World", "Python", "Network"]
for message in messages:
print(f"发送: {message}")
client_socket.send(message.encode('utf-8'))
# 接收响应
response = client_socket.recv(1024)
print(f"收到: {response.decode('utf-8')}")
time.sleep(1)
except ConnectionRefusedError:
print("无法连接到服务器")
except Exception as e:
print(f"客户端错误: {e}")
finally:
client_socket.close()
# 运行TCP服务器和客户端
def run_tcp_example():
"""运行TCP示例"""
# 在后台启动服务器
server_thread = threading.Thread(target=tcp_server, daemon=True)
server_thread.start()
# 给服务器时间启动
time.sleep(1)
# 运行客户端
tcp_client()
# run_tcp_example() # 撤销注释运行
print("
" + "="*50 + "
")
# UDP通信
def udp_example():
"""UDP通信示例"""
print("=== UDP通信 ===")
def udp_server():
"""UDP服务器"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('localhost', 9999))
print("UDP服务器启动")
try:
while True:
data, client_address = server_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"收到来自 {client_address} 的UDP消息: {message}")
response = f"UDP响应: {message}"
server_socket.sendto(response.encode('utf-8'), client_address)
except KeyboardInterrupt:
print("UDP服务器关闭")
finally:
server_socket.close()
def udp_client():
"""UDP客户端"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
server_address = ('localhost', 9999)
messages = ["UDP测试1", "UDP测试2", "UDP测试3"]
for message in messages:
print(f"发送UDP: {message}")
client_socket.sendto(message.encode('utf-8'), server_address)
# 接收响应
response, _ = client_socket.recvfrom(1024)
print(f"收到UDP响应: {response.decode('utf-8')}")
time.sleep(1)
except Exception as e:
print(f"UDP客户端错误: {e}")
finally:
client_socket.close()
# 运行UDP示例
server_thread = threading.Thread(target=udp_server, daemon=True)
server_thread.start()
time.sleep(1)
udp_client()
# udp_example() # 撤销注释运行
练习2:HTTP客户端
import requests
import json
from urllib.parse import urlencode
def http_client_examples():
"""HTTP客户端示例"""
print("=== HTTP客户端 ===")
# 1. 基本的GET请求
print("1. 基本GET请求:")
response = requests.get('https://httpbin.org/get')
print(f"状态码: {response.status_code}")
print(f"响应头: {dict(response.headers)}")
print(f"响应内容: {response.json()}")
print("
" + "-"*30 + "
")
# 2. 带参数的GET请求
print("2. 带参数GET请求:")
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=params)
print(f"URL: {response.url}")
print(f"参数: {response.json()['args']}")
print("
" + "-"*30 + "
")
# 3. POST请求(表单数据)
print("3. POST表单数据:")
data = {'username': 'testuser', 'password': 'testpass'}
response = requests.post('https://httpbin.org/post', data=data)
print(f"表单数据: {response.json()['form']}")
print("
" + "-"*30 + "
")
# 4. POST请求(JSON数据)
print("4. POST JSON数据:")
json_data = {'name': 'Alice', 'age': 25, 'city': 'Beijing'}
response = requests.post(
'https://httpbin.org/post',
json=json_data,
headers={'Content-Type': 'application/json'}
)
print(f"JSON数据: {response.json()['json']}")
print("
" + "-"*30 + "
")
# 5. 设置请求头
print("5. 自定义请求头:")
headers = {
'User-Agent': 'MyPythonClient/1.0',
'Authorization': 'Bearer token123'
}
response = requests.get('https://httpbin.org/headers', headers=headers)
print(f"请求头: {response.json()['headers']}")
print("
" + "-"*30 + "
")
# 6. 处理超时和错误
print("6. 错误处理:")
try:
response = requests.get('https://httpbin.org/delay/5', timeout=2)
except requests.exceptions.Timeout:
print("请求超时")
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
print("
" + "-"*30 + "
")
# 7. 会话保持(Cookies)
print("7. 使用会话:")
with requests.Session() as session:
# 第一次请求设置cookie
response1 = session.get('https://httpbin.org/cookies/set/sessionid/12345')
print(f"第一次请求Cookies: {response1.json()['cookies']}")
# 第二次请求会携带cookie
response2 = session.get('https://httpbin.org/cookies')
print(f"第二次请求Cookies: {response2.json()['cookies']}")
http_client_examples()
练习3:HTTP服务器
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import urllib.parse
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""简单的HTTP请求处理器"""
def do_GET(self):
"""处理GET请求"""
parsed_path = urllib.parse.urlparse(self.path)
path = parsed_path.path
if path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<h1>欢迎来到Python HTTP服务器</h1>')
elif path == '/api/data':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'message': 'Hello World', 'status': 'success'}
self.wfile.write(json.dumps(response).encode())
elif path == '/api/users':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
users = [
{'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
{'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
{'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'}
]
self.wfile.write(json.dumps(users).encode())
else:
self.send_response(404)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'error': 'Not Found', 'path': path}
self.wfile.write(json.dumps(response).encode())
def do_POST(self):
"""处理POST请求"""
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length)
try:
data = json.loads(post_data.decode('utf-8'))
except json.JSONDecodeError:
data = {}
self.send_response(201)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {
'message': 'Data received',
'received_data': data,
'method': 'POST'
}
self.wfile.write(json.dumps(response).encode())
def do_PUT(self):
"""处理PUT请求"""
content_length = int(self.headers.get('Content-Length', 0))
put_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {
'message': 'Data updated',
'data_length': len(put_data),
'method': 'PUT'
}
self.wfile.write(json.dumps(response).encode())
def do_DELETE(self):
"""处理DELETE请求"""
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {'message': 'Resource deleted', 'method': 'DELETE'}
self.wfile.write(json.dumps(response).encode())
def run_http_server():
"""运行HTTP服务器"""
print("=== 启动HTTP服务器 ===")
server_address = ('localhost', 8000)
httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)
print(f"服务器运行在 http://{server_address[0]}:{server_address[1]}")
print("按 Ctrl+C 停止服务器")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("
服务器关闭")
finally:
httpd.server_close()
# 在后台启动服务器
def start_server_in_background():
"""在后台启动服务器"""
server_thread = threading.Thread(target=run_http_server, daemon=True)
server_thread.start()
time.sleep(1) # 给服务器时间启动
# start_server_in_background() # 撤销注释运行
# 测试HTTP服务器
def test_http_server():
"""测试HTTP服务器"""
print("=== 测试HTTP服务器 ===")
base_url = 'http://localhost:8000'
try:
# 测试根路径
response = requests.get(f'{base_url}/')
print(f"根路径: {response.status_code} - {response.text[:50]}...")
# 测试API数据
response = requests.get(f'{base_url}/api/data')
print(f"API数据: {response.status_code} - {response.json()}")
# 测试用户API
response = requests.get(f'{base_url}/api/users')
print(f"用户API: {response.status_code} - {response.json()}")
# 测试POST请求
data = {'name': 'John', 'age': 30}
response = requests.post(f'{base_url}/api/data', json=data)
print(f"POST请求: {response.status_code} - {response.json()}")
# 测试404
response = requests.get(f'{base_url}/nonexistent')
print(f"404测试: {response.status_code} - {response.json()}")
except requests.exceptions.ConnectionError:
print("无法连接到服务器,请先启动服务器")
# test_http_server() # 撤销注释测试
练习4:WebSocket编程
import websockets
import asyncio
async def websocket_server():
"""WebSocket服务器"""
print("=== WebSocket服务器 ===")
async def handler(websocket, path):
"""处理WebSocket连接"""
print(f"客户端连接: {websocket.remote_address}")
try:
async for message in websocket:
print(f"收到消息: {message}")
# 回应客户端
response = f"服务器收到: {message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print("客户端断开连接")
# 启动服务器
server = await websockets.serve(handler, "localhost", 8765)
print("WebSocket服务器启动在 ws://localhost:8765")
# 保持服务器运行
await server.wait_closed()
async def websocket_client():
"""WebSocket客户端"""
print("=== WebSocket客户端 ===")
try:
async with websockets.connect("ws://localhost:8765") as websocket:
messages = ["Hello", "WebSocket", "Python"]
for message in messages:
# 发送消息
await websocket.send(message)
print(f"发送: {message}")
# 接收响应
response = await websocket.recv()
print(f"收到: {response}")
await asyncio.sleep(1)
except ConnectionRefusedError:
print("无法连接到WebSocket服务器")
async def run_websocket_example():
"""运行WebSocket示例"""
# 启动服务器
server_task = asyncio.create_task(websocket_server())
# 给服务器时间启动
await asyncio.sleep(1)
# 运行客户端
await websocket_client()
# 撤销服务器任务
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
# asyncio.run(run_websocket_example()) # 撤销注释运行
练习5:网络工具函数
import socket
import subprocess
import platform
def network_utilities():
"""网络工具函数"""
print("=== 网络工具函数 ===")
# 1. 获取本机IP地址
def get_local_ip():
"""获取本机IP地址"""
try:
# 创建一个UDP socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 连接到一个远程地址(不会真正发送数据)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
print(f"本机IP地址: {get_local_ip()}")
# 2. 端口扫描
def port_scan(host, start_port, end_port):
"""简单的端口扫描"""
open_ports = []
for port in range(start_port, end_port + 1):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
result = s.connect_ex((host, port))
if result == 0:
open_ports.append(port)
print(f"端口 {port} 开放")
except socket.error:
pass
return open_ports
print("
端口扫描 localhost:1-100")
open_ports = port_scan('localhost', 1, 100)
print(f"开放的端口: {open_ports}")
# 3. DNS查询
def dns_lookup(hostname):
"""DNS查询"""
try:
ip = socket.gethostbyname(hostname)
print(f"{hostname} -> {ip}")
return ip
except socket.gaierror as e:
print(f"DNS查询失败: {e}")
return None
print("
DNS查询:")
dns_lookup("google.com")
dns_lookup("github.com")
# 4. 网络连通性测试
def ping_host(host):
"""ping主机测试连通性"""
param = "-n" if platform.system().lower() == "windows" else "-c"
command = ["ping", param, "2", host]
try:
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=5
)
if result.returncode == 0:
print(f"可以ping通 {host}")
return True
else:
print(f"无法ping通 {host}")
return False
except subprocess.TimeoutExpired:
print(f"ping {host} 超时")
return False
print("
网络连通性测试:")
ping_host("8.8.8.8") # Google DNS
ping_host("1.1.1.1") # Cloudflare DNS
network_utilities()
练习6:电子邮件发送
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import os
def email_examples():
"""电子邮件示例"""
print("=== 电子邮件发送 ===")
# 配置(需要替换为真实的邮箱配置)
smtp_config = {
'host': 'smtp.example.com', # 替换为你的SMTP服务器
'port': 587, # 一般587用于TLS
'username': 'your_email@example.com', # 替换为你的邮箱
'password': 'your_password', # 替换为你的密码或应用密码
'use_tls': True
}
def send_simple_email(to_email, subject, body):
"""发送简单文本邮件"""
try:
# 创建邮件
msg = MIMEText(body, 'plain', 'utf-8')
msg['Subject'] = subject
msg['From'] = smtp_config['username']
msg['To'] = to_email
# 连接SMTP服务器
with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
if smtp_config['use_tls']:
server.starttls() # 启用TLS加密
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
print(f"邮件发送成功到 {to_email}")
return True
except Exception as e:
print(f"邮件发送失败: {e}")
return False
def send_html_email(to_email, subject, html_body):
"""发送HTML邮件"""
try:
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = smtp_config['username']
msg['To'] = to_email
# 添加HTML内容
html_part = MIMEText(html_body, 'html', 'utf-8')
msg.attach(html_part)
with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
if smtp_config['use_tls']:
server.starttls()
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
print(f"HTML邮件发送成功到 {to_email}")
return True
except Exception as e:
print(f"HTML邮件发送失败: {e}")
return False
def send_email_with_attachment(to_email, subject, body, attachment_path):
"""发送带附件的邮件"""
try:
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = smtp_config['username']
msg['To'] = to_email
# 添加文本内容
text_part = MIMEText(body, 'plain', 'utf-8')
msg.attach(text_part)
# 添加附件
if os.path.exists(attachment_path):
with open(attachment_path, 'rb') as f:
attachment = MIMEApplication(f.read())
attachment.add_header(
'Content-Disposition',
'attachment',
filename=os.path.basename(attachment_path)
)
msg.attach(attachment)
with smtplib.SMTP(smtp_config['host'], smtp_config['port']) as server:
if smtp_config['use_tls']:
server.starttls()
server.login(smtp_config['username'], smtp_config['password'])
server.send_message(msg)
print(f"带附件邮件发送成功到 {to_email}")
return True
except Exception as e:
print(f"带附件邮件发送失败: {e}")
return False
# 示例使用(需要配置真实的SMTP信息才能运行)
print("注意:需要配置真实的SMTP信息才能运行以下示例")
# 创建测试文件
with open('test_attachment.txt', 'w', encoding='utf-8') as f:
f.write('这是一个测试附件文件')
# 示例调用(撤销注释并配置后运行)
# send_simple_email('recipient@example.com', '测试邮件', '这是一封测试邮件')
# send_html_email('recipient@example.com', 'HTML测试', '<h1>标题</h1><p>这是一封HTML邮件</p>')
# send_email_with_attachment('recipient@example.com', '带附件测试', '请查看附件', 'test_attachment.txt')
# 清理测试文件
if os.path.exists('test_attachment.txt'):
os.remove('test_attachment.txt')
email_examples()
练习7:FTP客户端
from ftplib import FTP
import os
def ftp_examples():
"""FTP客户端示例"""
print("=== FTP客户端 ===")
# FTP配置(需要替换为真实的FTP服务器信息)
ftp_config = {
'host': 'ftp.example.com', # 替换为FTP服务器地址
'username': 'your_username', # 替换为用户名
'password': 'your_password', # 替换为密码
'port': 21 # FTP默认端口
}
def list_ftp_files():
"""列出FTP服务器上的文件"""
try:
with FTP() as ftp:
ftp.connect(ftp_config['host'], ftp_config['port'])
ftp.login(ftp_config['username'], ftp_config['password'])
print("FTP服务器文件列表:")
ftp.retrlines('LIST')
except Exception as e:
print(f"FTP连接失败: {e}")
def upload_file(local_path, remote_path):
"""上传文件到FTP服务器"""
try:
with FTP() as ftp:
ftp.connect(ftp_config['host'], ftp_config['port'])
ftp.login(ftp_config['username'], ftp_config['password'])
with open(local_path, 'rb') as f:
ftp.storbinary(f'STOR {remote_path}', f)
print(f"文件上传成功: {local_path} -> {remote_path}")
except Exception as e:
print(f"文件上传失败: {e}")
def download_file(remote_path, local_path):
"""从FTP服务器下载文件"""
try:
with FTP() as ftp:
ftp.connect(ftp_config['host'], ftp_config['port'])
ftp.login(ftp_config['username'], ftp_config['password'])
with open(local_path, 'wb') as f:
ftp.retrbinary(f'RETR {remote_path}', f.write)
print(f"文件下载成功: {remote_path} -> {local_path}")
except Exception as e:
print(f"文件下载失败: {e}")
# 示例使用(需要配置真实的FTP信息才能运行)
print("注意:需要配置真实的FTP信息才能运行以下示例")
# 创建测试文件
with open('test_upload.txt', 'w', encoding='utf-8') as f:
f.write('这是一个测试上传文件')
# 示例调用(撤销注释并配置后运行)
# list_ftp_files()
# upload_file('test_upload.txt', 'test_upload.txt')
# download_file('test_upload.txt', 'test_download.txt')
# 清理测试文件
for file in ['test_upload.txt', 'test_download.txt']:
if os.path.exists(file):
os.remove(file)
ftp_examples()
今日挑战:
创建一个完整的网络监控工具,可以检查网站状态、端口开放情况和网络连通性。
# 挑战练习:网络监控工具
import requests
import socket
import time
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor
import json
class NetworkMonitor:
"""网络监控工具"""
def __init__(self):
self.monitoring_tasks = []
self.results = []
self.is_monitoring = False
def add_website_monitor(self, url, interval=60, timeout=10):
"""添加网站监控任务"""
task = {
'type': 'website',
'target': url,
'interval': interval,
'timeout': timeout,
'last_check': None,
'status': 'unknown'
}
self.monitoring_tasks.append(task)
print(f"添加网站监控: {url}, 间隔: {interval}秒")
def add_port_monitor(self, host, port, interval=30, timeout=5):
"""添加端口监控任务"""
task = {
'type': 'port',
'target': f"{host}:{port}",
'host': host,
'port': port,
'interval': interval,
'timeout': timeout,
'last_check': None,
'status': 'unknown'
}
self.monitoring_tasks.append(task)
print(f"添加端口监控: {host}:{port}, 间隔: {interval}秒")
def check_website(self, url, timeout):
"""检查网站状态"""
try:
start_time = time.time()
response = requests.get(url, timeout=timeout, allow_redirects=True)
end_time = time.time()
return {
'status': 'up' if response.status_code == 200 else 'down',
'response_time': round((end_time - start_time) * 1000, 2), # 毫秒
'status_code': response.status_code,
'timestamp': datetime.now().isoformat()
}
except requests.exceptions.RequestException as e:
return {
'status': 'down',
'error': str(e),
'timestamp': datetime.now().isoformat()
}
def check_port(self, host, port, timeout):
"""检查端口状态"""
try:
start_time = time.time()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
end_time = time.time()
return {
'status': 'open' if result == 0 else 'closed',
'response_time': round((end_time - start_time) * 1000, 2),
'timestamp': datetime.now().isoformat()
}
except socket.error as e:
return {
'status': 'error',
'error': str(e),
'timestamp': datetime.now().isoformat()
}
def monitor_task(self, task):
"""监控任务循环"""
while self.is_monitoring:
try:
if task['type'] == 'website':
result = self.check_website(task['target'], task['timeout'])
elif task['type'] == 'port':
result = self.check_port(task['host'], task['port'], task['timeout'])
else:
continue
# 更新任务状态
task['last_check'] = datetime.now().isoformat()
task['status'] = result['status']
# 记录结果
result['target'] = task['target']
result['type'] = task['type']
self.results.append(result)
# 打印状态变化
status_emoji = "✅" if result['status'] in ['up', 'open'] else "❌"
print(f"{status_emoji} {task['target']} - {result['status']} "
f"(响应时间: {result.get('response_time', 'N/A')}ms)")
# 如果有错误,打印错误信息
if 'error' in result:
print(f" 错误: {result['error']}")
except Exception as e:
print(f"监控任务错误: {e}")
# 等待间隔时间
time.sleep(task['interval'])
def start_monitoring(self):
"""开始监控"""
if not self.monitoring_tasks:
print("没有监控任务,请先添加监控任务")
return
print(f"
开始监控 {len(self.monitoring_tasks)} 个目标...")
print("按 Ctrl+C 停止监控
")
self.is_monitoring = True
self.results = []
# 使用线程池执行监控任务
with ThreadPoolExecutor(max_workers=len(self.monitoring_tasks)) as executor:
try:
# 提交所有监控任务
futures = [
executor.submit(self.monitor_task, task)
for task in self.monitoring_tasks
]
# 等待所有任务完成(实际上会一直运行直到中断)
for future in futures:
future.result()
except KeyboardInterrupt:
print("
停止监控...")
finally:
self.is_monitoring = False
executor.shutdown(wait=False)
def generate_report(self):
"""生成监控报告"""
if not self.results:
print("没有监控数据")
return
print("
=== 监控报告 ===")
# 统计信息
total_checks = len(self.results)
up_checks = len([r for r in self.results if r['status'] in ['up', 'open']])
downtime_percentage = ((total_checks - up_checks) / total_checks * 100) if total_checks > 0 else 0
print(f"总检查次数: {total_checks}")
print(f"正常运行次数: {up_checks}")
print(f"故障率: {downtime_percentage:.2f}%")
# 按目标分组
targets = {}
for result in self.results:
target = result['target']
if target not in targets:
targets[target] = []
targets[target].append(result)
# 每个目标的统计
print("
各目标统计:")
for target, results in targets.items():
total = len(results)
up = len([r for r in results if r['status'] in ['up', 'open']])
uptime = (up / total * 100) if total > 0 else 0
# 计算平均响应时间
response_times = [r.get('response_time', 0) for r in results if 'response_time' in r]
avg_response = sum(response_times) / len(response_times) if response_times else 0
status = "✅" if uptime > 95 else "⚠️" if uptime > 80 else "❌"
print(f"{status} {target}: 可用率 {uptime:.1f}%, 平均响应 {avg_response:.2f}ms")
# 保存报告到文件
report = {
'generated_at': datetime.now().isoformat(),
'statistics': {
'total_checks': total_checks,
'up_checks': up_checks,
'downtime_percentage': downtime_percentage
},
'targets': targets
}
with open('network_monitor_report.json', 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"
详细报告已保存到 network_monitor_report.json")
def demo_network_monitor():
"""演示网络监控工具"""
print("=== 网络监控工具演示 ===")
monitor = NetworkMonitor()
# 添加监控任务(使用测试网站和端口)
monitor.add_website_monitor("https://httpbin.org/status/200", interval=10, timeout=5)
monitor.add_website_monitor("https://httpbin.org/status/404", interval=15, timeout=5)
monitor.add_port_monitor("google.com", 80, interval=20, timeout=5)
monitor.add_port_monitor("localhost", 22, interval=25, timeout=3)
# 运行监控一段时间
print("
开始监控(运行30秒后自动停止)...")
# 在后台运行监控
monitor_thread = threading.Thread(target=monitor.start_monitoring)
monitor_thread.daemon = True
monitor_thread.start()
# 运行30秒后停止
time.sleep(30)
monitor.is_monitoring = False
# 等待监控线程结束
monitor_thread.join(timeout=5)
# 生成报告
monitor.generate_report()
# 运行演示
demo_network_monitor()
学习提示:
- Socket编程:TCP是面向连接的可靠协议,UDP是无连接的快速协议
- HTTP客户端:使用requests库简化HTTP请求
- HTTP服务器:可以使用内置的http.server或第三方框架如Flask
- WebSocket:适合实时双向通信的应用
- 电子邮件:使用smtplib和email库发送邮件
- FTP客户端:使用ftplib进行文件传输
- 并发处理:网络操作一般是I/O密集型的,适合使用多线程或异步
- 错误处理:网络操作容易失败,需要妥善处理各种异常
明天我们将学习数据序列化和存储!坚持练习,你的网络编程能力会越来越强!
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
您必须登录才能参与评论!
立即登录



赞👍
收藏了,感谢分享