60 Days of Intensive Python Practice: Day 18

Today's practice topic: web crawling and data collection

Today we will learn how to use Python for web crawling and data collection, covering tools such as requests, BeautifulSoup, and Scrapy.
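
Scrapy is mentioned above but the exercises below focus on requests, BeautifulSoup, and Selenium. For reference only, here is a minimal sketch of a Scrapy spider, assuming Scrapy is installed (pip install scrapy) and using httpbin's test page as a stand-in target:

import scrapy

class HtmlDemoSpider(scrapy.Spider):
    """Minimal spider sketch; run with: scrapy runspider html_demo_spider.py -o out.json"""
    name = "html_demo"
    start_urls = ["https://httpbin.org/html"]

    def parse(self, response):
        # Scrapy's built-in selectors: .get() returns the first match, .getall() all matches
        title = response.css("h1::text").get()
        paragraphs = response.css("p::text").getall()
        yield {"title": title, "paragraph_count": len(paragraphs)}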

Exercise 1: Requests library basics

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import json

def requests_basics():
    """Requests库基础使用"""
    print("=== Requests库基础 ===")
    
    # 1. 基本GET请求
    print("1. 基本GET请求")
    
    # 测试用的公共API
    url = "https://httpbin.org/get"
    
    try:
        response = requests.get(url)
        print(f"状态码: {response.status_code}")
        print(f"响应头: {dict(response.headers)}")
        print(f"响应内容前200字符: {response.text[:200]}...")
        
        # JSON响应处理
        if response.headers.get('Content-Type', '').startswith('application/json'):
            data = response.json()
            print(f"解析的JSON数据: {data}")
            
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
    
    # 2. 带参数的GET请求
    print("
2. 带参数的GET请求")
    
    params = {
        'key1': 'value1',
        'key2': 'value2',
        'page': 1,
        'limit': 10
    }
    
    response = requests.get("https://httpbin.org/get", params=params)
    print(f"请求URL: {response.url}")
    print(f"请求参数: {params}")
    
    # 3. POST请求
    print("
3. POST请求")
    
    data = {
        'username': 'testuser',
        'password': 'testpass',
        'email': 'test@example.com'
    }
    
    response = requests.post("https://httpbin.org/post", data=data)
    print(f"状态码: {response.status_code}")
    if response.status_code == 200:
        result = response.json()
        print(f"表单数据: {result.get('form', {})}")
    
    # 4. 带JSON数据的POST请求
    print("
4. JSON POST请求")
    
    json_data = {
        'title': '测试文章',
        'content': '这是测试内容',
        'author': '测试作者',
        'tags': ['python', '爬虫', '测试']
    }
    
    response = requests.post(
        "https://httpbin.org/post",
        json=json_data,
        headers={'Content-Type': 'application/json'}
    )
    
    if response.status_code == 200:
        result = response.json()
        print(f"JSON数据: {result.get('json', {})}")
    
    # 5. 设置请求头
    print("
5. 自定义请求头")
    
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'application/json',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Referer': 'https://www.example.com'
    }
    
    response = requests.get("https://httpbin.org/headers", headers=headers)
    if response.status_code == 200:
        result = response.json()
        print(f"请求头信息: {result.get('headers', {})}")
    
    # 6. 处理Cookies
    print("
6. Cookies处理")
    
    # 设置cookies
    cookies = {'session_id': 'abc123', 'user_id': '456'}
    response = requests.get("https://httpbin.org/cookies", cookies=cookies)
    if response.status_code == 200:
        result = response.json()
        print(f"发送的Cookies: {result.get('cookies', {})}")
    
    # 获取cookies
    response = requests.get("https://httpbin.org/cookies/set/sessionid/123456")
    print(f"服务器设置的Cookies: {response.cookies.get_dict()}")

def advanced_requests():
    """Requests高级功能"""
    print("
=== Requests高级功能 ===")
    
    # 1. 会话保持
    print("1. 会话保持")
    
    with requests.Session() as session:
        # 第一次请求设置cookies
        response1 = session.get("https://httpbin.org/cookies/set/sessionid/789012")
        print(f"第一次请求Cookies: {session.cookies.get_dict()}")
        
        # 第二次请求会携带cookies
        response2 = session.get("https://httpbin.org/cookies")
        if response2.status_code == 200:
            result = response2.json()
            print(f"第二次请求Cookies: {result.get('cookies', {})}")
    
    # 2. 超时设置
    print("
2. 超时设置")
    
    try:
        # 设置连接超时和读取超时
        response = requests.get("https://httpbin.org/delay/2", timeout=(3.05, 5))
        print("请求成功完成")
    except requests.exceptions.Timeout:
        print("请求超时")
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
    
    # 3. 重试机制
    print("
3. 重试机制")
    
    def create_session_with_retries():
        session = requests.Session()
        
        # 定义重试策略
        retry_strategy = Retry(
            total=3,  # 总重试次数
            backoff_factor=1,  # 退避因子
            status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
        )
        
        # 创建适配器
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        return session
    
    session = create_session_with_retries()
    
    try:
        response = session.get("https://httpbin.org/status/500")
        print(f"最终状态码: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
    
    # 4. 代理设置
    print("
4. 代理设置")
    
    # 注意:这里使用免费代理示例,实际使用时需要有效的代理服务器
    proxies = {
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080',
    }
    
    # 实际使用时撤销注释
    # try:
    #     response = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10)
    #     print(f"通过代理获取的IP: {response.json()}")
    # except:
    #     print("代理请求失败")
    
    print("代理示例已注释,需要有效代理服务器才能运行")
    
    # 5. 文件下载
    print("
5. 文件下载")
    
    def download_file(url, filename):
        """下载文件"""
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status()
            
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            
            print(f"文件下载成功: {filename}")
            return True
            
        except requests.exceptions.RequestException as e:
            print(f"下载失败: {e}")
            return False
    
    # 下载一个小图片示例
    image_url = "https://httpbin.org/image/png"
    download_file(image_url, "downloaded_image.png")

# 运行Requests示例
requests_basics()
advanced_requests()

Exercise 2: Web page parsing with BeautifulSoup

from bs4 import BeautifulSoup
import requests
import re

def beautifulsoup_basics():
    """BeautifulSoup基础使用"""
    print("=== BeautifulSoup网页解析 ===")
    
    # 1. 解析HTML字符串
    print("1. 解析HTML字符串")
    
    html_doc = """
    <html>
    <head>
        <title>测试网页</title>
    </head>
    <body>
        <div>
            <h1 id="main-title">网页标题</h1>
            <p>这是一个段落。</p>
            <p>这是另一个段落。</p>
            <ul id="list">
                <li>项目1</li>
                <li>项目2</li>
                <li>项目3</li>
            </ul>
            <a href="https://www.example.com">示例链接</a>
            <div>
                <span>嵌套内容</span>
            </div>
        </div>
    </body>
    </html>
    """
    
    soup = BeautifulSoup(html_doc, 'html.parser')
    
    # 基本操作
    print(f"网页标题: {soup.title.string}")
    print(f"第一个p标签: {soup.p.text}")
    print(f"所有p标签数量: {len(soup.find_all('p'))}")
    
    # 2. 标签选择方法
    print("
2. 标签选择方法")
    
    # 通过标签名
    print("所有li标签:")
    for li in soup.find_all('li'):
        print(f"  - {li.text}")
    
    # 通过class
    print("
class为content的标签:")
    for p in soup.find_all(class_='content'):
        print(f"  - {p.text}")
    
    # 通过id
    main_title = soup.find(id='main-title')
    print(f"
id为main-title的标签: {main_title.text}")
    
    # 通过属性
    link = soup.find('a', href='https://www.example.com')
    print(f"链接: {link.text} -> {link['href']}")
    
    # 3. CSS选择器
    print("
3. CSS选择器")
    
    # 选择所有item类
    items = soup.select('.item')
    print("CSS选择器 .item:")
    for item in items:
        print(f"  - {item.text}")
    
    # 选择active类的item
    active_item = soup.select('.item.active')
    print(f"激活的项目: {active_item[0].text if active_item else '无'}")
    
    # 选择嵌套内容
    nested = soup.select('.nested span')
    print(f"嵌套内容: {nested[0].text if nested else '无'}")
    
    # 4. 实际网页解析示例
    print("
4. 实际网页解析示例")
    
    def parse_example_website():
        """解析示例网站"""
        try:
            # 使用一个简单的测试网站
            url = "https://httpbin.org/html"
            response = requests.get(url)
            
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # 提取标题
                title = soup.find('h1')
                if title:
                    print(f"页面标题: {title.text}")
                
                # 提取所有段落
                paragraphs = soup.find_all('p')
                print("页面段落:")
                for i, p in enumerate(paragraphs, 1):
                    print(f"  {i}. {p.text.strip()}")
                
                # 提取所有链接
                links = soup.find_all('a')
                print("页面链接:")
                for link in links:
                    href = link.get('href', '')
                    text = link.text.strip()
                    if text or href:
                        print(f"  - {text} -> {href}")
            
        except requests.exceptions.RequestException as e:
            print(f"请求错误: {e}")
    
    parse_example_website()

def advanced_beautifulsoup():
    """BeautifulSoup高级功能"""
    print("
=== BeautifulSoup高级功能 ===")
    
    # 创建测试HTML
    html_content = """
    <div>
        <div data-id="1">
            <h3>商品A</h3>
            <p>¥100.00</p>
            <p>这是商品A的描述</p>
            <span>热卖</span>
            <span>新品</span>
        </div>
        <div data-id="2">
            <h3>商品B</h3>
            <p>¥200.00</p>
            <p>这是商品B的描述</p>
            <span>折扣</span>
        </div>
        <div data-id="3">
            <h3>商品C</h3>
            <p>¥150.00</p>
            <p>这是商品C的描述</p>
        </div>
    </div>
    """
    
    soup = BeautifulSoup(html_content, 'html.parser')
    
    # 1. 数据提取和清洗
    print("1. 数据提取和清洗")
    
    products = []
    for product_div in soup.find_all('div', class_='product'):
        product = {}
        
        # 提取商品名称
        name_tag = product_div.find('h3')
        product['name'] = name_tag.text.strip() if name_tag else '未知'
        
        # 提取价格(使用正则表达式清理)
        price_tag = product_div.find('p', class_='price')
        if price_tag:
            # 提取数字
            price_match = re.search(r'[\d.]+', price_tag.text)
            product['price'] = float(price_match.group()) if price_match else 0.0
        else:
            product['price'] = 0.0
        
        # 提取描述
        desc_tag = product_div.find('p', class_='description')
        product['description'] = desc_tag.text.strip() if desc_tag else ''
        
        # 提取标签
        tags = [tag.text.strip() for tag in product_div.find_all('span', class_='tag')]
        product['tags'] = tags
        
        # 提取自定义属性
        product['id'] = product_div.get('data-id', '')
        
        products.append(product)
    
    print("提取的商品信息:")
    for i, product in enumerate(products, 1):
        print(f"商品{i}: {product}")
    
    # 2. 导航方法
    print("
2. 导航方法")
    
    # 父节点和子节点
    first_product = soup.find('div', class_='product')
    if first_product:
        print(f"第一个商品的父节点: {first_product.parent.get('class', [])}")
        print(f"第一个商品的直接子节点:")
        for child in first_product.children:
            if child.name:  # 过滤掉文本节点
                print(f"  - {child.name}: {child.text.strip()}")
    
    # 兄弟节点
    second_product = soup.find_all('div', class_='product')[1]
    if second_product:
        next_sibling = second_product.find_next_sibling('div', class_='product')
        prev_sibling = second_product.find_previous_sibling('div', class_='product')
        print(f"第二个商品的前一个兄弟: {prev_sibling.get('data-id') if prev_sibling else '无'}")
        print(f"第二个商品的后一个兄弟: {next_sibling.get('data-id') if next_sibling else '无'}")
    
    # 3. 修改HTML
    print("
3. 修改HTML")
    
    # 修改内容
    first_h3 = soup.find('h3')
    if first_h3:
        original_text = first_h3.text
        first_h3.string = "修改后的商品A"
        print(f"修改前: {original_text}")
        print(f"修改后: {first_h3.text}")
    
    # 添加新元素
    new_product = soup.new_tag('div', **{'class': 'product', 'data-id': '4'})
    new_product.append(soup.new_tag('h3'))
    new_product.h3.string = "商品D"
    
    products_div = soup.find('div', class_='products')
    if products_div:
        products_div.append(new_product)
        print(f"添加新商品后商品数量: {len(products_div.find_all('div', class_='product'))}")
    
    # 4. 输出美化
    print("
4. 美化输出")
    print(soup.prettify()[:500] + "...")

# 运行BeautifulSoup示例
beautifulsoup_basics()
advanced_beautifulsoup()
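
One more parsing trick worth knowing: on large pages, bs4's SoupStrainer restricts parsing to only the tags you need, which saves memory and time. A minimal sketch, reusing the httpbin test page from above (which may contain few or no links):

from bs4 import BeautifulSoup, SoupStrainer
import requests

response = requests.get("https://httpbin.org/html", timeout=10)
only_links = SoupStrainer("a")  # parse only <a> elements, skip everything else
link_soup = BeautifulSoup(response.text, "html.parser", parse_only=only_links)
for link in link_soup.find_all("a"):
    print(link.get("href"), link.get_text(strip=True))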

Exercise 3: Hands-on crawler project – news scraping

import csv
import json
import os
from datetime import datetime

def news_crawler():
    """新闻爬虫实战项目"""
    print("=== 新闻爬虫实战项目 ===")
    
    # 创建数据存储目录
    os.makedirs('news_data', exist_ok=True)
    
    class NewsCrawler:
        def __init__(self):
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
        
        def crawl_news(self, url):
            """爬取新闻页面"""
            try:
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # 提取新闻信息(这里使用模拟数据,实际需要根据目标网站调整)
                news_data = self.extract_news_data(soup)
                return news_data
                
            except requests.exceptions.RequestException as e:
                print(f"爬取失败 {url}: {e}")
                return None
        
        def extract_news_data(self, soup):
            """提取新闻数据(示例实现)"""
            # 这里是一个示例实现,实际爬取时需要根据目标网站结构调整
            
            # 模拟提取数据
            news = {
                'title': '示例新闻标题',
                'content': '这是新闻内容...',
                'publish_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'author': '示例作者',
                'source': '示例来源',
                'url': 'https://example.com/news/1',
                'crawl_time': datetime.now().isoformat()
            }
            
            # 实际爬取时,需要根据网站结构提取真实数据
            # 例如:
            # title = soup.find('h1', class_='news-title')
            # content = soup.find('div', class_='news-content')
            # ...
            
            return news
        
        def save_to_json(self, news_list, filename):
            """保存到JSON文件"""
            filepath = os.path.join('news_data', filename)
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(news_list, f, ensure_ascii=False, indent=2)
            print(f"已保存 {len(news_list)} 条新闻到 {filepath}")
        
        def save_to_csv(self, news_list, filename):
            """保存到CSV文件"""
            filepath = os.path.join('news_data', filename)
            
            if news_list:
                fieldnames = news_list[0].keys()
                with open(filepath, 'w', encoding='utf-8', newline='') as f:
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(news_list)
                print(f"已保存 {len(news_list)} 条新闻到 {filepath}")
    
    # 使用示例
    crawler = NewsCrawler()
    
    # 模拟爬取多个新闻页面
    news_list = []
    base_url = "https://httpbin.org/html"  # 使用测试页面
    
    print("开始爬取新闻...")
    
    for i in range(5):  # 模拟爬取5个页面
        print(f"爬取第 {i+1} 个页面...")
        
        # 在实际项目中,这里应该是真实的新闻URL列表
        news_data = crawler.crawl_news(base_url)
        
        if news_data:
            # 为每个新闻生成唯一数据
            news_data['title'] = f"示例新闻标题 {i+1}"
            news_data['content'] = f"这是第 {i+1} 个新闻的内容..."
            news_data['url'] = f"https://example.com/news/{i+1}"
            news_list.append(news_data)
            
            # 添加延迟,避免请求过快
            time.sleep(1)
    
    # 保存数据
    if news_list:
        crawler.save_to_json(news_list, 'news.json')
        crawler.save_to_csv(news_list, 'news.csv')
        
        # 显示爬取结果
        print("
爬取结果摘要:")
        for i, news in enumerate(news_list[:3], 1):  # 显示前3条
            print(f"{i}. {news['title']}")
            print(f"   来源: {news['source']}")
            print(f"   时间: {news['publish_time']}")
            print(f"   内容预览: {news['content'][:50]}...")
            print()
    else:
        print("没有爬取到新闻数据")

# 运行新闻爬虫
news_crawler()
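
The extract_news_data method above returns simulated data. Against a real site you would swap in selectors that match that site's HTML. A hedged sketch of what such a method might look like, where the class names news-title, news-content, and publish-time are placeholders for illustration, not any real site's markup:

def extract_real_news_data(soup, url):
    """Hypothetical extraction logic; inspect the target site and adjust the selectors."""
    title_tag = soup.find('h1', class_='news-title')        # placeholder selector
    content_tag = soup.find('div', class_='news-content')   # placeholder selector
    time_tag = soup.find('span', class_='publish-time')     # placeholder selector

    return {
        'title': title_tag.get_text(strip=True) if title_tag else '',
        'content': content_tag.get_text(strip=True) if content_tag else '',
        'publish_time': time_tag.get_text(strip=True) if time_tag else '',
        'url': url,
        'crawl_time': datetime.now().isoformat(),
    }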

Exercise 4: Scraping dynamic content – Selenium

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time

def selenium_basics():
    """Selenium基础使用"""
    print("=== Selenium动态内容爬取 ===")
    
    # 设置Chrome选项
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # 无头模式
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    
    try:
        # 初始化浏览器驱动
        print("初始化浏览器驱动...")
        driver = webdriver.Chrome(options=chrome_options)
        
        # 1. 基本页面操作
        print("1. 基本页面操作")
        
        # 访问页面
        driver.get("https://httpbin.org/html")
        print(f"页面标题: {driver.title}")
        print(f"当前URL: {driver.current_url}")
        
        # 2. 元素查找和交互
        print("
2. 元素查找和交互")
        
        # 等待元素加载
        wait = WebDriverWait(driver, 10)
        
        # 查找元素
        try:
            # 查找h1元素
            h1_element = wait.until(
                EC.presence_of_element_located((By.TAG_NAME, "h1"))
            )
            print(f"找到h1元素: {h1_element.text}")
            
            # 查找所有p元素
            p_elements = driver.find_elements(By.TAG_NAME, "p")
            print(f"找到 {len(p_elements)} 个p元素")
            for i, p in enumerate(p_elements[:3], 1):  # 显示前3个
                print(f"  {i}. {p.text}")
                
        except Exception as e:
            print(f"元素查找错误: {e}")
        
        # 3. 执行JavaScript
        print("
3. 执行JavaScript")
        
        # 执行JavaScript代码
        script_result = driver.execute_script("return document.title;")
        print(f"通过JavaScript获取标题: {script_result}")
        
        # 滚动页面
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        print("已滚动到页面底部")
        
        # 4. 表单操作示例(在测试页面上)
        print("
4. 表单操作示例")
        
        # 访问一个包含表单的测试页面
        driver.get("https://httpbin.org/forms/post")
        
        # 查找表单元素
        try:
            # 填写输入框
            name_input = wait.until(
                EC.presence_of_element_located((By.NAME, "custname"))
            )
            name_input.clear()
            name_input.send_keys("测试用户")
            print("已填写姓名字段")
            
            # 选择单选按钮
            size_radio = driver.find_element(By.CSS_SELECTOR, "input[value='medium']")
            size_radio.click()
            print("已选择中等尺寸")
            
            # 选择复选框
            toppings = driver.find_elements(By.NAME, "topping")
            for topping in toppings[:2]:  # 选择前两个配料
                topping.click()
            print("已选择配料")
            
            # 在实际爬虫中,这里可以提交表单
            # submit_button = driver.find_element(By.TAG_NAME, "button")
            # submit_button.click()
            
        except Exception as e:
            print(f"表单操作错误: {e}")
        
        # 5. 截图功能
        print("
5. 截图功能")
        
        # 保存截图
        screenshot_path = "news_data/selenium_screenshot.png"
        driver.save_screenshot(screenshot_path)
        print(f"截图已保存到: {screenshot_path}")
        
        # 6. 处理弹窗和窗口
        print("
6. 窗口处理")
        
        # 打开新窗口
        driver.execute_script("window.open('https://httpbin.org/html');")
        print("已打开新窗口")
        
        # 切换窗口
        windows = driver.window_handles
        print(f"当前窗口数量: {len(windows)}")
        
        if len(windows) > 1:
            driver.switch_to.window(windows[1])
            print(f"切换到新窗口,标题: {driver.title}")
            
            # 关闭新窗口并切换回原窗口
            driver.close()
            driver.switch_to.window(windows[0])
            print("已关闭新窗口并切换回原窗口")
        
    except Exception as e:
        print(f"Selenium错误: {e}")
    
    finally:
        # 关闭浏览器
        if 'driver' in locals():
            driver.quit()
            print("浏览器已关闭")

def dynamic_content_crawler():
    """动态内容爬虫示例"""
    print("
=== 动态内容爬虫示例 ===")
    
    # 这个示例需要实际的动态网站,这里使用模拟说明
    
    class DynamicCrawler:
        def __init__(self):
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            self.driver = webdriver.Chrome(options=chrome_options)
            self.wait = WebDriverWait(self.driver, 10)
        
        def crawl_dynamic_content(self, url):
            """爬取动态加载的内容"""
            try:
                self.driver.get(url)
                
                # 等待动态内容加载
                time.sleep(3)  # 简单等待,实际应该使用明确的等待条件
                
                # 获取渲染后的页面源码
                page_source = self.driver.page_source
                
                # 使用BeautifulSoup解析
                soup = BeautifulSoup(page_source, 'html.parser')
                
                # 提取动态加载的内容
                # 这里根据实际网站结构编写提取逻辑
                
                return soup
                
            except Exception as e:
                print(f"动态内容爬取错误: {e}")
                return None
        
        def close(self):
            """关闭浏览器"""
            self.driver.quit()
    
    # 使用说明
    print("动态内容爬虫使用说明:")
    print("1. 对于JavaScript渲染的内容,使用Selenium")
    print("2. 等待动态内容加载完成")
    print("3. 获取渲染后的页面源码")
    print("4. 使用BeautifulSoup解析内容")
    print("5. 注意添加适当的延迟和等待条件")
    
    # 由于需要实际网站,这里不执行具体爬取
    print("
注意: 实际动态爬虫需要指定具体的目标网站")

# 运行Selenium示例(需要安装ChromeDriver)
try:
    selenium_basics()
    dynamic_content_crawler()
except Exception as e:
    print(f"Selenium示例运行失败: {e}")
    print("请确保已安装Chrome浏览器和ChromeDriver")

Exercise 5: API data collection

def api_data_collection():
    """API数据采集"""
    print("=== API数据采集 ===")
    
    # 1. 公共API数据采集
    print("1. 公共API数据采集")
    
    class APICollector:
        def __init__(self):
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json'
            })
        
        def get_public_data(self, api_url, params=None):
            """获取公共API数据"""
            try:
                response = self.session.get(api_url, params=params, timeout=10)
                response.raise_for_status()
                
                if response.headers.get('Content-Type', '').startswith('application/json'):
                    return response.json()
                else:
                    return response.text
                    
            except requests.exceptions.RequestException as e:
                print(f"API请求错误: {e}")
                return None
        
        def collect_weather_data(self, city='Beijing'):
            """收集天气数据示例"""
            # 使用开放的天气API(示例,需要注册获取API key)
            # 这里使用模拟数据
            
            print(f"获取 {city} 的天气数据...")
            
            # 模拟天气数据
            weather_data = {
                'city': city,
                'temperature': 25.5,
                'humidity': 60,
                'description': '晴朗',
                'wind_speed': 3.2,
                'timestamp': datetime.now().isoformat()
            }
            
            return weather_data
        
        def collect_financial_data(self, symbol='AAPL'):
            """收集金融数据示例"""
            print(f"获取 {symbol} 的金融数据...")
            
            # 模拟金融数据
            financial_data = {
                'symbol': symbol,
                'price': 150.25,
                'change': 2.5,
                'change_percent': 1.69,
                'volume': 12500000,
                'timestamp': datetime.now().isoformat()
            }
            
            return financial_data
        
        def save_api_data(self, data, filename):
            """保存API数据"""
            os.makedirs('api_data', exist_ok=True)
            filepath = os.path.join('api_data', filename)
            
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            
            print(f"数据已保存到: {filepath}")
    
    # 使用示例
    collector = APICollector()
    
    # 收集多种数据
    weather_data = collector.collect_weather_data('Beijing')
    financial_data = collector.collect_financial_data('AAPL')
    
    # 保存数据
    if weather_data:
        collector.save_api_data(weather_data, 'weather.json')
    
    if financial_data:
        collector.save_api_data(financial_data, 'financial.json')
    
    # 2. 分页数据采集
    print("
2. 分页数据采集")
    
    def paginated_api_collection():
        """分页API数据采集"""
        
        all_data = []
        
        for page in range(1, 4):  # 模拟采集3页数据
            print(f"采集第 {page} 页数据...")
            
            # 模拟API响应
            page_data = {
                'page': page,
                'data': [
                    {'id': i, 'name': f'项目{(page-1)*10 + i}'} 
                    for i in range(1, 11)  # 每页10条数据
                ],
                'total_pages': 3,
                'total_items': 30
            }
            
            all_data.extend(page_data['data'])
            
            # 添加延迟避免请求过快
            time.sleep(1)
            
            # 检查是否还有下一页
            if page >= page_data['total_pages']:
                break
        
        print(f"共采集 {len(all_data)} 条数据")
        return all_data
    
    paginated_data = paginated_api_collection()
    collector.save_api_data(paginated_data, 'paginated_data.json')
    
    # 3. 实时数据监控
    print("
3. 实时数据监控")
    
    class RealTimeMonitor:
        def __init__(self):
            self.data_points = []
        
        def monitor_api(self, api_url, interval=60, duration=300):
            """监控API数据"""
            print(f"开始监控 {api_url},间隔 {interval}秒,持续 {duration}秒")
            
            start_time = time.time()
            point_count = 0
            
            while time.time() - start_time < duration:
                # 获取数据
                data = collector.get_public_data(api_url)
                if data:
                    monitoring_point = {
                        'timestamp': datetime.now().isoformat(),
                        'data': data,
                        'point_number': point_count
                    }
                    self.data_points.append(monitoring_point)
                    point_count += 1
                    
                    print(f"采集第 {point_count} 个数据点")
                
                # 等待下一次采集
                time.sleep(interval)
            
            print(f"监控结束,共采集 {point_count} 个数据点")
            return self.data_points
    
    # 模拟实时监控
    monitor = RealTimeMonitor()
    # 实际使用时撤销注释
    # monitoring_data = monitor.monitor_api('https://api.example.com/data', 10, 60)
    
    print("实时监控示例已准备就绪")

# 运行API数据采集
api_data_collection()
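
The paginated_api_collection function above simulates the server's responses. Against a real JSON API the same pattern typically looks like the sketch below: send the page number as a query parameter and stop when a page comes back empty. The endpoint and parameter names (https://api.example.com/items, page, per_page) are placeholders, not a real service:

def collect_all_pages(session, api_url, per_page=50, max_pages=100):
    """Generic pagination loop; endpoint and parameter names are assumptions."""
    all_items = []
    for page in range(1, max_pages + 1):
        response = session.get(
            api_url,
            params={"page": page, "per_page": per_page},
            timeout=10,
        )
        response.raise_for_status()
        items = response.json()
        if not items:      # an empty page means there is nothing left to fetch
            break
        all_items.extend(items)
        time.sleep(1)      # polite delay between pages
    return all_items

# Example (placeholder URL): collect_all_pages(requests.Session(), "https://api.example.com/items")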

Exercise 6: Crawler ethics and best practices

def crawler_ethics_best_practices():
    """爬虫伦理与最佳实践"""
    print("=== 爬虫伦理与最佳实践 ===")
    
    # 1. 遵守robots.txt
    print("1. 遵守robots.txt")
    
    def check_robots_txt(base_url):
        """检查robots.txt"""
        robots_url = f"{base_url}/robots.txt"
        
        try:
            response = requests.get(robots_url, timeout=5)
            if response.status_code == 200:
                print(f"{base_url} 的robots.txt内容:")
                print(response.text[:500] + "..." if len(response.text) > 500 else response.text)
            else:
                print(f"无法获取 {robots_url},状态码: {response.status_code}")
                
        except requests.exceptions.RequestException as e:
            print(f"获取robots.txt失败: {e}")
    
    # 检查示例网站的robots.txt
    check_robots_txt("https://www.example.com")
    
    # 2. 设置合理的请求间隔
    print("
2. 请求频率控制")
    
    class PoliteCrawler:
        def __init__(self, delay=1.0):
            self.delay = delay  # 请求间隔(秒)
            self.last_request_time = 0
            self.session = requests.Session()
        
        def polite_get(self, url, **kwargs):
            """礼貌的GET请求"""
            # 计算需要等待的时间
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            wait_time = max(0, self.delay - time_since_last)
            
            if wait_time > 0:
                print(f"等待 {wait_time:.2f} 秒...")
                time.sleep(wait_time)
            
            self.last_request_time = time.time()
            return self.session.get(url, **kwargs)
    
    # 使用礼貌爬虫
    polite_crawler = PoliteCrawler(delay=2.0)
    
    print("使用礼貌爬虫示例:")
    for i in range(3):
        print(f"第 {i+1} 次请求...")
        try:
            response = polite_crawler.polite_get("https://httpbin.org/delay/1")
            print(f"状态码: {response.status_code}")
        except Exception as e:
            print(f"请求失败: {e}")
    
    # 3. 错误处理和重试机制
    print("
3. 健壮的错误处理")
    
    def robust_crawler(url, max_retries=3):
        """健壮的爬虫函数"""
        for attempt in range(max_retries):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response
                
            except requests.exceptions.Timeout:
                print(f"请求超时,第 {attempt+1} 次重试...")
            except requests.exceptions.HTTPError as e:
                print(f"HTTP错误: {e}")
                if e.response.status_code == 404:
                    print("页面不存在,停止重试")
                    break
            except requests.exceptions.RequestException as e:
                print(f"请求错误: {e}")
            
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # 指数退避
                print(f"等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
        
        print(f"经过 {max_retries} 次尝试后依旧失败")
        return None
    
    # 测试健壮爬虫
    print("测试健壮爬虫:")
    response = robust_crawler("https://httpbin.org/status/500")  # 模拟错误
    
    # 4. 数据存储最佳实践
    print("
4. 数据存储最佳实践")
    
    class DataManager:
        def __init__(self, base_dir='crawled_data'):
            self.base_dir = base_dir
            os.makedirs(base_dir, exist_ok=True)
        
        def save_data(self, data, filename, format='json'):
            """保存数据"""
            filepath = os.path.join(self.base_dir, filename)
            
            try:
                if format == 'json':
                    with open(filepath, 'w', encoding='utf-8') as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)
                elif format == 'csv' and isinstance(data, list) and data:
                    with open(filepath, 'w', encoding='utf-8', newline='') as f:
                        writer = csv.DictWriter(f, fieldnames=data[0].keys())
                        writer.writeheader()
                        writer.writerows(data)
                
                print(f"数据已保存: {filepath}")
                return True
                
            except Exception as e:
                print(f"保存数据失败: {e}")
                return False
        
        def backup_data(self):
            """数据备份"""
            backup_dir = f"{self.base_dir}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            os.makedirs(backup_dir, exist_ok=True)
            
            # 模拟备份过程
            print(f"数据已备份到: {backup_dir}")
            return backup_dir
    
    # 使用数据管理器
    data_manager = DataManager()
    
    sample_data = [
        {'id': 1, 'name': '测试数据1', 'value': 100},
        {'id': 2, 'name': '测试数据2', 'value': 200}
    ]
    
    data_manager.save_data(sample_data, 'sample.json')
    data_manager.save_data(sample_data, 'sample.csv', format='csv')
    data_manager.backup_data()
    
    # 5. 法律和伦理思考
    print("
5. 法律和伦理思考")
    print("重大原则:")
    print("✓ 尊重网站的使用条款")
    print("✓ 遵守robots.txt协议")
    print("✓ 设置合理的请求频率")
    print("✓ 不爬取敏感或个人隐私数据")
    print("✓ 不将爬取数据用于非法用途")
    print("✓ 尊重知识产权")
    print("✓ 明确标识爬虫身份(User-Agent)")

# 运行爬虫伦理与最佳实践
crawler_ethics_best_practices()
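
The check_robots_txt helper above only prints the file. The standard library's urllib.robotparser can also answer "may this user agent fetch this URL?" programmatically. A minimal sketch:

from urllib.robotparser import RobotFileParser

def can_crawl(base_url, path, user_agent="MyCrawler/1.0"):
    """Return True if robots.txt allows user_agent to fetch base_url + path."""
    rp = RobotFileParser()
    rp.set_url(f"{base_url}/robots.txt")
    try:
        rp.read()  # download and parse robots.txt
    except Exception as e:
        print(f"could not read robots.txt: {e}")
        return True  # robots.txt unreachable: decide your own policy here
    return rp.can_fetch(user_agent, f"{base_url}{path}")

print(can_crawl("https://www.example.com", "/some/page"))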

Today's challenge:

Build a complete crawler project that scrapes data from a real website, then stores and analyzes it.

# 挑战练习:完整爬虫项目 - 图书信息爬取
def complete_crawler_project():
    """完整爬虫项目:图书信息爬取"""
    print("=== 完整爬虫项目:图书信息爬取 ===")
    
    class BookCrawler:
        def __init__(self):
            self.session = requests.Session()
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
            })
            # note: assumes the DataManager class from Exercise 6 is available at module scope
            self.data_manager = DataManager('book_data')
        
        def crawl_book_list(self, base_url, pages=3):
            """爬取图书列表"""
            print(f"开始爬取图书列表,共 {pages} 页...")
            
            all_books = []
            
            for page in range(1, pages + 1):
                print(f"爬取第 {page} 页...")
                
                # 构建URL(这里使用模拟URL)
                url = f"{base_url}?page={page}"
                
                try:
                    response = self.session.get(url, timeout=10)
                    response.raise_for_status()
                    
                    # 解析页面
                    soup = BeautifulSoup(response.text, 'html.parser')
                    
                    # 提取图书信息(模拟实现)
                    books = self.extract_books_from_page(soup, page)
                    all_books.extend(books)
                    
                    print(f"第 {page} 页爬取完成,获得 {len(books)} 本图书")
                    
                    # 礼貌延迟
                    time.sleep(2)
                    
                except Exception as e:
                    print(f"爬取第 {page} 页失败: {e}")
                    continue
            
            return all_books
        
        def extract_books_from_page(self, soup, page_num):
            """从页面提取图书信息(模拟实现)"""
            # 在实际项目中,这里需要根据目标网站的实际HTML结构编写提取逻辑
            # 这里使用模拟数据
            
            books = []
            for i in range(5):  # 每页模拟5本书
                book = {
                    'id': f"book_{page_num}_{i+1}",
                    'title': f"图书标题 {page_num}-{i+1}",
                    'author': f"作者 {i+1}",
                    'price': round(20 + i * 5 + page_num, 2),
                    'rating': round(3 + i * 0.5, 1),
                    'description': f"这是第 {page_num} 页第 {i+1} 本书的描述",
                    'category': ['小说', '文学', '科技'][i % 3],
                    'publish_date': f"2023-{page_num:02d}-{i+1:02d}",
                    'page_count': 200 + i * 50,
                    'crawl_time': datetime.now().isoformat()
                }
                books.append(book)
            
            return books
        
        def crawl_book_detail(self, book_id):
            """爬取图书详细信息"""
            print(f"爬取图书详情: {book_id}")
            
            # 模拟详细爬取
            detail = {
                'book_id': book_id,
                'title': f"图书标题 {book_id}",
                'isbn': f"978-7-{book_id.replace('_', '-')}",
                'publisher': "示例出版社",
                'summary': "这是图书的详细摘要...",
                'chapters': [f"第{i+1}章" for i in range(10)],
                'reviews': [
                    {'user': '用户1', 'rating': 5, 'comment': '很好'},
                    {'user': '用户2', 'rating': 4, 'comment': '不错'}
                ]
            }
            
            time.sleep(1)  # 延迟
            return detail
        
        def analyze_books(self, books):
            """分析图书数据"""
            print("
进行图书数据分析...")
            
            if not books:
                print("没有数据可分析")
                return
            
            # 基本统计
            total_books = len(books)
            avg_price = sum(book['price'] for book in books) / total_books
            avg_rating = sum(book['rating'] for book in books) / total_books
            
            print(f"图书总数: {total_books}")
            print(f"平均价格: ¥{avg_price:.2f}")
            print(f"平均评分: {avg_rating:.1f}")
            
            # 价格分布
            price_ranges = {'0-20': 0, '21-40': 0, '41-60': 0, '61+': 0}
            for book in books:
                price = book['price']
                if price <= 20:
                    price_ranges['0-20'] += 1
                elif price <= 40:
                    price_ranges['21-40'] += 1
                elif price <= 60:
                    price_ranges['41-60'] += 1
                else:
                    price_ranges['61+'] += 1
            
            print("
价格分布:")
            for range_name, count in price_ranges.items():
                percentage = (count / total_books) * 100
                print(f"  {range_name}元: {count}本 ({percentage:.1f}%)")
            
            # 评分分布
            rating_counts = {}
            for book in books:
                rating = book['rating']
                rating_key = f"{rating:.1f}"
                rating_counts[rating_key] = rating_counts.get(rating_key, 0) + 1
            
            print("
评分分布:")
            for rating, count in sorted(rating_counts.items()):
                percentage = (count / total_books) * 100
                print(f"  {rating}星: {count}本 ({percentage:.1f}%)")
            
            return {
                'total_books': total_books,
                'avg_price': avg_price,
                'avg_rating': avg_rating,
                'price_distribution': price_ranges,
                'rating_distribution': rating_counts
            }
        
        def generate_report(self, books, analysis):
            """生成爬虫报告"""
            print("
生成爬虫报告...")
            
            report = {
                'crawl_info': {
                    'crawl_time': datetime.now().isoformat(),
                    'total_books_crawled': len(books),
                    'data_source': '模拟数据'
                },
                'analysis_results': analysis,
                'sample_data': books[:3]  # 包含3个样本
            }
            
            # 保存报告
            self.data_manager.save_data(report, 'crawl_report.json')
            
            # 保存完整数据
            self.data_manager.save_data(books, 'books.json')
            self.data_manager.save_data(books, 'books.csv', format='csv')
            
            # 备份数据
            self.data_manager.backup_data()
            
            return report
        
        def run(self):
            """运行完整爬虫流程"""
            print("启动图书爬虫...")
            
            # 1. 爬取图书列表
            base_url = "https://example.com/books"  # 模拟URL
            books = self.crawl_book_list(base_url, pages=3)
            
            if not books:
                print("没有爬取到图书数据")
                return
            
            # 2. 爬取详细信息(可选)
            detailed_books = []
            for book in books[:2]:  # 只爬取前2本的详细信息作为示例
                detail = self.crawl_book_detail(book['id'])
                detailed_book = {**book, **detail}
                detailed_books.append(detailed_book)
            
            # 3. 数据分析
            analysis = self.analyze_books(books)
            
            # 4. 生成报告
            report = self.generate_report(books, analysis)
            
            print("
" + "="*50)
            print("爬虫项目完成!")
            print("="*50)
            print(f"爬取图书: {len(books)} 本")
            print(f"详细爬取: {len(detailed_books)} 本")
            print(f"平均价格: ¥{analysis['avg_price']:.2f}")
            print(f"平均评分: {analysis['avg_rating']:.1f}")
            print(f"数据保存位置: book_data/")
            
            return report
    
    # 运行爬虫项目
    crawler = BookCrawler()
    report = crawler.run()
    
    return report

# 运行完整爬虫项目
crawler_report = complete_crawler_project()

Study notes:

  1. Requests library: HTTP request basics, session management, error handling
  2. BeautifulSoup: HTML parsing, CSS selectors, data extraction
  3. Selenium: scraping dynamic content, browser automation
  4. API collection: calling RESTful APIs, handling pagination, data formats
  5. Crawler ethics: obey robots.txt, use reasonable delays, respect copyright
  6. Data storage: saving to JSON and CSV files, data backups
  7. Project management: code organization, error handling, logging (see the sketch after this list)
  8. Legal compliance: know the relevant laws and regulations to avoid legal risk
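
Study note 7 mentions logging, which the exercises above only do with print(). A minimal logging setup for a crawler might look like this sketch (the file name and format are arbitrary choices):

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    handlers=[
        logging.FileHandler("crawler.log", encoding="utf-8"),  # persistent log file
        logging.StreamHandler(),                               # also echo to the console
    ],
)
logger = logging.getLogger("crawler")

logger.info("starting crawl of %s", "https://example.com/books")
logger.warning("page %d returned no items, stopping early", 3)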

Tomorrow we will study automated operations and system administration! Keep practicing, and your crawling skills will get stronger and stronger!
