今日练习主题:Web爬虫与数据采集
今天我们将学习使用Python进行Web爬虫和数据采集,包括requests、BeautifulSoup、Selenium等工具的使用,以及API数据采集与爬虫伦理。
练习1:Requests库基础
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry  # requests.packages.urllib3 为旧式写法,直接从urllib3导入更稳妥
import time
import json
def requests_basics():
"""Requests库基础使用"""
print("=== Requests库基础 ===")
# 1. 基本GET请求
print("1. 基本GET请求")
# 测试用的公共API
url = "https://httpbin.org/get"
try:
response = requests.get(url)
print(f"状态码: {response.status_code}")
print(f"响应头: {dict(response.headers)}")
print(f"响应内容前200字符: {response.text[:200]}...")
# JSON响应处理
if response.headers.get('Content-Type', '').startswith('application/json'):
data = response.json()
print(f"解析的JSON数据: {data}")
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
# 2. 带参数的GET请求
print("
2. 带参数的GET请求")
params = {
'key1': 'value1',
'key2': 'value2',
'page': 1,
'limit': 10
}
response = requests.get("https://httpbin.org/get", params=params)
print(f"请求URL: {response.url}")
print(f"请求参数: {params}")
# 3. POST请求
print("
3. POST请求")
data = {
'username': 'testuser',
'password': 'testpass',
'email': 'test@example.com'
}
response = requests.post("https://httpbin.org/post", data=data)
print(f"状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
print(f"表单数据: {result.get('form', {})}")
# 4. 带JSON数据的POST请求
print("
4. JSON POST请求")
json_data = {
'title': '测试文章',
'content': '这是测试内容',
'author': '测试作者',
'tags': ['python', '爬虫', '测试']
}
response = requests.post(
"https://httpbin.org/post",
json=json_data,
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
result = response.json()
print(f"JSON数据: {result.get('json', {})}")
# 5. 设置请求头
print("
5. 自定义请求头")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Referer': 'https://www.example.com'
}
response = requests.get("https://httpbin.org/headers", headers=headers)
if response.status_code == 200:
result = response.json()
print(f"请求头信息: {result.get('headers', {})}")
# 6. 处理Cookies
print("
6. Cookies处理")
# 设置cookies
cookies = {'session_id': 'abc123', 'user_id': '456'}
response = requests.get("https://httpbin.org/cookies", cookies=cookies)
if response.status_code == 200:
result = response.json()
print(f"发送的Cookies: {result.get('cookies', {})}")
# 获取cookies
response = requests.get("https://httpbin.org/cookies/set/sessionid/123456")
print(f"服务器设置的Cookies: {response.cookies.get_dict()}")
def advanced_requests():
"""Requests高级功能"""
print("
=== Requests高级功能 ===")
# 1. 会话保持
print("1. 会话保持")
with requests.Session() as session:
# 第一次请求设置cookies
response1 = session.get("https://httpbin.org/cookies/set/sessionid/789012")
print(f"第一次请求Cookies: {session.cookies.get_dict()}")
# 第二次请求会携带cookies
response2 = session.get("https://httpbin.org/cookies")
if response2.status_code == 200:
result = response2.json()
print(f"第二次请求Cookies: {result.get('cookies', {})}")
# 2. 超时设置
print("
2. 超时设置")
try:
# 设置连接超时和读取超时
response = requests.get("https://httpbin.org/delay/2", timeout=(3.05, 5))
print("请求成功完成")
except requests.exceptions.Timeout:
print("请求超时")
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
# 3. 重试机制
print("
3. 重试机制")
def create_session_with_retries():
session = requests.Session()
# 定义重试策略
retry_strategy = Retry(
total=3, # 总重试次数
backoff_factor=1, # 退避因子
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
)
# 创建适配器
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
session = create_session_with_retries()
try:
response = session.get("https://httpbin.org/status/500")
print(f"最终状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
# 4. 代理设置
print("
4. 代理设置")
# 注意:这里使用免费代理示例,实际使用时需要有效的代理服务器
proxies = {
'http': 'http://proxy.example.com:8080',
'https': 'https://proxy.example.com:8080',
}
# 实际使用时取消以下注释(需要有效的代理服务器)
# try:
#     response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10)
#     print(f"通过代理获取的IP: {response.json()}")
# except requests.exceptions.RequestException:
#     print("代理请求失败")
print("代理示例已注释,需要有效代理服务器才能运行")
# 5. 文件下载
print("
5. 文件下载")
def download_file(url, filename):
"""下载文件"""
try:
response = requests.get(url, stream=True)
response.raise_for_status()
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"文件下载成功: {filename}")
return True
except requests.exceptions.RequestException as e:
print(f"下载失败: {e}")
return False
# 下载一个小图片示例
image_url = "https://httpbin.org/image/png"
download_file(image_url, "downloaded_image.png")
# 运行Requests示例
requests_basics()
advanced_requests()
练习2:BeautifulSoup网页解析
from bs4 import BeautifulSoup
import requests
import re
def beautifulsoup_basics():
"""BeautifulSoup基础使用"""
print("=== BeautifulSoup网页解析 ===")
# 1. 解析HTML字符串
print("1. 解析HTML字符串")
html_doc = """
<html>
<head>
<title>测试网页</title>
</head>
<body>
<div class="container">
<h1 id="main-title">网页标题</h1>
<p class="content">这是一个段落。</p>
<p class="content">这是另一个段落。</p>
<ul id="list">
<li class="item">项目1</li>
<li class="item active">项目2</li>
<li class="item">项目3</li>
</ul>
<a href="https://www.example.com">示例链接</a>
<div class="nested">
<span>嵌套内容</span>
</div>
</div>
</body>
</html>
"""
soup = BeautifulSoup(html_doc, 'html.parser')
# 基本操作
print(f"网页标题: {soup.title.string}")
print(f"第一个p标签: {soup.p.text}")
print(f"所有p标签数量: {len(soup.find_all('p'))}")
# 2. 标签选择方法
print("
2. 标签选择方法")
# 通过标签名
print("所有li标签:")
for li in soup.find_all('li'):
print(f" - {li.text}")
# 通过class
print("
class为content的标签:")
for p in soup.find_all(class_='content'):
print(f" - {p.text}")
# 通过id
main_title = soup.find(id='main-title')
print(f"
id为main-title的标签: {main_title.text}")
# 通过属性
link = soup.find('a', href='https://www.example.com')
print(f"链接: {link.text} -> {link['href']}")
# 3. CSS选择器
print("
3. CSS选择器")
# 选择所有item类
items = soup.select('.item')
print("CSS选择器 .item:")
for item in items:
print(f" - {item.text}")
# 选择active类的item
active_item = soup.select('.item.active')
print(f"激活的项目: {active_item[0].text if active_item else '无'}")
# 选择嵌套内容
nested = soup.select('.nested span')
print(f"嵌套内容: {nested[0].text if nested else '无'}")
# 4. 实际网页解析示例
print("
4. 实际网页解析示例")
def parse_example_website():
"""解析示例网站"""
try:
# 使用一个简单的测试网站
url = "https://httpbin.org/html"
response = requests.get(url)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
# 提取标题
title = soup.find('h1')
if title:
print(f"页面标题: {title.text}")
# 提取所有段落
paragraphs = soup.find_all('p')
print("页面段落:")
for i, p in enumerate(paragraphs, 1):
print(f" {i}. {p.text.strip()}")
# 提取所有链接
links = soup.find_all('a')
print("页面链接:")
for link in links:
href = link.get('href', '')
text = link.text.strip()
if text or href:
print(f" - {text} -> {href}")
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
parse_example_website()
def advanced_beautifulsoup():
"""BeautifulSoup高级功能"""
print("
=== BeautifulSoup高级功能 ===")
# 创建测试HTML
html_content = """
<div class="products">
<div class="product" data-id="1">
<h3>商品A</h3>
<p class="price">¥100.00</p>
<p class="description">这是商品A的描述</p>
<span class="tag">热卖</span>
<span class="tag">新品</span>
</div>
<div class="product" data-id="2">
<h3>商品B</h3>
<p class="price">¥200.00</p>
<p class="description">这是商品B的描述</p>
<span class="tag">折扣</span>
</div>
<div class="product" data-id="3">
<h3>商品C</h3>
<p class="price">¥150.00</p>
<p class="description">这是商品C的描述</p>
</div>
</div>
"""
soup = BeautifulSoup(html_content, 'html.parser')
# 1. 数据提取和清洗
print("1. 数据提取和清洗")
products = []
for product_div in soup.find_all('div', class_='product'):
product = {}
# 提取商品名称
name_tag = product_div.find('h3')
product['name'] = name_tag.text.strip() if name_tag else '未知'
# 提取价格(使用正则表达式清理)
price_tag = product_div.find('p', class_='price')
if price_tag:
# 提取数字
price_match = re.search(r'[\d.]+', price_tag.text)
product['price'] = float(price_match.group()) if price_match else 0.0
else:
product['price'] = 0.0
# 提取描述
desc_tag = product_div.find('p', class_='description')
product['description'] = desc_tag.text.strip() if desc_tag else ''
# 提取标签
tags = [tag.text.strip() for tag in product_div.find_all('span', class_='tag')]
product['tags'] = tags
# 提取自定义属性
product['id'] = product_div.get('data-id', '')
products.append(product)
print("提取的商品信息:")
for i, product in enumerate(products, 1):
print(f"商品{i}: {product}")
# 2. 导航方法
print("
2. 导航方法")
# 父节点和子节点
first_product = soup.find('div', class_='product')
if first_product:
print(f"第一个商品的父节点: {first_product.parent.get('class', [])}")
print(f"第一个商品的直接子节点:")
for child in first_product.children:
if child.name: # 过滤掉文本节点
print(f" - {child.name}: {child.text.strip()}")
# 兄弟节点
second_product = soup.find_all('div', class_='product')[1]
if second_product:
next_sibling = second_product.find_next_sibling('div', class_='product')
prev_sibling = second_product.find_previous_sibling('div', class_='product')
print(f"第二个商品的前一个兄弟: {prev_sibling.get('data-id') if prev_sibling else '无'}")
print(f"第二个商品的后一个兄弟: {next_sibling.get('data-id') if next_sibling else '无'}")
# 3. 修改HTML
print("
3. 修改HTML")
# 修改内容
first_h3 = soup.find('h3')
if first_h3:
original_text = first_h3.text
first_h3.string = "修改后的商品A"
print(f"修改前: {original_text}")
print(f"修改后: {first_h3.text}")
# 添加新元素
new_product = soup.new_tag('div', **{'class': 'product', 'data-id': '4'})
new_product.append(soup.new_tag('h3'))
new_product.h3.string = "商品D"
products_div = soup.find('div', class_='products')
if products_div:
products_div.append(new_product)
print(f"添加新商品后商品数量: {len(products_div.find_all('div', class_='product'))}")
# 4. 输出美化
print("
4. 美化输出")
print(soup.prettify()[:500] + "...")
# 运行BeautifulSoup示例
beautifulsoup_basics()
advanced_beautifulsoup()
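补充说明:抓取中文网页时,requests 有时会把编码误判为 ISO-8859-1,导致解析出乱码。下面是一个简要示意,在交给 BeautifulSoup 之前用 apparent_encoding 校正编码(以 httpbin 的测试页面为例,真实站点的编码情况需自行确认):
import requests
from bs4 import BeautifulSoup

def fetch_and_parse(url):
    """获取页面并在解析前校正编码(示意实现)"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    # requests 按响应头猜测编码,部分中文站点会被误判为 ISO-8859-1
    # apparent_encoding 根据正文内容检测,通常更符合实际
    if response.encoding and response.encoding.lower() == 'iso-8859-1':
        response.encoding = response.apparent_encoding
    return BeautifulSoup(response.text, 'html.parser')

# 用法示例
soup = fetch_and_parse("https://httpbin.org/html")
print(soup.h1.text if soup.h1 else "未找到h1标签")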
练习3:实战爬虫项目 – 新闻爬取
import csv
import json
import os
from datetime import datetime
def news_crawler():
"""新闻爬虫实战项目"""
print("=== 新闻爬虫实战项目 ===")
# 创建数据存储目录
os.makedirs('news_data', exist_ok=True)
class NewsCrawler:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def crawl_news(self, url):
"""爬取新闻页面"""
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取新闻信息(这里使用模拟数据,实际需要根据目标网站调整)
news_data = self.extract_news_data(soup)
return news_data
except requests.exceptions.RequestException as e:
print(f"爬取失败 {url}: {e}")
return None
def extract_news_data(self, soup):
"""提取新闻数据(示例实现)"""
# 这里是一个示例实现,实际爬取时需要根据目标网站结构调整
# 模拟提取数据
news = {
'title': '示例新闻标题',
'content': '这是新闻内容...',
'publish_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'author': '示例作者',
'source': '示例来源',
'url': 'https://example.com/news/1',
'crawl_time': datetime.now().isoformat()
}
# 实际爬取时,需要根据网站结构提取真实数据
# 例如:
# title = soup.find('h1', class_='news-title')
# content = soup.find('div', class_='news-content')
# ...
return news
def save_to_json(self, news_list, filename):
"""保存到JSON文件"""
filepath = os.path.join('news_data', filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(news_list, f, ensure_ascii=False, indent=2)
print(f"已保存 {len(news_list)} 条新闻到 {filepath}")
def save_to_csv(self, news_list, filename):
"""保存到CSV文件"""
filepath = os.path.join('news_data', filename)
if news_list:
fieldnames = news_list[0].keys()
with open(filepath, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(news_list)
print(f"已保存 {len(news_list)} 条新闻到 {filepath}")
# 使用示例
crawler = NewsCrawler()
# 模拟爬取多个新闻页面
news_list = []
base_url = "https://httpbin.org/html" # 使用测试页面
print("开始爬取新闻...")
for i in range(5): # 模拟爬取5个页面
print(f"爬取第 {i+1} 个页面...")
# 在实际项目中,这里应该是真实的新闻URL列表
news_data = crawler.crawl_news(base_url)
if news_data:
# 为每个新闻生成唯一数据
news_data['title'] = f"示例新闻标题 {i+1}"
news_data['content'] = f"这是第 {i+1} 个新闻的内容..."
news_data['url'] = f"https://example.com/news/{i+1}"
news_list.append(news_data)
# 添加延迟,避免请求过快
time.sleep(1)
# 保存数据
if news_list:
crawler.save_to_json(news_list, 'news.json')
crawler.save_to_csv(news_list, 'news.csv')
# 显示爬取结果
print("
爬取结果摘要:")
for i, news in enumerate(news_list[:3], 1): # 显示前3条
print(f"{i}. {news['title']}")
print(f" 来源: {news['source']}")
print(f" 时间: {news['publish_time']}")
print(f" 内容预览: {news['content'][:50]}...")
print()
else:
print("没有爬取到新闻数据")
# 运行新闻爬虫
news_crawler()
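补充说明:真实的新闻爬虫通常需要增量采集,避免重复抓取同一URL。下面是一个简要示意,用本地JSON文件记录已抓取的URL集合(文件路径 news_data/crawled_urls.json 为示例假设):
import json
import os

class UrlTracker:
    """用已抓取URL集合做简单去重(示意实现)"""
    def __init__(self, path='news_data/crawled_urls.json'):
        self.path = path
        if os.path.exists(path):
            with open(path, 'r', encoding='utf-8') as f:
                self.seen = set(json.load(f))
        else:
            self.seen = set()

    def is_new(self, url):
        return url not in self.seen

    def mark(self, url):
        # 记录URL并立即落盘,进程中断后仍可续爬
        self.seen.add(url)
        directory = os.path.dirname(self.path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.path, 'w', encoding='utf-8') as f:
            json.dump(sorted(self.seen), f, ensure_ascii=False, indent=2)

# 用法示例
tracker = UrlTracker()
for url in ["https://example.com/news/1", "https://example.com/news/1", "https://example.com/news/2"]:
    if tracker.is_new(url):
        print(f"抓取新URL: {url}")
        tracker.mark(url)
    else:
        print(f"跳过已抓取: {url}")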
练习4:动态内容爬取 – Selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time
def selenium_basics():
"""Selenium基础使用"""
print("=== Selenium动态内容爬取 ===")
# 设置Chrome选项
chrome_options = Options()
chrome_options.add_argument('--headless') # 无头模式
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
try:
# 初始化浏览器驱动
print("初始化浏览器驱动...")
driver = webdriver.Chrome(options=chrome_options)
# 1. 基本页面操作
print("1. 基本页面操作")
# 访问页面
driver.get("https://httpbin.org/html")
print(f"页面标题: {driver.title}")
print(f"当前URL: {driver.current_url}")
# 2. 元素查找和交互
print("
2. 元素查找和交互")
# 等待元素加载
wait = WebDriverWait(driver, 10)
# 查找元素
try:
# 查找h1元素
h1_element = wait.until(
EC.presence_of_element_located((By.TAG_NAME, "h1"))
)
print(f"找到h1元素: {h1_element.text}")
# 查找所有p元素
p_elements = driver.find_elements(By.TAG_NAME, "p")
print(f"找到 {len(p_elements)} 个p元素")
for i, p in enumerate(p_elements[:3], 1): # 显示前3个
print(f" {i}. {p.text}")
except Exception as e:
print(f"元素查找错误: {e}")
# 3. 执行JavaScript
print("
3. 执行JavaScript")
# 执行JavaScript代码
script_result = driver.execute_script("return document.title;")
print(f"通过JavaScript获取标题: {script_result}")
# 滚动页面
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
print("已滚动到页面底部")
# 4. 表单操作示例(在测试页面上)
print("
4. 表单操作示例")
# 访问一个包含表单的测试页面
driver.get("https://httpbin.org/forms/post")
# 查找表单元素
try:
# 填写输入框
name_input = wait.until(
EC.presence_of_element_located((By.NAME, "custname"))
)
name_input.clear()
name_input.send_keys("测试用户")
print("已填写姓名字段")
# 选择单选按钮
size_radio = driver.find_element(By.CSS_SELECTOR, "input[value='medium']")
size_radio.click()
print("已选择中等尺寸")
# 选择复选框
toppings = driver.find_elements(By.NAME, "topping")
for topping in toppings[:2]: # 选择前两个配料
topping.click()
print("已选择配料")
# 在实际爬虫中,这里可以提交表单
# submit_button = driver.find_element(By.TAG_NAME, "button")
# submit_button.click()
except Exception as e:
print(f"表单操作错误: {e}")
# 5. 截图功能
print("
5. 截图功能")
# 保存截图
screenshot_path = "news_data/selenium_screenshot.png"
driver.save_screenshot(screenshot_path)
print(f"截图已保存到: {screenshot_path}")
# 6. 处理弹窗和窗口
print("
6. 窗口处理")
# 打开新窗口
driver.execute_script("window.open('https://httpbin.org/html');")
print("已打开新窗口")
# 切换窗口
windows = driver.window_handles
print(f"当前窗口数量: {len(windows)}")
if len(windows) > 1:
driver.switch_to.window(windows[1])
print(f"切换到新窗口,标题: {driver.title}")
# 关闭新窗口并切换回原窗口
driver.close()
driver.switch_to.window(windows[0])
print("已关闭新窗口并切换回原窗口")
except Exception as e:
print(f"Selenium错误: {e}")
finally:
# 关闭浏览器
if 'driver' in locals():
driver.quit()
print("浏览器已关闭")
def dynamic_content_crawler():
"""动态内容爬虫示例"""
print("
=== 动态内容爬虫示例 ===")
# 这个示例需要实际的动态网站,这里使用模拟说明
class DynamicCrawler:
def __init__(self):
chrome_options = Options()
chrome_options.add_argument('--headless')
self.driver = webdriver.Chrome(options=chrome_options)
self.wait = WebDriverWait(self.driver, 10)
def crawl_dynamic_content(self, url):
"""爬取动态加载的内容"""
try:
self.driver.get(url)
# 等待动态内容加载
time.sleep(3) # 简单等待,实际应该使用明确的等待条件
# 获取渲染后的页面源码
page_source = self.driver.page_source
# 使用BeautifulSoup解析
soup = BeautifulSoup(page_source, 'html.parser')
# 提取动态加载的内容
# 这里根据实际网站结构编写提取逻辑
return soup
except Exception as e:
print(f"动态内容爬取错误: {e}")
return None
def close(self):
"""关闭浏览器"""
self.driver.quit()
# 使用说明
print("动态内容爬虫使用说明:")
print("1. 对于JavaScript渲染的内容,使用Selenium")
print("2. 等待动态内容加载完成")
print("3. 获取渲染后的页面源码")
print("4. 使用BeautifulSoup解析内容")
print("5. 注意添加适当的延迟和等待条件")
# 由于需要实际网站,这里不执行具体爬取
print("
注意: 实际动态爬虫需要指定具体的目标网站")
# 运行Selenium示例(需要安装ChromeDriver)
try:
selenium_basics()
dynamic_content_crawler()
except Exception as e:
print(f"Selenium示例运行失败: {e}")
print("请确保已安装Chrome浏览器和ChromeDriver")
练习5:API数据采集
def api_data_collection():
"""API数据采集"""
print("=== API数据采集 ===")
# 1. 公共API数据采集
print("1. 公共API数据采集")
class APICollector:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
})
def get_public_data(self, api_url, params=None):
"""获取公共API数据"""
try:
response = self.session.get(api_url, params=params, timeout=10)
response.raise_for_status()
if response.headers.get('Content-Type', '').startswith('application/json'):
return response.json()
else:
return response.text
except requests.exceptions.RequestException as e:
print(f"API请求错误: {e}")
return None
def collect_weather_data(self, city='Beijing'):
"""收集天气数据示例"""
# 使用开放的天气API(示例,需要注册获取API key)
# 这里使用模拟数据
print(f"获取 {city} 的天气数据...")
# 模拟天气数据
weather_data = {
'city': city,
'temperature': 25.5,
'humidity': 60,
'description': '晴朗',
'wind_speed': 3.2,
'timestamp': datetime.now().isoformat()
}
return weather_data
def collect_financial_data(self, symbol='AAPL'):
"""收集金融数据示例"""
print(f"获取 {symbol} 的金融数据...")
# 模拟金融数据
financial_data = {
'symbol': symbol,
'price': 150.25,
'change': 2.5,
'change_percent': 1.69,
'volume': 12500000,
'timestamp': datetime.now().isoformat()
}
return financial_data
def save_api_data(self, data, filename):
"""保存API数据"""
os.makedirs('api_data', exist_ok=True)
filepath = os.path.join('api_data', filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"数据已保存到: {filepath}")
# 使用示例
collector = APICollector()
# 收集多种数据
weather_data = collector.collect_weather_data('Beijing')
financial_data = collector.collect_financial_data('AAPL')
# 保存数据
if weather_data:
collector.save_api_data(weather_data, 'weather.json')
if financial_data:
collector.save_api_data(financial_data, 'financial.json')
# 2. 分页数据采集
print("
2. 分页数据采集")
def paginated_api_collection():
"""分页API数据采集"""
all_data = []
for page in range(1, 4): # 模拟采集3页数据
print(f"采集第 {page} 页数据...")
# 模拟API响应
page_data = {
'page': page,
'data': [
{'id': i, 'name': f'项目{(page-1)*10 + i}'}
for i in range(1, 11) # 每页10条数据
],
'total_pages': 3,
'total_items': 30
}
all_data.extend(page_data['data'])
# 添加延迟避免请求过快
time.sleep(1)
# 检查是否还有下一页
if page >= page_data['total_pages']:
break
print(f"共采集 {len(all_data)} 条数据")
return all_data
paginated_data = paginated_api_collection()
collector.save_api_data(paginated_data, 'paginated_data.json')
# 3. 实时数据监控
print("
3. 实时数据监控")
class RealTimeMonitor:
def __init__(self):
self.data_points = []
def monitor_api(self, api_url, interval=60, duration=300):
"""监控API数据"""
print(f"开始监控 {api_url},间隔 {interval} 秒,持续 {duration} 秒")
start_time = time.time()
point_count = 0
while time.time() - start_time < duration:
# 获取数据
data = collector.get_public_data(api_url)
if data:
monitoring_point = {
'timestamp': datetime.now().isoformat(),
'data': data,
'point_number': point_count
}
self.data_points.append(monitoring_point)
point_count += 1
print(f"采集第 {point_count} 个数据点")
# 等待下一次采集
time.sleep(interval)
print(f"监控结束,共采集 {point_count} 个数据点")
return self.data_points
# 模拟实时监控
monitor = RealTimeMonitor()
# 实际使用时取消注释,并替换为真实可用的API地址
# monitoring_data = monitor.monitor_api('https://api.example.com/data', 10, 60)
print("实时监控示例已准备就绪")
# 运行API数据采集
api_data_collection()
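补充说明:调用公共API时经常会遇到限流(HTTP 429)。下面是一个简要示意,在收到429时按 Retry-After 响应头(缺失时按固定秒数)等待后重试;这是通用做法,具体API的限流规则以其文档为准:
import time
import requests

def get_with_rate_limit(url, max_retries=3, default_wait=5, **kwargs):
    """遇到HTTP 429时按Retry-After等待后重试(示意实现)"""
    response = None
    for attempt in range(max_retries):
        response = requests.get(url, timeout=10, **kwargs)
        if response.status_code != 429:
            return response
        # Retry-After 可能是秒数,也可能缺失;这里只处理秒数形式
        retry_after = response.headers.get('Retry-After')
        wait = int(retry_after) if retry_after and retry_after.isdigit() else default_wait
        print(f"被限流(429),等待 {wait} 秒后进行第 {attempt + 2} 次请求...")
        time.sleep(wait)
    return response

# 用法示例(URL为示例假设)
# response = get_with_rate_limit("https://api.example.com/data")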
练习6:爬虫伦理与最佳实践
def crawler_ethics_best_practices():
"""爬虫伦理与最佳实践"""
print("=== 爬虫伦理与最佳实践 ===")
# 1. 遵守robots.txt
print("1. 遵守robots.txt")
def check_robots_txt(base_url):
"""检查robots.txt"""
robots_url = f"{base_url}/robots.txt"
try:
response = requests.get(robots_url, timeout=5)
if response.status_code == 200:
print(f"{base_url} 的robots.txt内容:")
print(response.text[:500] + "..." if len(response.text) > 500 else response.text)
else:
print(f"无法获取 {robots_url},状态码: {response.status_code}")
except requests.exceptions.RequestException as e:
print(f"获取robots.txt失败: {e}")
# 检查示例网站的robots.txt
check_robots_txt("https://www.example.com")
# 2. 设置合理的请求间隔
print("
2. 请求频率控制")
class PoliteCrawler:
def __init__(self, delay=1.0):
self.delay = delay # 请求间隔(秒)
self.last_request_time = 0
self.session = requests.Session()
def polite_get(self, url, **kwargs):
"""礼貌的GET请求"""
# 计算需要等待的时间
current_time = time.time()
time_since_last = current_time - self.last_request_time
wait_time = max(0, self.delay - time_since_last)
if wait_time > 0:
print(f"等待 {wait_time:.2f} 秒...")
time.sleep(wait_time)
self.last_request_time = time.time()
return self.session.get(url, **kwargs)
# 使用礼貌爬虫
polite_crawler = PoliteCrawler(delay=2.0)
print("使用礼貌爬虫示例:")
for i in range(3):
print(f"第 {i+1} 次请求...")
try:
response = polite_crawler.polite_get("https://httpbin.org/delay/1")
print(f"状态码: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
# 3. 错误处理和重试机制
print("
3. 健壮的错误处理")
def robust_crawler(url, max_retries=3):
"""健壮的爬虫函数"""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response
except requests.exceptions.Timeout:
print(f"请求超时,第 {attempt+1} 次重试...")
except requests.exceptions.HTTPError as e:
print(f"HTTP错误: {e}")
if e.response.status_code == 404:
print("页面不存在,停止重试")
break
except requests.exceptions.RequestException as e:
print(f"请求错误: {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
print(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
print(f"经过 {max_retries} 次尝试后依旧失败")
return None
# 测试健壮爬虫
print("测试健壮爬虫:")
response = robust_crawler("https://httpbin.org/status/500") # 模拟错误
# 4. 数据存储最佳实践
print("
4. 数据存储最佳实践")
class DataManager:
def __init__(self, base_dir='crawled_data'):
self.base_dir = base_dir
os.makedirs(base_dir, exist_ok=True)
def save_data(self, data, filename, format='json'):
"""保存数据"""
filepath = os.path.join(self.base_dir, filename)
try:
if format == 'json':
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
elif format == 'csv' and isinstance(data, list) and data:
with open(filepath, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
print(f"数据已保存: {filepath}")
return True
except Exception as e:
print(f"保存数据失败: {e}")
return False
def backup_data(self):
"""数据备份"""
backup_dir = f"{self.base_dir}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
os.makedirs(backup_dir, exist_ok=True)
# 模拟备份过程
print(f"数据已备份到: {backup_dir}")
return backup_dir
# 使用数据管理器
data_manager = DataManager()
sample_data = [
{'id': 1, 'name': '测试数据1', 'value': 100},
{'id': 2, 'name': '测试数据2', 'value': 200}
]
data_manager.save_data(sample_data, 'sample.json')
data_manager.save_data(sample_data, 'sample.csv', format='csv')
data_manager.backup_data()
# 5. 法律和伦理思考
print("
5. 法律和伦理思考")
print("重大原则:")
print("✓ 尊重网站的使用条款")
print("✓ 遵守robots.txt协议")
print("✓ 设置合理的请求频率")
print("✓ 不爬取敏感或个人隐私数据")
print("✓ 不将爬取数据用于非法用途")
print("✓ 尊重知识产权")
print("✓ 明确标识爬虫身份(User-Agent)")
# 运行爬虫伦理与最佳实践
crawler_ethics_best_practices()
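补充说明:除了直接阅读 robots.txt 文本,也可以用标准库 urllib.robotparser 以程序方式判断某个URL是否允许抓取。下面是一个简要示意(以 https://www.python.org 为例,结果以目标站点当前的 robots.txt 为准):
from urllib.robotparser import RobotFileParser

def is_allowed(base_url, path, user_agent="MyCrawler/1.0"):
    """根据robots.txt判断某路径是否允许抓取(示意实现)"""
    rp = RobotFileParser()
    rp.set_url(f"{base_url}/robots.txt")
    rp.read()  # 下载并解析robots.txt
    return rp.can_fetch(user_agent, f"{base_url}{path}")

# 用法示例
print(is_allowed("https://www.python.org", "/about/"))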
今日挑战:
创建一个完整的爬虫项目,爬取一个实际网站的数据并进行存储和分析。
# 挑战练习:完整爬虫项目 - 图书信息爬取
def complete_crawler_project():
"""完整爬虫项目:图书信息爬取"""
print("=== 完整爬虫项目:图书信息爬取 ===")
class BookCrawler:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
})
self.data_manager = DataManager('book_data')  # 注意:DataManager定义在练习6的函数内部,实际运行时需将其提升到模块层或在此处重新定义
def crawl_book_list(self, base_url, pages=3):
"""爬取图书列表"""
print(f"开始爬取图书列表,共 {pages} 页...")
all_books = []
for page in range(1, pages + 1):
print(f"爬取第 {page} 页...")
# 构建URL(这里使用模拟URL)
url = f"{base_url}?page={page}"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
# 解析页面
soup = BeautifulSoup(response.text, 'html.parser')
# 提取图书信息(模拟实现)
books = self.extract_books_from_page(soup, page)
all_books.extend(books)
print(f"第 {page} 页爬取完成,获得 {len(books)} 本图书")
# 礼貌延迟
time.sleep(2)
except Exception as e:
print(f"爬取第 {page} 页失败: {e}")
continue
return all_books
def extract_books_from_page(self, soup, page_num):
"""从页面提取图书信息(模拟实现)"""
# 在实际项目中,这里需要根据目标网站的实际HTML结构编写提取逻辑
# 这里使用模拟数据
books = []
for i in range(5): # 每页模拟5本书
book = {
'id': f"book_{page_num}_{i+1}",
'title': f"图书标题 {page_num}-{i+1}",
'author': f"作者 {i+1}",
'price': round(20 + i * 5 + page_num, 2),
'rating': round(3 + i * 0.5, 1),
'description': f"这是第 {page_num} 页第 {i+1} 本书的描述",
'category': ['小说', '文学', '科技'][i % 3],
'publish_date': f"2023-{page_num:02d}-{i+1:02d}",
'page_count': 200 + i * 50,
'crawl_time': datetime.now().isoformat()
}
books.append(book)
return books
def crawl_book_detail(self, book_id):
"""爬取图书详细信息"""
print(f"爬取图书详情: {book_id}")
# 模拟详细爬取
detail = {
'book_id': book_id,
'title': f"图书标题 {book_id}",
'isbn': f"978-7-{book_id.replace('_', '-')}",
'publisher': "示例出版社",
'summary': "这是图书的详细摘要...",
'chapters': [f"第{i+1}章" for i in range(10)],
'reviews': [
{'user': '用户1', 'rating': 5, 'comment': '很好'},
{'user': '用户2', 'rating': 4, 'comment': '不错'}
]
}
time.sleep(1) # 延迟
return detail
def analyze_books(self, books):
"""分析图书数据"""
print("
进行图书数据分析...")
if not books:
print("没有数据可分析")
return
# 基本统计
total_books = len(books)
avg_price = sum(book['price'] for book in books) / total_books
avg_rating = sum(book['rating'] for book in books) / total_books
print(f"图书总数: {total_books}")
print(f"平均价格: ¥{avg_price:.2f}")
print(f"平均评分: {avg_rating:.1f}")
# 价格分布
price_ranges = {'0-20': 0, '21-40': 0, '41-60': 0, '61+': 0}
for book in books:
price = book['price']
if price <= 20:
price_ranges['0-20'] += 1
elif price <= 40:
price_ranges['21-40'] += 1
elif price <= 60:
price_ranges['41-60'] += 1
else:
price_ranges['61+'] += 1
print("
价格分布:")
for range_name, count in price_ranges.items():
percentage = (count / total_books) * 100
print(f" {range_name}元: {count}本 ({percentage:.1f}%)")
# 评分分布
rating_counts = {}
for book in books:
rating = book['rating']
rating_key = f"{rating:.1f}"
rating_counts[rating_key] = rating_counts.get(rating_key, 0) + 1
print("
评分分布:")
for rating, count in sorted(rating_counts.items()):
percentage = (count / total_books) * 100
print(f" {rating}星: {count}本 ({percentage:.1f}%)")
return {
'total_books': total_books,
'avg_price': avg_price,
'avg_rating': avg_rating,
'price_distribution': price_ranges,
'rating_distribution': rating_counts
}
def generate_report(self, books, analysis):
"""生成爬虫报告"""
print("
生成爬虫报告...")
report = {
'crawl_info': {
'crawl_time': datetime.now().isoformat(),
'total_books_crawled': len(books),
'data_source': '模拟数据'
},
'analysis_results': analysis,
'sample_data': books[:3] # 包含3个样本
}
# 保存报告
self.data_manager.save_data(report, 'crawl_report.json')
# 保存完整数据
self.data_manager.save_data(books, 'books.json')
self.data_manager.save_data(books, 'books.csv', format='csv')
# 备份数据
self.data_manager.backup_data()
return report
def run(self):
"""运行完整爬虫流程"""
print("启动图书爬虫...")
# 1. 爬取图书列表
base_url = "https://example.com/books" # 模拟URL
books = self.crawl_book_list(base_url, pages=3)
if not books:
print("没有爬取到图书数据")
return
# 2. 爬取详细信息(可选)
detailed_books = []
for book in books[:2]: # 只爬取前2本的详细信息作为示例
detail = self.crawl_book_detail(book['id'])
detailed_book = {**book, **detail}
detailed_books.append(detailed_book)
# 3. 数据分析
analysis = self.analyze_books(books)
# 4. 生成报告
report = self.generate_report(books, analysis)
print("
" + "="*50)
print("爬虫项目完成!")
print("="*50)
print(f"爬取图书: {len(books)} 本")
print(f"详细爬取: {len(detailed_books)} 本")
print(f"平均价格: ¥{analysis['avg_price']:.2f}")
print(f"平均评分: {analysis['avg_rating']:.1f}")
print(f"数据保存位置: book_data/")
return report
# 运行爬虫项目
crawler = BookCrawler()
report = crawler.run()
return report
# 运行完整爬虫项目
crawler_report = complete_crawler_project()
学习提示:
- Requests库:HTTP请求基础,会话管理,错误处理
- BeautifulSoup:HTML解析,CSS选择器,数据提取
- Selenium:动态内容爬取,浏览器自动化
- API采集:RESTful API调用,分页处理,数据格式
- 爬虫伦理:遵守robots.txt,设置合理延迟,尊重版权
- 数据存储:JSON、CSV文件存储,数据备份
- 项目管理:代码组织,错误处理,日志记录(日志配置示例见下方)
- 法律合规:了解相关法律法规,避免法律风险
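上面提示中提到的日志记录在正文中没有展开,下面给出一个简要示意:用标准库 logging 同时把日志输出到控制台和文件,方便记录爬取过程中的错误(文件名 crawler.log 为示例假设)。
import logging

def setup_logger(name="crawler", log_file="crawler.log"):
    """配置同时输出到控制台和文件的日志器(示意实现)"""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    # 分别添加控制台和文件两个输出目标
    for handler in (logging.StreamHandler(), logging.FileHandler(log_file, encoding="utf-8")):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger

# 用法示例
logger = setup_logger()
logger.info("开始爬取...")
logger.warning("请求超时,准备重试")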
明天我们将学习自动化运维和系统管理!坚持练习,你的爬虫技能会越来越强!