Redis Holds 100 Million Keys: How Do You Quickly Find the 100,000 With a Given Prefix?


When Redis holds 100 million keys, using the KEYS command to look up keys with a given prefix causes severe performance problems and can block the service outright. The comparison below works up to the best solution.

The Wrong Way: the KEYS Command

# Dangerous! Never run this in production
import redis

r = redis.Redis(host='localhost', port=6379)

# This blocks the Redis server and can make the service unavailable
keys = r.keys('user:*')  # find every key starting with user:

The problem: KEYS walks the entire keyspace in one blocking call, with O(N) time complexity. Because Redis executes commands on a single thread, a KEYS call over 100 million keys can stall every other client for seconds or longer.

The Optimal Solution: the SCAN Command

Basic usage

import redis

def find_keys_by_prefix(host, port, prefix, target_count=100000):
    r = redis.Redis(host=host, port=port, decode_responses=True)
    found_keys = []
    cursor = 0

    while True:
        # Each SCAN step returns the next cursor plus a batch of matches
        cursor, keys = r.scan(cursor=cursor, match=f'{prefix}*', count=1000)
        found_keys.extend(keys)

        # cursor == 0 means the iteration has covered the whole keyspace
        if cursor == 0 or len(found_keys) >= target_count:
            break

    # Cap the result size; SCAN can occasionally return the same key twice,
    # so deduplicate (e.g. via a set) if exact counts matter
    return found_keys[:target_count]

# Usage
keys = find_keys_by_prefix('localhost', 6379, 'user:', 100000)
print(f"Found {len(keys)} keys")

Production-Hardened Version

import redis
from typing import List

class RedisKeyFinder:
    def __init__(self, host='localhost', port=6379, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            password=password,
            decode_responses=True
        )

    def find_keys_by_prefix_with_stats(self, prefix: str, target_count: int = 100000,
                                       batch_size: int = 500) -> List[str]:
        """
        Collect keys safely with SCAN, printing progress along the way.

        Args:
            prefix: key prefix to match
            target_count: number of keys to collect before stopping
            batch_size: COUNT hint passed to each SCAN call

        Returns:
            List of matching keys.
        """
        found_keys = []
        cursor = 0
        iterations = 0

        print(f"Searching for keys with prefix '{prefix}', target count: {target_count}")

        try:
            while True:
                iterations += 1

                # One SCAN step: next cursor plus a batch of matching keys
                cursor, keys = self.redis_client.scan(
                    cursor=cursor,
                    match=f'{prefix}*',
                    count=batch_size
                )

                found_keys.extend(keys)

                # Print progress every 10 iterations; COUNT is only a hint,
                # so iterations * batch_size approximates the keys examined
                if iterations % 10 == 0:
                    print(f"Progress: ~{iterations * batch_size} keys examined, "
                          f"{len(found_keys)} matches found")

                # Full keyspace covered
                if cursor == 0:
                    print(f"Scan complete after {iterations} iterations")
                    break

                # Target reached
                if len(found_keys) >= target_count:
                    print(f"Target reached after {iterations} iterations")
                    break

            # Cap the result no matter which branch ended the loop
            return found_keys[:target_count]

        except Exception as e:
            print(f"Error during scan: {e}")
            return found_keys

# Usage
finder = RedisKeyFinder('localhost', 6379)

# Find user-related keys
user_keys = finder.find_keys_by_prefix_with_stats('user:', 100000)

# Find session-related keys
session_keys = finder.find_keys_by_prefix_with_stats('session:', 50000)
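
For long-running scans it also pays to configure socket timeouts and retries on the client itself, so a stalled connection fails fast instead of hanging the whole job. A minimal sketch using standard redis-py connection options (the timeout values here are illustrative):

import redis

# Connection hardening: fail fast instead of hanging the whole scan
client = redis.Redis(
    host='localhost',
    port=6379,
    socket_connect_timeout=5,   # max seconds to establish the TCP connection
    socket_timeout=10,          # max seconds to wait for a single reply
    retry_on_timeout=True,      # transparently retry a command that times out
    decode_responses=True,
)

# Verify connectivity before starting a long scan
client.ping()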

Performance Check

import time

import redis

def performance_comparison():
    r = redis.Redis(host='localhost', port=6379)

    # Only SCAN is timed here: running KEYS against 100 million keys
    # would block the server, which is the whole point of avoiding it
    start_time = time.time()

    keys_scan = []
    cursor = 0
    while len(keys_scan) < 100000:
        cursor, keys = r.scan(cursor=cursor, match='user:*', count=1000)
        keys_scan.extend(keys)
        if cursor == 0:
            break

    scan_time = time.time() - start_time

    print(f"SCAN finished in {scan_time:.2f}s")
    print(f"Found {len(keys_scan)} keys")

# Run the benchmark
performance_comparison()

Advanced: Parallel SCAN

Parallelism only helps where the keyspace is actually partitioned. A single Redis instance exposes one cursor space, so several threads scanning it concurrently would all walk the same keys and return duplicates. In a Redis Cluster, however, each primary node owns a disjoint slice of the keyspace and can be scanned concurrently with its own cursor:

import concurrent.futures

import redis

class ParallelRedisScanner:
    """
    Scan several Redis primaries concurrently, one cursor per node.

    Note: this only speeds things up when each address owns a distinct
    slice of the keyspace (e.g. the primary nodes of a Redis Cluster).
    Pointing multiple threads at the same standalone instance would
    just scan the same keys repeatedly.
    """

    def __init__(self, nodes):
        # nodes: list of (host, port) pairs, one per primary node
        self.clients = [
            redis.Redis(host=h, port=p, decode_responses=True)
            for h, p in nodes
        ]

    def parallel_scan(self, prefix, target_count=100000):
        def scan_node(client):
            keys_found = []
            cursor = 0
            while True:
                cursor, keys = client.scan(
                    cursor=cursor,
                    match=f'{prefix}*',
                    count=1000
                )
                keys_found.extend(keys)
                # Stop when this node is exhausted or has enough on its own
                if cursor == 0 or len(keys_found) >= target_count:
                    break
            return keys_found

        # One thread per node; each thread drives its own cursor
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=len(self.clients)) as executor:
            results = executor.map(scan_node, self.clients)

        # Merge the per-node results and cap at the target
        all_keys = []
        for result in results:
            all_keys.extend(result)
        return all_keys[:target_count]

# Scan three cluster primaries concurrently
scanner = ParallelRedisScanner([
    ('127.0.0.1', 7000),
    ('127.0.0.1', 7001),
    ('127.0.0.1', 7002),
])
keys = scanner.parallel_scan('user:', 100000)

Lua Script Variant (Use With Care)

A Lua script can run the whole SCAN loop server-side, eliminating the per-batch network round trips. The trade-off is significant: Redis executes scripts atomically and serves nothing else while one runs, so a long scan inside Lua reintroduces exactly the stall that SCAN was meant to avoid. Keep the per-call workload small.

import redis

class LuaScanner:
    def __init__(self, host='localhost', port=6379):
        self.client = redis.Redis(host=host, port=port)

        # Lua script that runs the SCAN loop on the server side
        self.lua_script = """
        local prefix = ARGV[1]
        local target_count = tonumber(ARGV[2])
        local batch_size = tonumber(ARGV[3])

        local cursor = 0
        local results = {}
        local iterations = 0

        repeat
            iterations = iterations + 1
            local scan_result = redis.call('SCAN', cursor, 'MATCH', prefix, 'COUNT', batch_size)
            cursor = tonumber(scan_result[1])
            local keys = scan_result[2]

            for i, key in ipairs(keys) do
                table.insert(results, key)
                if #results >= target_count then
                    break
                end
            end

            if #results >= target_count then
                break
            end

            -- Safety valve against runaway loops
            if iterations > 1000 then
                break
            end
        until cursor == 0

        return results
        """

    def scan_with_lua(self, prefix, target_count=100000, batch_size=1000):
        """
        Run SCAN inside a Lua script to cut network round trips.

        Caution: the script executes atomically, so Redis is blocked until
        it finishes. Keep target_count and batch_size small, or prefer
        client-side SCAN for large jobs.
        """
        script = self.client.register_script(self.lua_script)
        return script(args=[f'{prefix}*', target_count, batch_size])

# Scan via the Lua script
lua_scanner = LuaScanner()
keys = lua_scanner.scan_with_lua('user:', 100000)

Best-Practice Summary

  1. Never use the KEYS command in production.
  2. Tune the COUNT hint to your workload (500-5000 is a common range).
  3. Set client-side timeouts and retries so a stalled connection cannot hang the scan.
  4. Report progress so long-running scans stay observable.
  5. Process results in batches to keep memory bounded (see the generator sketch after this list).
  6. Handle errors such as dropped connections gracefully.
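
For point 5, a generator that yields keys in fixed-size batches keeps memory flat instead of accumulating the full result list; a minimal sketch built on redis-py's scan_iter:

import redis

def iter_key_batches(redis_client, prefix, batch_size=1000):
    """Yield matching keys in lists of at most batch_size items."""
    batch = []
    for key in redis_client.scan_iter(match=f'{prefix}*', count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # trailing partial batch
        yield batch

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
processed = 0
for batch in iter_key_batches(r, 'user:'):
    processed += len(batch)   # process or persist each batch here
    if processed >= 100000:   # stop once enough keys have been handled
        break
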
# Complete production-grade solution
import time

def production_ready_scan(redis_client, prefix, target_count,
                          batch_size=1000, timeout=300):
    """
    Production-ready SCAN with a time budget and progress reporting.
    """
    start_time = time.time()
    found_keys = []
    cursor = 0
    iterations = 0

    try:
        while True:
            # Abort once the overall time budget is exhausted
            if time.time() - start_time > timeout:
                print(f"Scan timed out; {len(found_keys)} keys found so far")
                break

            iterations += 1
            cursor, keys = redis_client.scan(
                cursor=cursor,
                match=f'{prefix}*',
                count=batch_size
            )

            found_keys.extend(keys)

            # Progress output
            if iterations % 20 == 0:
                elapsed = time.time() - start_time
                print(f"Progress: {len(found_keys)}/{target_count} | "
                      f"elapsed: {elapsed:.1f}s | iterations: {iterations}")

            # Keyspace exhausted or target reached
            if cursor == 0 or len(found_keys) >= target_count:
                break

        # Cap the result no matter which branch ended the loop
        found_keys = found_keys[:target_count]
        total_time = time.time() - start_time
        print(f"Scan finished: {len(found_keys)} keys in {total_time:.2f}s")
        return found_keys

    except Exception as e:
        print(f"Scan failed: {e}")
        return found_keys
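
A typical call, assuming a local instance:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
user_keys = production_ready_scan(r, 'user:', 100000, batch_size=1000, timeout=300)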

With these techniques you can pull 100,000 keys with a given prefix out of a 100-million-key instance without degrading Redis for the rest of your traffic.
