Python 的网络与互联网访问模块及应用实例(二)

Python 的网络与互联网访问能力超级强劲,无论是进行底层的套接字编程,还是利用高级库进行高效的网络通信和数据传输,都能找到合适的工具。下面介绍一些关键的模块、应用实例,并探讨如何实现高并发处理。

一、核心网络模块与应用

Python 的网络编程能力构建在其丰富的标准库和第三方库之上。

底层网络接口:socket 模块

socket 模块是 Python 进行网络编程的基石,它提供了标准的 BSD Socket API,允许你进行 TCP 或 UDP 协议的通信。

TCP 服务器与客户端示例:

TCP 提供可靠的、面向连接的通信。

TCP 服务器:监听特定端口,等待客户端连接并处理请求。

import socket

def start_server(host=’127.0.0.1′, port=65432):

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

s.bind((host, port))

s.listen()

print(“Server is listening…”)

conn, addr = s.accept()

with conn:

print(‘Connected by’, addr)

while True:

data = conn.recv(1024)

if not data:

break

conn.sendall(data)

if name == “main”:

start_server()

TCP 客户端:连接到服务器并发送/接收数据。

import socket

def connect_to_server(host=’127.0.0.1′, port=65432):

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

s.connect((host, port))

s.sendall(b’Hello, world’)

data = s.recv(1024)

print(‘Received’, repr(data))

if name == “main”:

connect_to_server()

UDP 服务器与客户端示例:

UDP 是无连接的协议,传输速度快,但不保证可靠性,适用于视频流、在线游戏等场景。

UDP 服务器:

import socket

def start_udp_server(host=’127.0.0.1′, port=65432):

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:

s.bind((host, port))

while True:

data, addr = s.recvfrom(1024)

print(“Received from:”, addr)

s.sendto(data, addr)

if name == “main”:

start_udp_server()

UDP 客户端:

import socket

def connect_to_udp_server(host=’127.0.0.1′, port=65432):

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:

s.sendto(b’Hello, UDP server!’, (host, port))

data, addr = s.recvfrom(1024)

print(‘Received’, repr(data), ‘from’, addr)

if name == “main”:

connect_to_udp_server()

高级 HTTP 客户端:requests 库

虽然 http.client 是标准库,但第三方 requests 库因其人性化的 API 而更受欢迎。它简化了 HTTP 请求的处理,支持 GET、POST 等各种方法,并能轻松处理响应、Cookies、会话等。

import requests

def fetch_webpage(url=”http://example.com “):

response = requests.get(url)

return response.text

if name == “main”:

print(fetch_webpage())

网络运维自动化

Python 在网络运维自动化中扮演着重大角色,例如配置管理、流量分析和监控。

使用 paramiko 进行网络设备配置(如 Cisco 路由器):

import paramiko

host = ‘192.168.1.100’

port = 22

username = ‘admin’

password = ‘cisco123’

ssh = paramiko.SSHClient()

ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

ssh.connect(host, port, username, password)

stdin, stdout, stderr = ssh.exec_command(‘enable’)

stdin, stdout, stderr = ssh.exec_command(‘configure terminal’)

stdin, stdout, stderr = ssh.exec_command(‘interface fa0/0’)

stdin, stdout, stderr = ssh.exec_command(‘ip address 192.168.1.100 255.255.255.0’)

stdin, stdout, stderr = ssh.exec_command(‘exit’)

ssh.close()

使用 scapy 进行网络流量分析:

from scapy.all import sniff

interface = ‘eth0’

duration = 30

captured_packets = sniff(iface=interface, timeout=duration)

for packet in captured_packets:

if packet.haslayer(‘TCP’):

print(‘TCP packet:’, packet.summary())

elif packet.haslayer(‘UDP’):

print(‘UDP packet:’, packet.summary())

其他实用网络工具脚本

网络服务状态检查:监控服务是否在线。

import socket

def check_service_status(host, port, timeout=5):

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

s.settimeout(timeout)

try:

s.connect((host, port))

return “Service is up.”

except socket.error as e:

return f”Service is down: {e}”

print(check_service_status(‘example.com’, 80))

端口扫描器:发现主机上开放的端口。

import socket

def port_scanner(host, start_port, end_port):

open_ports = []

for port in range(start_port, end_port + 1):

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

s.settimeout(1)

if s.connect_ex((host, port)) == 0:

open_ports.append(port)

return open_ports

print(port_scanner(‘example.com’, 1, 1024))

简单的 HTTP 服务器:用于快速测试或本地文件共享。

from http.server import SimpleHTTPRequestHandler, HTTPServer

def run_server(port=8000):

server_address = (”, port)

httpd = HTTPServer(server_address, SimpleHTTPRequestHandler)

print(f”Server started on port {port}”)

httpd.serve_forever()

run_server()

网络聊天室(多线程):允许多个客户端连接并广播消息。

import socket

import threading

clients = []

def client_thread(conn, addr):

print(f”Connected by {addr}”)

while True:

try:

message = conn.recv(1024).decode()

if not message or message == ‘!quit’:

break

print(f”{addr}: {message}”)

broadcast(message, conn)

except:

break

conn.close()

remove_client(conn)

def broadcast(message, connection):

for client in clients:

if client != connection:

try:

client.send(message.encode())

except:

client.close()

remove_client(client)

def remove_client(connection):

if connection in clients:

clients.remove(connection)

def main():

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server_socket.bind((‘0.0.0.0’, 12345))

server_socket.listen()

print(“Chat server started…”)

while True:

conn, addr = server_socket.accept()

clients.append(conn)

threading.Thread(target=client_thread, args=(conn, addr)).start()

if name == “main”:

main()

二、实现高并发网络应用

当需要处理大量并发连接时,传统的同步阻塞模型(如一个连接一个线程)会成为性能瓶颈。Python 提供了多种高并发处理方案。

异步编程与 asyncio 库

异步编程是一种编程范式,它允许程序在等待 I/O 操作(如网络请求、文件读写)完成时,去执行其他任务,而不是干等着,从而极大提高程序的并发能力和资源利用率。asyncio 是 Python 内置的异步编程框架,核心是事件循环(Event Loop)、协程(Coroutine)和任务(Task)。

异步函数与 async/await:

使用 async def 定义异步函数,其调用返回一个协程对象。await 用于等待一个异步操作完成。

import asyncio

async def fetch_data(url):

print(f”Fetching {url}…”)

await asyncio.sleep(2)

return f”Data from {url}”

async def main():

results = await asyncio.gather(

fetch_data(“https://api.example.com/1 “),

fetch_data(“https://api.example.com/2 “)

)

print(results)

asyncio.run(main())

异步 HTTP 请求(aiohttp):

对于网络请求,可以使用 aiohttp 这样的异步 HTTP 客户端库来取代同步的 requests 库。

import aiohttp

import asyncio

async def fetch(session, url):

async with session.get(url) as response:

return await response.text()

async def main():

urls = [“https://api.example.com/1 “, “https://api.example.com/2 “]

async with aiohttp.ClientSession() as session:

tasks = [fetch(session, url) for url in urls]

results = await asyncio.gather(*tasks)

print(results)

asyncio.run(main())

高性能异步 Web 框架(FastAPI):

FastAPI 是一个现代的高性能 Web 框架,内置了对异步的支持,超级适合构建高并发的 API 服务。

from fastapi import FastAPI

import aiohttp

app = FastAPI()

@app.get(“/proxy”)

async def proxy(url: str):

async with aiohttp.ClientSession() as session:

async with session.get(url) as resp:

return await resp.json()

多线程与多进程

多线程 (threading 模块):适用于 I/O 密集型任务。当某个线程在等待 I/O 时,其他线程可以继续执行。但由于 Python 的全局解释器锁(GIL),多线程不适合 CPU 密集型任务。

之前的网络聊天室示例就使用了多线程来处理多个客户端连接。

多进程 (multiprocessing 模块):适用于 CPU 密集型任务。每个进程有自己独立的 Python 解释器和内存空间,避免了 GIL 的限制,真正实现了并行计算。但进程创建和切换的开销比线程大。

高并发 Socket 技巧

对于直接的 Socket 编程,也可以采用一些模式来实现高并发:

异步非阻塞模式:使用 selectors 模块(基于 select 或更高效的 epoll/kqueue)来监听多个 Socket 的事件(如可读、可写),在单个线程中处理多个连接。

import socket

import selectors

sel = selectors.DefaultSelector()

sock = socket.socket()

sock.bind((‘0.0.0.0’, 8888))

sock.listen(100)

sock.setblocking(False)

def accept(sock, mask):

conn, addr = sock.accept()

conn.setblocking(False)

sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):

try:

data = conn.recv(1000)

if data:

conn.send(data)

else:

sel.unregister(conn)

conn.close()

except ConnectionResetError:

sel.unregister(conn)

conn.close()

sel.register(sock, selectors.EVENT_READ, accept)

while True:

events = sel.select()

for key, mask in events:

callback = key.data

callback(key.fileobj, mask)

连接池:对于需要频繁创建和销毁网络连接的情况,使用连接池可以避免重复建立连接的开销。

from queue import Queue

class SocketPool:

def init(self, host, port, pool_size=10):

self.pool = Queue()

for _ in range(pool_size):

sock = socket.socket()

sock.connect((host, port))

self.pool.put(sock)

def get(self):

return self.pool.get()

def put(self, sock):

self.pool.put(sock)

心跳机制:在长连接中,定期发送心跳包(如 ping/pong)来检测连接是否存活,及时清理失效连接。

import time

def handle_client(conn):

last_heartbeat = time.time()

while True:

try:

data = conn.recv(1024)

if data == b’ping’:

conn.send(b’pong’)

last_heartbeat = time.time()

if time.time() – last_heartbeat > 30:

raise ConnectionError(“心跳超时”)

except Exception:

conn.close()

break

三、最佳实践与注意事项

异常处理与重试:网络操作极不稳定,必须使用 try-except 块妥善处理超时、连接错误、解析错误等异常,并可根据需要实现重试逻辑。

资源管理:使用 with 语句或确保手动调用 close() 方法,及时释放 Socket 连接、文件句柄等资源,避免泄漏。

安全性:

验证和清理所有输入,防止注入攻击。

使用 HTTPS、SSL/TLS 加密敏感数据传输。

妥善保管密钥、令牌等敏感信息,不要硬编码在代码中。

性能考量:

I/O 密集型 vs CPU 密集型:根据任务类型选择合适的并发模型(异步、多线程、多进程)。

使用高效库:例如,在高并发异步应用中使用 aiohttp 取代 requests,使用 asyncpg 访问 PostgreSQL 数据库等。

数据库优化:为常用查询字段建立索引,使用连接池减少连接开销。

缓存:使用 Redis 或 Memcached 等缓存频繁访问的数据,减轻后端压力。

遵守规则:编写网络爬虫时,尊重 robots.txt,设置合理的请求间隔,避免对目标网站造成过大压力。

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...