WebFlux入门到精通系列 – WebFlux高级特性 (路由函数、过滤器、异常处理、SSE、WebSocket和响应式安全)

不废话,直接开干


第4章 WebFlux高级特性(续)

4.3 Server-Sent Events(SSE)

SSE基础概念


@RestController
@RequestMapping("/api/sse")
public class ServerSentEventsExplanation {
    
    /**
     * Server-Sent Events (SSE) 特点:
     * - 服务器向客户端推送数据
     * - 基于HTTP协议,单向通信
     * - 自动重连机制
     * - 适合实时通知、日志流、进度更新等场景
     */
    
    @GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(sequence -> {
                    // 模拟股票价格变化
                    StockPrice price = generateRandomStockPrice();
                    
                    return ServerSentEvent.<StockPrice>builder()
                            .id(String.valueOf(sequence))
                            .event("price-update")
                            .data(price)
                            .comment("股票价格更新")
                            .retry(Duration.ofSeconds(5))
                            .build();
                })
                .doOnCancel(() -> System.out.println("客户端断开连接"))
                .doOnComplete(() -> System.out.println("数据流结束"))
                .doOnError(error -> System.err.println("流错误: " + error.getMessage()));
    }
    
    @GetMapping(value = "/system-logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamSystemLogs() {
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> {
                    String logEntry = generateLogEntry(sequence);
                    
                    return ServerSentEvent.<String>builder()
                            .id("log-" + sequence)
                            .event("log-entry")
                            .data(logEntry)
                            .comment("系统日志")
                            .build();
                })
                .take(50) // 限制日志数量
                .doOnNext(event -> 
                    System.out.println("发送日志: " + event.data()));
    }
    
    @GetMapping(value = "/progress/{taskId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Progress>> streamTaskProgress(@PathVariable String taskId) {
        return Flux.range(0, 101) // 0-100%
                .delayElements(Duration.ofMillis(200))
                .map(progress -> {
                    Progress progressObj = new Progress(taskId, progress, "处理中...");
                    
                    if (progress == 100) {
                        progressObj.setMessage("任务完成");
                    } else if (progress % 10 == 0) {
                        progressObj.setMessage("已完成 " + progress + "%");
                    }
                    
                    return ServerSentEvent.<Progress>builder()
                            .id("progress-" + progress)
                            .event("progress-update")
                            .data(progressObj)
                            .build();
                })
                .doOnSubscribe(subscription -> 
                    System.out.println("开始监控任务进度: " + taskId))
                .doOnComplete(() -> 
                    System.out.println("任务完成: " + taskId));
    }
    
    /**
     * 带过滤的SSE流
     */
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Notification>> streamNotifications(
            @RequestParam(required = false) String type,
            @RequestParam(required = false) String priority) {
        
        return Flux.interval(Duration.ofSeconds(3))
                .map(sequence -> generateNotification(sequence))
                .filter(notification -> {
                    // 根据查询参数过滤通知
                    if (type != null && !type.equals(notification.getType())) {
                        return false;
                    }
                    if (priority != null && !priority.equals(notification.getPriority())) {
                        return false;
                    }
                    return true;
                })
                .map(notification -> 
                    ServerSentEvent.<Notification>builder()
                            .id(notification.getId())
                            .event(notification.getType())
                            .data(notification)
                            .build())
                .doOnNext(event -> 
                    System.out.println("发送通知: " + event.data().getTitle()));
    }
    
    /**
     * 支持多个事件类型的SSE流
     */
    @GetMapping(value = "/multi-events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<?>> streamMultipleEvents() {
        Flux<ServerSentEvent<?>> userEvents = Flux.interval(Duration.ofSeconds(5))
                .map(seq -> ServerSentEvent.builder()
                        .event("user-online")
                        .data(new UserOnlineEvent("user" + seq, System.currentTimeMillis()))
                        .build());
        
        Flux<ServerSentEvent<?>> systemEvents = Flux.interval(Duration.ofSeconds(10))
                .map(seq -> ServerSentEvent.builder()
                        .event("system-alert")
                        .data(new SystemAlert("警告", "系统运行正常"))
                        .build());
        
        Flux<ServerSentEvent<?>> chatEvents = Flux.interval(Duration.ofSeconds(2))
                .map(seq -> ServerSentEvent.builder()
                        .event("chat-message")
                        .data(new ChatMessage("用户" + seq, "消息内容 " + seq))
                        .build());
        
        return Flux.merge(userEvents, systemEvents, chatEvents)
                .doOnNext(event -> 
                    System.out.println("发送事件: " + event.event() + " - " + event.data()));
    }
    
    // 辅助方法
    private StockPrice generateRandomStockPrice() {
        String[] symbols = {"AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"};
        String symbol = symbols[(int) (Math.random() * symbols.length)];
        double price = 100 + Math.random() * 1000;
        double change = (Math.random() - 0.5) * 20;
        
        return new StockPrice(symbol, price, change);
    }
    
    private String generateLogEntry(long sequence) {
        String[] levels = {"INFO", "WARN", "ERROR"};
        String[] messages = {
            "用户登录成功",
            "数据库查询完成",
            "缓存更新",
            "API调用开始",
            "任务调度执行"
        };
        
        String level = levels[(int) (Math.random() * levels.length)];
        String message = messages[(int) (Math.random() * messages.length)];
        
        return String.format("[%s] %s - %s", level, new Date(), message);
    }
    
    private Notification generateNotification(long sequence) {
        String[] types = {"info", "warning", "error", "success"};
        String[] priorities = {"low", "medium", "high"};
        String[] titles = {
            "系统维护通知",
            "新消息到达",
            "任务完成提醒",
            "安全警告"
        };
        
        return new Notification(
            "notif-" + sequence,
            titles[(int) (sequence % titles.length)],
            "通知内容 " + sequence,
            types[(int) (sequence % types.length)],
            priorities[(int) (sequence % priorities.length)]
        );
    }
}

// SSE数据模型类
class StockPrice {
    private String symbol;
    private double price;
    private double change;
    
    public StockPrice(String symbol, double price, double change) {
        this.symbol = symbol;
        this.price = price;
        this.change = change;
    }
    
    // getters and setters
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    
    public double getChange() { return change; }
    public void setChange(double change) { this.change = change; }
}

class Progress {
    private String taskId;
    private int percentage;
    private String message;
    
    public Progress(String taskId, int percentage, String message) {
        this.taskId = taskId;
        this.percentage = percentage;
        this.message = message;
    }
    
    // getters and setters
    public String getTaskId() { return taskId; }
    public void setTaskId(String taskId) { this.taskId = taskId; }
    
    public int getPercentage() { return percentage; }
    public void setPercentage(int percentage) { this.percentage = percentage; }
    
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

class Notification {
    private String id;
    private String title;
    private String content;
    private String type;
    private String priority;
    
    public Notification(String id, String title, String content, String type, String priority) {
        this.id = id;
        this.title = title;
        this.content = content;
        this.type = type;
        this.priority = priority;
    }
    
    // getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    
    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
    
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    
    public String getPriority() { return priority; }
    public void setPriority(String priority) { this.priority = priority; }
}

class UserOnlineEvent {
    private String username;
    private long timestamp;
    
    public UserOnlineEvent(String username, long timestamp) {
        this.username = username;
        this.timestamp = timestamp;
    }
    
    // getters and setters
}

class SystemAlert {
    private String level;
    private String message;
    
    public SystemAlert(String level, String message) {
        this.level = level;
        this.message = message;
    }
    
    // getters and setters
}

class ChatMessage {
    private String user;
    private String content;
    
    public ChatMessage(String user, String content) {
        this.user = user;
        this.content = content;
    }
    
    // getters and setters
}

SSE客户端示例


<!DOCTYPE html>
<html>
<head>
    <title>SSE客户端示例</title>
    <style>
        .event-container { margin: 20px; padding: 10px; border: 1px solid #ccc; }
        .event { margin: 5px 0; padding: 5px; background: #f5f5f5; }
        .price-update { border-left: 3px solid #4CAF50; }
        .log-entry { border-left: 3px solid #2196F3; }
        .progress-update { border-left: 3px solid #FF9800; }
    </style>
</head>
<body>
    <h1>Server-Sent Events 演示</h1>
    
    <div class="event-container">
        <h3>股票价格</h3>
        <button onclick="connectStockPrices()">连接股票价格流</button>
        <button onclick="disconnectStockPrices()">断开连接</button>
        <div id="stock-prices"></div>
    </div>
    
    <div class="event-container">
        <h3>系统日志</h3>
        <button onclick="connectSystemLogs()">连接系统日志流</button>
        <button onclick="disconnectSystemLogs()">断开连接</button>
        <div id="system-logs"></div>
    </div>
    
    <div class="event-container">
        <h3>任务进度</h3>
        <button onclick="startProgress('task-123')">开始监控任务进度</button>
        <div id="progress-updates"></div>
    </div>

    <script>
        let stockEventSource = null;
        let logEventSource = null;
        let progressEventSource = null;
        
        function connectStockPrices() {
            if (stockEventSource) return;
            
            stockEventSource = new EventSource('/api/sse/stock-prices');
            
            stockEventSource.addEventListener('price-update', function(event) {
                const data = JSON.parse(event.data);
                const element = document.createElement('div');
                element.className = 'event price-update';
                element.innerHTML = `
                    <strong>${data.symbol}</strong>: $${data.price.toFixed(2)} 
                    (${data.change > 0 ? '+' : ''}${data.change.toFixed(2)})
                    <small>${new Date().toLocaleTimeString()}</small>
                `;
                document.getElementById('stock-prices').prepend(element);
            });
            
            stockEventSource.onerror = function(event) {
                console.error('股票价格流错误:', event);
            };
            
            stockEventSource.onopen = function(event) {
                console.log('股票价格流已连接');
            };
        }
        
        function disconnectStockPrices() {
            if (stockEventSource) {
                stockEventSource.close();
                stockEventSource = null;
                console.log('股票价格流已断开');
            }
        }
        
        function connectSystemLogs() {
            if (logEventSource) return;
            
            logEventSource = new EventSource('/api/sse/system-logs');
            
            logEventSource.addEventListener('log-entry', function(event) {
                const element = document.createElement('div');
                element.className = 'event log-entry';
                element.textContent = event.data;
                document.getElementById('system-logs').prepend(element);
            });
            
            logEventSource.onerror = function(event) {
                console.error('系统日志流错误:', event);
            };
        }
        
        function disconnectSystemLogs() {
            if (logEventSource) {
                logEventSource.close();
                logEventSource = null;
            }
        }
        
        function startProgress(taskId) {
            if (progressEventSource) {
                progressEventSource.close();
            }
            
            progressEventSource = new EventSource(`/api/sse/progress/${taskId}`);
            
            progressEventSource.addEventListener('progress-update', function(event) {
                const data = JSON.parse(event.data);
                const element = document.createElement('div');
                element.className = 'event progress-update';
                element.innerHTML = `
                    <strong>任务 ${data.taskId}</strong>: ${data.percentage}% 
                    - ${data.message}
                    <progress value="${data.percentage}" max="100"></progress>
                `;
                document.getElementById('progress-updates').innerHTML = '';
                document.getElementById('progress-updates').appendChild(element);
                
                if (data.percentage === 100) {
                    progressEventSource.close();
                    progressEventSource = null;
                }
            });
        }
    </script>
</body>
</html>

4.4 WebSocket实时通信

WebSocket服务端配置


@Configuration
@EnableWebFlux
public class WebSocketConfiguration {
    
    /**
     * WebSocket配置类
     */
    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", new ChatWebSocketHandler());
        map.put("/ws/notifications", new NotificationWebSocketHandler());
        map.put("/ws/live-data", new LiveDataWebSocketHandler());
        
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(1);
        return mapping;
    }
    
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
    
    @Bean
    public WebSocketService webSocketService() {
        return new HandshakeWebSocketService(
            new ReactorNettyRequestUpgradeStrategy()
        );
    }
}

@Component
public class ChatWebSocketHandler implements WebSocketHandler {
    
    private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    /**
     * WebSocket连接建立时调用
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        System.out.println("新的WebSocket连接: " + session.getId());
        sessions.add(session);
        
        // 发送欢迎消息
        ChatMessage welcomeMsg = new ChatMessage("系统", "欢迎加入聊天室!", "system");
        return sendMessage(session, welcomeMsg)
                .thenMany(session.receive()
                    .map(webSocketMessage -> {
                        try {
                            return objectMapper.readValue(webSocketMessage.getPayloadAsText(), ChatMessage.class);
                        } catch (Exception e) {
                            return new ChatMessage("系统", "消息格式错误", "error");
                        }
                    })
                    .flatMap(message -> {
                        // 广播消息给所有连接的客户端
                        return broadcastMessage(message);
                    })
                    .doOnError(error -> {
                        System.err.println("WebSocket错误: " + error.getMessage());
                    })
                    .doFinally(signal -> {
                        // 连接关闭时清理
                        sessions.remove(session);
                        System.out.println("WebSocket连接关闭: " + session.getId());
                        
                        // 广播用户离开消息
                        ChatMessage leaveMsg = new ChatMessage("系统", 
                            "用户离开聊天室", "system");
                        broadcastMessage(leaveMsg).subscribe();
                    })
                )
                .then();
    }
    
    /**
     * 广播消息给所有连接的客户端
     */
    private Mono<Void> broadcastMessage(ChatMessage message) {
        message.setTimestamp(System.currentTimeMillis());
        
        return Flux.fromIterable(sessions)
                .flatMap(session -> sendMessage(session, message))
                .then();
    }
    
    /**
     * 发送消息给指定会话
     */
    private Mono<Void> sendMessage(WebSocketSession session, ChatMessage message) {
        try {
            String json = objectMapper.writeValueAsString(message);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}

@Component
public class NotificationWebSocketHandler implements WebSocketHandler {
    
    private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        sessions.add(session);
        
        // 处理客户端订阅请求
        return session.receive()
                .map(webSocketMessage -> {
                    try {
                        return objectMapper.readValue(webSocketMessage.getPayloadAsText(), SubscribeRequest.class);
                    } catch (Exception e) {
                        return new SubscribeRequest("all");
                    }
                })
                .flatMap(request -> {
                    // 根据订阅请求发送相应的通知
                    return startNotificationStream(session, request.getTopics());
                })
                .doFinally(signal -> {
                    sessions.remove(session);
                })
                .then();
    }
    
    private Mono<Void> startNotificationStream(WebSocketSession session, List<String> topics) {
        return Flux.interval(Duration.ofSeconds(5))
                .map(sequence -> createNotification(sequence, topics))
                .flatMap(notification -> sendNotification(session, notification))
                .then();
    }
    
    private Notification createNotification(long sequence, List<String> topics) {
        String[] types = {"info", "warning", "alert"};
        return new Notification(
            "notif-" + sequence,
            "通知标题 " + sequence,
            "通知内容 " + sequence,
            types[(int) (sequence % types.length)],
            "medium"
        );
    }
    
    private Mono<Void> sendNotification(WebSocketSession session, Notification notification) {
        try {
            String json = objectMapper.writeValueAsString(notification);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}

@Component
public class LiveDataWebSocketHandler implements WebSocketHandler {
    
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        sessions.put(sessionId, session);
        
        // 处理客户端消息
        return session.receive()
                .map(webSocketMessage -> {
                    try {
                        return objectMapper.readValue(webSocketMessage.getPayloadAsText(), LiveDataRequest.class);
                    } catch (Exception e) {
                        return new LiveDataRequest("subscribe", "default");
                    }
                })
                .flatMap(request -> {
                    switch (request.getAction()) {
                        case "subscribe":
                            return handleSubscribe(session, request.getSymbol());
                        case "unsubscribe":
                            return handleUnsubscribe(session, request.getSymbol());
                        default:
                            return Mono.empty();
                    }
                })
                .doFinally(signal -> {
                    sessions.remove(sessionId);
                })
                .then();
    }
    
    private Mono<Void> handleSubscribe(WebSocketSession session, String symbol) {
        // 开始发送实时数据
        return Flux.interval(Duration.ofSeconds(1))
                .map(sequence -> generateLiveData(symbol, sequence))
                .flatMap(data -> sendLiveData(session, data))
                .take(Duration.ofMinutes(10)) // 限制流时间
                .then();
    }
    
    private Mono<Void> handleUnsubscribe(WebSocketSession session, String symbol) {
        // 停止发送数据
        return Mono.fromRunnable(() -> 
            System.out.println("取消订阅: " + symbol));
    }
    
    private LiveData generateLiveData(String symbol, long sequence) {
        double value = 100 + Math.random() * 50 + Math.sin(sequence * 0.1) * 10;
        return new LiveData(symbol, value, System.currentTimeMillis());
    }
    
    private Mono<Void> sendLiveData(WebSocketSession session, LiveData data) {
        try {
            String json = objectMapper.writeValueAsString(data);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}

// WebSocket数据模型
class ChatMessage {
    private String user;
    private String content;
    private String type; // "user", "system", "error"
    private long timestamp;
    
    public ChatMessage(String user, String content, String type) {
        this.user = user;
        this.content = content;
        this.type = type;
        this.timestamp = System.currentTimeMillis();
    }
    
    // getters and setters
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

class SubscribeRequest {
    private List<String> topics;
    
    public SubscribeRequest(String... topics) {
        this.topics = Arrays.asList(topics);
    }
    
    // getters and setters
    public List<String> getTopics() { return topics; }
    public void setTopics(List<String> topics) { this.topics = topics; }
}

class LiveDataRequest {
    private String action; // "subscribe", "unsubscribe"
    private String symbol;
    
    public LiveDataRequest(String action, String symbol) {
        this.action = action;
        this.symbol = symbol;
    }
    
    // getters and setters
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
}

class LiveData {
    private String symbol;
    private double value;
    private long timestamp;
    
    public LiveData(String symbol, double value, long timestamp) {
        this.symbol = symbol;
        this.value = value;
        this.timestamp = timestamp;
    }
    
    // getters and setters
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
    
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

WebSocket客户端示例


<!DOCTYPE html>
<html>
<head>
    <title>WebSocket客户端示例</title>
    <style>
        .container { margin: 20px; padding: 10px; border: 1px solid #ccc; }
        .message { margin: 5px 0; padding: 5px; background: #f5f5f5; }
        .user-message { border-left: 3px solid #4CAF50; }
        .system-message { border-left: 3px solid #2196F3; }
        .error-message { border-left: 3px solid #f44336; }
        .connected { color: #4CAF50; }
        .disconnected { color: #f44336; }
    </style>
</head>
<body>
    <h1>WebSocket 演示</h1>
    
    <div class="container">
        <h3>聊天室</h3>
        <div>
            <span id="chat-status" class="disconnected">未连接</span>
            <button onclick="connectChat()">连接聊天室</button>
            <button onclick="disconnectChat()">断开连接</button>
        </div>
        <div>
            <input type="text" id="username" placeholder="用户名" value="用户1">
            <input type="text" id="message-input" placeholder="输入消息">
            <button onclick="sendChatMessage()">发送</button>
        </div>
        <div id="chat-messages"></div>
    </div>
    
    <div class="container">
        <h3>实时数据</h3>
        <div>
            <span id="data-status" class="disconnected">未连接</span>
            <button onclick="connectLiveData()">连接实时数据</button>
            <button onclick="disconnectLiveData()">断开连接</button>
        </div>
        <input type="text" id="data-symbol" placeholder="数据符号" value="STOCK1">
        <button onclick="subscribeData()">订阅</button>
        <div id="live-data"></div>
    </div>

    <script>
        let chatSocket = null;
        let dataSocket = null;
        
        function connectChat() {
            if (chatSocket) return;
            
            chatSocket = new WebSocket('ws://localhost:8080/ws/chat');
            
            chatSocket.onopen = function(event) {
                document.getElementById('chat-status').textContent = '已连接';
                document.getElementById('chat-status').className = 'connected';
                addChatMessage('系统', '连接成功', 'system');
            };
            
            chatSocket.onmessage = function(event) {
                const message = JSON.parse(event.data);
                addChatMessage(message.user, message.content, message.type);
            };
            
            chatSocket.onclose = function(event) {
                document.getElementById('chat-status').textContent = '未连接';
                document.getElementById('chat-status').className = 'disconnected';
                addChatMessage('系统', '连接已断开', 'system');
                chatSocket = null;
            };
            
            chatSocket.onerror = function(event) {
                console.error('WebSocket错误:', event);
                addChatMessage('系统', '连接错误', 'error');
            };
        }
        
        function disconnectChat() {
            if (chatSocket) {
                chatSocket.close();
                chatSocket = null;
            }
        }
        
        function sendChatMessage() {
            if (!chatSocket || chatSocket.readyState !== WebSocket.OPEN) {
                alert('请先连接WebSocket');
                return;
            }
            
            const username = document.getElementById('username').value;
            const message = document.getElementById('message-input').value;
            
            if (!username || !message) {
                alert('请输入用户名和消息');
                return;
            }
            
            const chatMessage = {
                user: username,
                content: message,
                type: 'user'
            };
            
            chatSocket.send(JSON.stringify(chatMessage));
            document.getElementById('message-input').value = '';
        }
        
        function addChatMessage(user, content, type) {
            const messagesDiv = document.getElementById('chat-messages');
            const messageDiv = document.createElement('div');
            messageDiv.className = `message ${type}-message`;
            messageDiv.innerHTML = `
                <strong>${user}:</strong> ${content}
                <small>${new Date().toLocaleTimeString()}</small>
            `;
            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
        
        function connectLiveData() {
            if (dataSocket) return;
            
            dataSocket = new WebSocket('ws://localhost:8080/ws/live-data');
            
            dataSocket.onopen = function(event) {
                document.getElementById('data-status').textContent = '已连接';
                document.getElementById('data-status').className = 'connected';
            };
            
            dataSocket.onmessage = function(event) {
                const data = JSON.parse(event.data);
                displayLiveData(data);
            };
            
            dataSocket.onclose = function(event) {
                document.getElementById('data-status').textContent = '未连接';
                document.getElementById('data-status').className = 'disconnected';
                dataSocket = null;
            };
        }
        
        function disconnectLiveData() {
            if (dataSocket) {
                dataSocket.close();
                dataSocket = null;
            }
        }
        
        function subscribeData() {
            if (!dataSocket || dataSocket.readyState !== WebSocket.OPEN) {
                alert('请先连接WebSocket');
                return;
            }
            
            const symbol = document.getElementById('data-symbol').value;
            const request = {
                action: 'subscribe',
                symbol: symbol
            };
            
            dataSocket.send(JSON.stringify(request));
        }
        
        function displayLiveData(data) {
            const dataDiv = document.getElementById('live-data');
            const dataElement = document.createElement('div');
            dataElement.className = 'message';
            dataElement.innerHTML = `
                <strong>${data.symbol}:</strong> ${data.value.toFixed(2)}
                <small>${new Date(data.timestamp).toLocaleTimeString()}</small>
            `;
            dataDiv.appendChild(dataElement);
        }
    </script>
</body>
</html>

4.5 响应式安全(Spring Security Reactive)

Spring Security Reactive配置


@Configuration
@EnableWebFluxSecurity
public class SecurityConfiguration {
    
    /**
     * Spring Security Reactive 配置
     */
    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
                .authorizeExchange(exchanges -> exchanges
                    // 公开路径
                    .pathMatchers("/public/**", "/auth/login", "/auth/register").permitAll()
                    .pathMatchers("/ws/**").permitAll() // WebSocket通常需要单独配置
                    .pathMatchers("/sse/**").permitAll()
                    
                    // 需要认证的路径
                    .pathMatchers("/api/admin/**").hasRole("ADMIN")
                    .pathMatchers("/api/user/**").hasRole("USER")
                    .pathMatchers("/api/**").authenticated()
                    
                    // 其他请求
                    .anyExchange().authenticated()
                )
                .httpBasic(httpBasic -> httpBasic.disable())
                .formLogin(formLogin -> formLogin
                    .loginPage("/auth/login")
                    .authenticationSuccessHandler(authenticationSuccessHandler())
                    .authenticationFailureHandler(authenticationFailureHandler())
                )
                .logout(logout -> logout
                    .logoutUrl("/auth/logout")
                    .logoutSuccessHandler(logoutSuccessHandler())
                )
                .csrf(csrf -> csrf.disable()) // 对于API通常禁用CSRF
                .exceptionHandling(exceptionHandling -> exceptionHandling
                    .authenticationEntryPoint(authenticationEntryPoint())
                    .accessDeniedHandler(accessDeniedHandler())
                )
                .build();
    }
    
    /**
     * 密码编码器
     */
    @Bean
    public PasswordEncoder passwordEncoder() {
        return PasswordEncoderFactories.createDelegatingPasswordEncoder();
    }
    
    /**
     * 响应式用户详情服务
     */
    @Bean
    public ReactiveUserDetailsService reactiveUserDetailsService() {
        UserDetails user = User.withUsername("user")
                .password("{bcrypt}$2a$10$dXJ3SW6G7P.XBLBvanJYv.M5.g.3tJ5f.Nu.ZBLBvanJYv.M5.g.3tJ5")
                .roles("USER")
                .build();
        
        UserDetails admin = User.withUsername("admin")
                .password("{bcrypt}$2a$10$dXJ3SW6G7P.XBLBvanJYv.M5.g.3tJ5f.Nu.ZBLBvanJYv.M5.g.3tJ5")
                .roles("USER", "ADMIN")
                .build();
        
        return new MapReactiveUserDetailsService(user, admin);
    }
    
    // 各种处理器配置
    @Bean
    public ServerAuthenticationSuccessHandler authenticationSuccessHandler() {
        return new WebFilterChainServerAuthenticationSuccessHandler();
    }
    
    @Bean
    public ServerAuthenticationFailureHandler authenticationFailureHandler() {
        return new RedirectServerAuthenticationFailureHandler("/auth/login?error");
    }
    
    @Bean
    public ServerLogoutSuccessHandler logoutSuccessHandler() {
        return new RedirectServerLogoutSuccessHandler();
    }
    
    @Bean
    public ServerAuthenticationEntryPoint authenticationEntryPoint() {
        return (exchange, exception) -> {
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
            String body = "{"error": "未认证"}";
            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(body.getBytes());
            return exchange.getResponse().writeWith(Mono.just(buffer));
        };
    }
    
    @Bean
    public ServerAccessDeniedHandler accessDeniedHandler() {
        return (exchange, exception) -> {
            exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
            exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
            String body = "{"error": "权限不足"}";
            DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(body.getBytes());
            return exchange.getResponse().writeWith(Mono.just(buffer));
        };
    }
}

/**
 * JWT认证过滤器
 */
@Component
public class JwtAuthenticationWebFilter implements WebFilter {
    
    private final JwtTokenProvider tokenProvider;
    private final ReactiveUserDetailsService userDetailsService;
    
    public JwtAuthenticationWebFilter(JwtTokenProvider tokenProvider, 
                                     ReactiveUserDetailsService userDetailsService) {
        this.tokenProvider = tokenProvider;
        this.userDetailsService = userDetailsService;
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String token = resolveToken(exchange.getRequest());
        
        if (token != null && tokenProvider.validateToken(token)) {
            String username = tokenProvider.getUsernameFromToken(token);
            
            return userDetailsService.findByUsername(username)
                    .map(userDetails -> 
                        new UsernamePasswordAuthenticationToken(
                            userDetails, 
                            null, 
                            userDetails.getAuthorities()
                        )
                    )
                    .flatMap(authentication -> {
                        // 将认证信息设置到SecurityContext中
                        return chain.filter(exchange)
                                .contextWrite(ReactiveSecurityContextHolder
                                        .withAuthentication(authentication));
                    });
        }
        
        return chain.filter(exchange);
    }
    
    private String resolveToken(ServerHttpRequest request) {
        String bearerToken = request.getHeaders().getFirst("Authorization");
        if (bearerToken != null && bearerToken.startsWith("Bearer ")) {
            return bearerToken.substring(7);
        }
        return null;
    }
}

/**
 * JWT令牌提供者
 */
@Component
public class JwtTokenProvider {
    
    private final String secretKey = "your-secret-key";
    private final long validityInMilliseconds = 3600000; // 1小时
    
    public String createToken(String username, List<String> roles) {
        Claims claims = Jwts.claims().setSubject(username);
        claims.put("roles", roles);
        
        Date now = new Date();
        Date validity = new Date(now.getTime() + validityInMilliseconds);
        
        return Jwts.builder()
                .setClaims(claims)
                .setIssuedAt(now)
                .setExpiration(validity)
                .signWith(SignatureAlgorithm.HS256, secretKey)
                .compact();
    }
    
    public String getUsernameFromToken(String token) {
        return Jwts.parser()
                .setSigningKey(secretKey)
                .parseClaimsJws(token)
                .getBody()
                .getSubject();
    }
    
    public boolean validateToken(String token) {
        try {
            Jws<Claims> claims = Jwts.parser()
                    .setSigningKey(secretKey)
                    .parseClaimsJws(token);
            
            return !claims.getBody().getExpiration().before(new Date());
        } catch (JwtException | IllegalArgumentException e) {
            return false;
        }
    }
}

/**
 * 认证控制器
 */
@RestController
@RequestMapping("/auth")
public class AuthController {
    
    private final JwtTokenProvider tokenProvider;
    private final ReactiveAuthenticationManager authenticationManager;
    private final PasswordEncoder passwordEncoder;
    
    public AuthController(JwtTokenProvider tokenProvider, 
                         ReactiveAuthenticationManager authenticationManager,
                         PasswordEncoder passwordEncoder) {
        this.tokenProvider = tokenProvider;
        this.authenticationManager = authenticationManager;
        this.passwordEncoder = passwordEncoder;
    }
    
    @PostMapping("/login")
    public Mono<ResponseEntity<AuthResponse>> login(@RequestBody LoginRequest loginRequest) {
        String username = loginRequest.getUsername();
        String password = loginRequest.getPassword();
        
        Authentication authentication = new UsernamePasswordAuthenticationToken(username, password);
        
        return authenticationManager.authenticate(authentication)
                .map(auth -> {
                    String token = tokenProvider.createToken(username, 
                        auth.getAuthorities().stream()
                            .map(GrantedAuthority::getAuthority)
                            .collect(Collectors.toList()));
                    
                    return ResponseEntity.ok(new AuthResponse(token, "Bearer"));
                })
                .onErrorResume(e -> 
                    Mono.just(ResponseEntity.status(HttpStatus.UNAUTHORIZED).build()));
    }
    
    @PostMapping("/register")
    public Mono<ResponseEntity<AuthResponse>> register(@RequestBody RegisterRequest registerRequest) {
        // 用户注册逻辑
        // 这里简化处理,实际项目中需要保存用户到数据库
        
        String username = registerRequest.getUsername();
        String encodedPassword = passwordEncoder.encode(registerRequest.getPassword());
        
        // 创建用户并生成token
        String token = tokenProvider.createToken(username, Arrays.asList("ROLE_USER"));
        
        return Mono.just(ResponseEntity.ok(new AuthResponse(token, "Bearer")));
    }
    
    @GetMapping("/me")
    public Mono<ResponseEntity<UserInfo>> getCurrentUser() {
        return ReactiveSecurityContextHolder.getContext()
                .map(SecurityContext::getAuthentication)
                .map(authentication -> {
                    String username = authentication.getName();
                    List<String> roles = authentication.getAuthorities().stream()
                            .map(GrantedAuthority::getAuthority)
                            .collect(Collectors.toList());
                    
                    return new UserInfo(username, roles);
                })
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.status(HttpStatus.UNAUTHORIZED).build());
    }
}

// 认证相关数据模型
class LoginRequest {
    private String username;
    private String password;
    
    // getters and setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
}

class RegisterRequest {
    private String username;
    private String password;
    private String email;
    
    // getters and setters
}

class AuthResponse {
    private String accessToken;
    private String tokenType;
    
    public AuthResponse(String accessToken, String tokenType) {
        this.accessToken = accessToken;
        this.tokenType = tokenType;
    }
    
    // getters and setters
    public String getAccessToken() { return accessToken; }
    public void setAccessToken(String accessToken) { this.accessToken = accessToken; }
    
    public String getTokenType() { return tokenType; }
    public void setTokenType(String tokenType) { this.tokenType = tokenType; }
}

class UserInfo {
    private String username;
    private List<String> roles;
    
    public UserInfo(String username, List<String> roles) {
        this.username = username;
        this.roles = roles;
    }
    
    // getters and setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    
    public List<String> getRoles() { return roles; }
    public void setRoles(List<String> roles) { this.roles = roles; }
}

推开窗,是别人的车马喧嚣;关上门,才是自家的月光皎洁。在属于自己的节奏里泡茶、读书、慢慢行走,力量,自会在不言中生长。 耐得寂寞蓄力气,终有一朝破土鸣。

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...