不废话,直接开干
第4章 WebFlux高级特性(续)
4.3 Server-Sent Events(SSE)
SSE基础概念
/**
 * Demonstration controller exposing several Server-Sent Events (SSE) streams.
 *
 * <p>SSE characteristics:
 * <ul>
 *   <li>the server pushes data to the client;</li>
 *   <li>HTTP based, one-way communication;</li>
 *   <li>automatic reconnection on the client side;</li>
 *   <li>well suited to real-time notifications, log streams, progress updates and similar cases.</li>
 * </ul>
 *
 * <p>All payloads are generated sample data. Timer-driven streams apply
 * {@code onBackpressureDrop()} directly after {@code Flux.interval(...)}: the interval source
 * signals an overflow error when a tick fires while the subscriber has no outstanding demand
 * (for example a slow client), and dropping that tick keeps the stream alive instead.
 */
@RestController
@RequestMapping("/api/sse")
public class ServerSentEventsExplanation {

    /**
     * Streams a randomly generated stock quote every two seconds as a {@code price-update} event.
     *
     * @return an unbounded stream of events; each carries the tick number as id and a
     *         five-second reconnect hint
     */
    @GetMapping(value = "/stock-prices", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<StockPrice>> streamStockPrices() {
        return Flux.interval(Duration.ofSeconds(2))
                // Skip ticks the client cannot keep up with rather than failing the stream.
                .onBackpressureDrop()
                .map(sequence -> {
                    // Simulate a price change.
                    StockPrice price = generateRandomStockPrice();
                    return ServerSentEvent.<StockPrice>builder()
                            .id(String.valueOf(sequence))
                            .event("price-update")
                            .data(price)
                            .comment("股票价格更新")
                            .retry(Duration.ofSeconds(5))
                            .build();
                })
                .doOnCancel(() -> System.out.println("客户端断开连接"))
                .doOnComplete(() -> System.out.println("数据流结束"))
                .doOnError(error -> System.err.println("流错误: " + error.getMessage()));
    }

    /**
     * Streams up to 50 generated log lines, one per second, as {@code log-entry} events.
     *
     * @return a stream that completes after 50 events
     */
    @GetMapping(value = "/system-logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamSystemLogs() {
        return Flux.interval(Duration.ofSeconds(1))
                .onBackpressureDrop()
                .map(sequence -> {
                    String logEntry = generateLogEntry(sequence);
                    return ServerSentEvent.<String>builder()
                            .id("log-" + sequence)
                            .event("log-entry")
                            .data(logEntry)
                            .comment("系统日志")
                            .build();
                })
                .take(50) // cap the number of log entries
                .doOnNext(event ->
                        System.out.println("发送日志: " + event.data()));
    }

    /**
     * Streams simulated progress for a task from 0 to 100 percent, one step every 200 ms,
     * as {@code progress-update} events.
     *
     * @param taskId identifier echoed back in every {@link Progress} payload
     * @return a stream of 101 events that completes after the 100% event
     */
    @GetMapping(value = "/progress/{taskId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Progress>> streamTaskProgress(@PathVariable String taskId) {
        return Flux.range(0, 101) // 0-100%
                .delayElements(Duration.ofMillis(200))
                .map(progress -> {
                    Progress progressObj = new Progress(taskId, progress, "处理中...");
                    if (progress == 100) {
                        progressObj.setMessage("任务完成");
                    } else if (progress % 10 == 0) {
                        // Every tenth step reports the percentage instead of the generic text.
                        progressObj.setMessage("已完成 " + progress + "%");
                    }
                    return ServerSentEvent.<Progress>builder()
                            .id("progress-" + progress)
                            .event("progress-update")
                            .data(progressObj)
                            .build();
                })
                .doOnSubscribe(subscription ->
                        System.out.println("开始监控任务进度: " + taskId))
                .doOnComplete(() ->
                        System.out.println("任务完成: " + taskId));
    }

    /**
     * SSE stream with filtering: emits a generated notification every three seconds and
     * forwards only those matching the optional query parameters.
     *
     * @param type     when non-null, only notifications with this exact type are sent
     * @param priority when non-null, only notifications with this exact priority are sent
     * @return an unbounded stream whose SSE event name is the notification type
     */
    @GetMapping(value = "/notifications", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Notification>> streamNotifications(
            @RequestParam(required = false) String type,
            @RequestParam(required = false) String priority) {
        return Flux.interval(Duration.ofSeconds(3))
                .onBackpressureDrop()
                .map(sequence -> generateNotification(sequence))
                .filter(notification -> {
                    // Filter notifications by the query parameters.
                    if (type != null && !type.equals(notification.getType())) {
                        return false;
                    }
                    if (priority != null && !priority.equals(notification.getPriority())) {
                        return false;
                    }
                    return true;
                })
                .map(notification ->
                        ServerSentEvent.<Notification>builder()
                                .id(notification.getId())
                                .event(notification.getType())
                                .data(notification)
                                .build())
                .doOnNext(event ->
                        System.out.println("发送通知: " + event.data().getTitle()));
    }

    /**
     * SSE stream carrying several event types: merges three independent timers emitting
     * {@code user-online} (5 s), {@code system-alert} (10 s) and {@code chat-message} (2 s) events.
     *
     * @return an unbounded merged stream
     */
    @GetMapping(value = "/multi-events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<?>> streamMultipleEvents() {
        Flux<ServerSentEvent<?>> userEvents = Flux.interval(Duration.ofSeconds(5))
                .onBackpressureDrop()
                .map(seq -> ServerSentEvent.builder()
                        .event("user-online")
                        .data(new UserOnlineEvent("user" + seq, System.currentTimeMillis()))
                        .build());
        Flux<ServerSentEvent<?>> systemEvents = Flux.interval(Duration.ofSeconds(10))
                .onBackpressureDrop()
                .map(seq -> ServerSentEvent.builder()
                        .event("system-alert")
                        .data(new SystemAlert("警告", "系统运行正常"))
                        .build());
        Flux<ServerSentEvent<?>> chatEvents = Flux.interval(Duration.ofSeconds(2))
                .onBackpressureDrop()
                .map(seq -> ServerSentEvent.builder()
                        .event("chat-message")
                        .data(new ChatMessage("用户" + seq, "消息内容 " + seq))
                        .build());
        return Flux.merge(userEvents, systemEvents, chatEvents)
                .doOnNext(event ->
                        System.out.println("发送事件: " + event.event() + " - " + event.data()));
    }

    // Helper methods

    /** Builds a quote for a random symbol with a price in [100, 1100) and a change in [-10, 10). */
    private StockPrice generateRandomStockPrice() {
        String[] symbols = {"AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"};
        String symbol = symbols[(int) (Math.random() * symbols.length)];
        double price = 100 + Math.random() * 1000;
        double change = (Math.random() - 0.5) * 20;
        return new StockPrice(symbol, price, change);
    }

    /**
     * Builds a log line of the form {@code [LEVEL] <current date> - <message>} from randomly
     * chosen level and message.
     *
     * @param sequence tick number of the stream; not used in the generated text
     */
    private String generateLogEntry(long sequence) {
        String[] levels = {"INFO", "WARN", "ERROR"};
        String[] messages = {
            "用户登录成功",
            "数据库查询完成",
            "缓存更新",
            "API调用开始",
            "任务调度执行"
        };
        String level = levels[(int) (Math.random() * levels.length)];
        String message = messages[(int) (Math.random() * messages.length)];
        return String.format("[%s] %s - %s", level, new Date(), message);
    }

    /**
     * Builds a notification whose title, type and priority cycle deterministically with
     * {@code sequence}.
     */
    private Notification generateNotification(long sequence) {
        String[] types = {"info", "warning", "error", "success"};
        String[] priorities = {"low", "medium", "high"};
        String[] titles = {
            "系统维护通知",
            "新消息到达",
            "任务完成提醒",
            "安全警告"
        };
        return new Notification(
                "notif-" + sequence,
                titles[(int) (sequence % titles.length)],
                "通知内容 " + sequence,
                types[(int) (sequence % types.length)],
                priorities[(int) (sequence % priorities.length)]
        );
    }
}
// SSE数据模型类
/** Mutable payload describing one stock quote: ticker symbol, current price and price change. */
class StockPrice {

    private String symbol;
    private double price;
    private double change;

    /**
     * @param symbol ticker symbol
     * @param price  quoted price
     * @param change price change
     */
    public StockPrice(String symbol, double price, double change) {
        this.symbol = symbol;
        this.price = price;
        this.change = change;
    }

    // --- read accessors ---

    public String getSymbol() {
        return this.symbol;
    }

    public double getPrice() {
        return this.price;
    }

    public double getChange() {
        return this.change;
    }

    // --- write accessors ---

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public void setChange(double change) {
        this.change = change;
    }
}
/** Mutable payload reporting how far a task has advanced, with a human-readable message. */
class Progress {

    private String taskId;
    private int percentage;
    private String message;

    /**
     * @param taskId     identifier of the task being reported
     * @param percentage completed percentage
     * @param message    status text shown alongside the percentage
     */
    public Progress(String taskId, int percentage, String message) {
        this.taskId = taskId;
        this.percentage = percentage;
        this.message = message;
    }

    // --- read accessors ---

    public String getTaskId() {
        return this.taskId;
    }

    public int getPercentage() {
        return this.percentage;
    }

    public String getMessage() {
        return this.message;
    }

    // --- write accessors ---

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }

    public void setPercentage(int percentage) {
        this.percentage = percentage;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
/** Mutable notification payload: id, title, content, type and priority. */
class Notification {

    private String id;
    private String title;
    private String content;
    private String type;
    private String priority;

    /**
     * @param id       notification identifier
     * @param title    short headline
     * @param content  body text
     * @param type     notification type
     * @param priority notification priority
     */
    public Notification(String id, String title, String content, String type, String priority) {
        this.id = id;
        this.title = title;
        this.content = content;
        this.type = type;
        this.priority = priority;
    }

    // --- read accessors ---

    public String getId() {
        return this.id;
    }

    public String getTitle() {
        return this.title;
    }

    public String getContent() {
        return this.content;
    }

    public String getType() {
        return this.type;
    }

    public String getPriority() {
        return this.priority;
    }

    // --- write accessors ---

    public void setId(String id) {
        this.id = id;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public void setType(String type) {
        this.type = type;
    }

    public void setPriority(String priority) {
        this.priority = priority;
    }
}
/**
 * Payload announcing that a user came online.
 *
 * <p>The accessors are required: without them a bean-style JSON serializer finds no
 * properties on this class and cannot write the event body.
 */
class UserOnlineEvent {

    private String username;
    private long timestamp;

    /**
     * @param username  name of the user that came online
     * @param timestamp time of the event, as supplied by the caller
     */
    public UserOnlineEvent(String username, long timestamp) {
        this.username = username;
        this.timestamp = timestamp;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
class SystemAlert {
private String level;
private String message;
public SystemAlert(String level, String message) {
this.level = level;
this.message = message;
}
// getters and setters
}
/**
 * Payload for a chat message sent over SSE: the author and the message text.
 *
 * <p>The accessors are required: without them a bean-style JSON serializer finds no
 * properties on this class and cannot write the event body.
 */
class ChatMessage {

    private String user;
    private String content;

    /**
     * @param user    author of the message
     * @param content message text
     */
    public ChatMessage(String user, String content) {
        this.user = user;
        this.content = content;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
SSE客户端示例
<!DOCTYPE html>
<html>
<head>
    <!-- Declare the encoding explicitly so the non-ASCII UI text renders correctly
         even when the server sends no charset header. -->
    <meta charset="UTF-8">
    <title>SSE客户端示例</title>
    <style>
        .event-container { margin: 20px; padding: 10px; border: 1px solid #ccc; }
        .event { margin: 5px 0; padding: 5px; background: #f5f5f5; }
        .price-update { border-left: 3px solid #4CAF50; }
        .log-entry { border-left: 3px solid #2196F3; }
        .progress-update { border-left: 3px solid #FF9800; }
    </style>
</head>
<body>
    <h1>Server-Sent Events 演示</h1>
    <div class="event-container">
        <h3>股票价格</h3>
        <button onclick="connectStockPrices()">连接股票价格流</button>
        <button onclick="disconnectStockPrices()">断开连接</button>
        <div id="stock-prices"></div>
    </div>
    <div class="event-container">
        <h3>系统日志</h3>
        <button onclick="connectSystemLogs()">连接系统日志流</button>
        <button onclick="disconnectSystemLogs()">断开连接</button>
        <div id="system-logs"></div>
    </div>
    <div class="event-container">
        <h3>任务进度</h3>
        <button onclick="startProgress('task-123')">开始监控任务进度</button>
        <div id="progress-updates"></div>
    </div>
    <script>
        // One EventSource per stream; null means "not connected".
        let stockEventSource = null;
        let logEventSource = null;
        let progressEventSource = null;

        // Appends <tag>text</tag> to parent. The value is assigned through textContent,
        // so data received from the server is never parsed as HTML.
        function appendTextElement(parent, tag, text) {
            const child = document.createElement(tag);
            child.textContent = text;
            parent.appendChild(child);
            return child;
        }

        // Opens the stock price stream (no-op when already open) and prepends each quote.
        function connectStockPrices() {
            if (stockEventSource) return;
            stockEventSource = new EventSource('/api/sse/stock-prices');
            stockEventSource.addEventListener('price-update', function(event) {
                const data = JSON.parse(event.data);
                const element = document.createElement('div');
                element.className = 'event price-update';
                appendTextElement(element, 'strong', data.symbol);
                element.appendChild(document.createTextNode(
                    `: $${data.price.toFixed(2)} (${data.change > 0 ? '+' : ''}${data.change.toFixed(2)}) `));
                appendTextElement(element, 'small', new Date().toLocaleTimeString());
                document.getElementById('stock-prices').prepend(element);
            });
            stockEventSource.onerror = function(event) {
                console.error('股票价格流错误:', event);
            };
            stockEventSource.onopen = function(event) {
                console.log('股票价格流已连接');
            };
        }

        // Closes the stock price stream if it is open.
        function disconnectStockPrices() {
            if (stockEventSource) {
                stockEventSource.close();
                stockEventSource = null;
                console.log('股票价格流已断开');
            }
        }

        // Opens the system log stream (no-op when already open) and prepends each line.
        function connectSystemLogs() {
            if (logEventSource) return;
            logEventSource = new EventSource('/api/sse/system-logs');
            logEventSource.addEventListener('log-entry', function(event) {
                const element = document.createElement('div');
                element.className = 'event log-entry';
                element.textContent = event.data;
                document.getElementById('system-logs').prepend(element);
            });
            logEventSource.onerror = function(event) {
                console.error('系统日志流错误:', event);
            };
        }

        // Closes the system log stream if it is open.
        function disconnectSystemLogs() {
            if (logEventSource) {
                logEventSource.close();
                logEventSource = null;
            }
        }

        // Starts (or restarts) progress monitoring for taskId and shows only the latest update.
        function startProgress(taskId) {
            if (progressEventSource) {
                progressEventSource.close();
            }
            // Encode the id so characters such as '/', '?' or '#' cannot alter the request path.
            progressEventSource = new EventSource(`/api/sse/progress/${encodeURIComponent(taskId)}`);
            progressEventSource.addEventListener('progress-update', function(event) {
                const data = JSON.parse(event.data);
                const element = document.createElement('div');
                element.className = 'event progress-update';
                appendTextElement(element, 'strong', `任务 ${data.taskId}`);
                element.appendChild(document.createTextNode(
                    `: ${data.percentage}% - ${data.message} `));
                const bar = document.createElement('progress');
                bar.max = 100;
                bar.value = data.percentage;
                element.appendChild(bar);
                const container = document.getElementById('progress-updates');
                container.textContent = '';
                container.appendChild(element);
                if (data.percentage === 100) {
                    // Close explicitly: otherwise EventSource would reconnect once the
                    // server ends the stream and the progress would start over.
                    progressEventSource.close();
                    progressEventSource = null;
                }
            });
        }
    </script>
</body>
</html>
4.4 WebSocket实时通信
WebSocket服务端配置
/**
 * WebSocket configuration: maps URL paths to WebSocket handlers and registers the adapter and
 * handshake service used to serve them.
 */
@Configuration
@EnableWebFlux
public class WebSocketConfiguration {

    /**
     * Maps the three WebSocket endpoints to their handlers.
     *
     * <p>NOTE(review): the handlers are instantiated here with {@code new} although their
     * classes are also annotated {@code @Component}; if component scanning picks them up, a
     * second, unused instance of each exists. Consider injecting the scanned beans instead.
     *
     * @return a URL handler mapping with order 1
     */
    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/chat", new ChatWebSocketHandler());
        map.put("/ws/notifications", new NotificationWebSocketHandler());
        map.put("/ws/live-data", new LiveDataWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(1);
        return mapping;
    }

    /**
     * Adapter that lets the dispatcher invoke {@link WebSocketHandler}s.
     *
     * <p>The adapter is built around the {@link #webSocketService()} bean; with the no-arg
     * constructor it would create its own service and the configured one would never be used.
     */
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    /** Handshake service using the Reactor Netty upgrade strategy. */
    @Bean
    public WebSocketService webSocketService() {
        return new HandshakeWebSocketService(
                new ReactorNettyRequestUpgradeStrategy()
        );
    }
}
/**
 * Chat room WebSocket handler: every message received from one client is broadcast to all
 * currently connected clients.
 */
@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Called when a WebSocket connection is established. Registers the session, sends a welcome
     * message, then broadcasts every incoming message until the connection ends.
     *
     * @param session the newly connected session
     * @return completion of the whole conversation with this client
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        System.out.println("新的WebSocket连接: " + session.getId());
        sessions.add(session);
        // Send the welcome message.
        ChatMessage welcomeMsg = new ChatMessage("系统", "欢迎加入聊天室!", "system");
        return sendMessage(session, welcomeMsg)
                .thenMany(session.receive()
                        .map(webSocketMessage -> {
                            try {
                                return objectMapper.readValue(
                                        webSocketMessage.getPayloadAsText(), ChatMessage.class);
                            } catch (Exception e) {
                                // Unparseable input is turned into an error message
                                // (which is then broadcast like any other message).
                                return new ChatMessage("系统", "消息格式错误", "error");
                            }
                        })
                        // Broadcast the message to all connected clients.
                        .flatMap(message -> broadcastMessage(message))
                        .doOnError(error -> {
                            System.err.println("WebSocket错误: " + error.getMessage());
                        })
                        .doFinally(signal -> {
                            // Clean up when the connection closes.
                            sessions.remove(session);
                            System.out.println("WebSocket连接关闭: " + session.getId());
                            // Broadcast the "user left" message (fire-and-forget).
                            ChatMessage leaveMsg = new ChatMessage("系统",
                                    "用户离开聊天室", "system");
                            broadcastMessage(leaveMsg).subscribe();
                        })
                )
                .then();
    }

    /**
     * Broadcasts a message to all connected clients, stamping it with the current time.
     *
     * <p>The recipients are a snapshot taken at subscription time: a set returned by
     * {@code Collections.synchronizedSet} must not be iterated while other threads add or
     * remove sessions, whereas its {@code toArray} copies under the set's own lock. A failure
     * to deliver to one recipient is logged and skipped so that it neither stops delivery to
     * the others nor terminates the sender's own pipeline.
     */
    private Mono<Void> broadcastMessage(ChatMessage message) {
        message.setTimestamp(System.currentTimeMillis());
        return Flux.defer(() -> Flux.fromArray(sessions.toArray(new WebSocketSession[0])))
                .flatMap(recipient -> sendMessage(recipient, message)
                        .onErrorResume(error -> {
                            System.err.println("消息发送失败, 会话: " + recipient.getId()
                                    + " - " + error.getMessage());
                            return Mono.empty();
                        }))
                .then();
    }

    /**
     * Sends a message to the given session as a JSON text frame.
     *
     * @return completion of the send, or an error Mono if serialization fails
     */
    private Mono<Void> sendMessage(WebSocketSession session, ChatMessage message) {
        try {
            String json = objectMapper.writeValueAsString(message);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}
/**
 * WebSocket handler that pushes a generated notification every five seconds once the client
 * has sent a subscribe request.
 */
@Component
public class NotificationWebSocketHandler implements WebSocketHandler {

    private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Registers the session and starts a notification stream for each subscribe request.
     *
     * <p>{@code switchMap} is used so that a new request replaces the running stream; with
     * {@code flatMap} every additional request would add another never-ending stream and the
     * client would receive each notification several times.
     *
     * @param session the connected session
     * @return completion of the conversation with this client
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        sessions.add(session);
        // Handle client subscribe requests.
        return session.receive()
                .map(webSocketMessage -> {
                    try {
                        return objectMapper.readValue(
                                webSocketMessage.getPayloadAsText(), SubscribeRequest.class);
                    } catch (Exception e) {
                        // Unparseable input falls back to a subscription to "all".
                        return new SubscribeRequest("all");
                    }
                })
                // Send notifications according to the latest subscribe request.
                .switchMap(request -> startNotificationStream(session, request.getTopics()))
                .doFinally(signal -> {
                    sessions.remove(session);
                })
                .then();
    }

    /** Emits one notification to the session every five seconds, without an end condition. */
    private Mono<Void> startNotificationStream(WebSocketSession session, List<String> topics) {
        return Flux.interval(Duration.ofSeconds(5))
                .map(sequence -> createNotification(sequence, topics))
                .flatMap(notification -> sendNotification(session, notification))
                .then();
    }

    /**
     * Builds a sample notification whose type cycles with {@code sequence}.
     *
     * @param topics requested topics; currently not reflected in the generated notification
     */
    private Notification createNotification(long sequence, List<String> topics) {
        String[] types = {"info", "warning", "alert"};
        return new Notification(
                "notif-" + sequence,
                "通知标题 " + sequence,
                "通知内容 " + sequence,
                types[(int) (sequence % types.length)],
                "medium"
        );
    }

    /**
     * Sends a notification to the session as a JSON text frame.
     *
     * @return completion of the send, or an error Mono if serialization fails
     */
    private Mono<Void> sendNotification(WebSocketSession session, Notification notification) {
        try {
            String json = objectMapper.writeValueAsString(notification);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}
/**
 * WebSocket handler streaming generated live data per symbol. Clients send
 * {@code subscribe} / {@code unsubscribe} requests naming a symbol.
 */
@Component
public class LiveDataWebSocketHandler implements WebSocketHandler {

    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Registers the session and processes its subscribe/unsubscribe requests.
     *
     * <p>Each connection keeps a map from symbol to the token of the stream currently serving
     * it. A stream keeps running only while its own token is still registered, which is what
     * makes unsubscribe, re-subscribe and disconnect actually stop the previous stream.
     *
     * @param session the connected session
     * @return completion of the conversation with this client
     */
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        sessions.put(sessionId, session);
        Map<String, Object> activeSymbols = new ConcurrentHashMap<>();
        // Handle client messages.
        return session.receive()
                // Once the client stops sending (e.g. disconnects), stop every running stream.
                .doFinally(signal -> activeSymbols.clear())
                .map(webSocketMessage -> {
                    try {
                        return objectMapper.readValue(
                                webSocketMessage.getPayloadAsText(), LiveDataRequest.class);
                    } catch (Exception e) {
                        // Unparseable input falls back to subscribing to "default".
                        return new LiveDataRequest("subscribe", "default");
                    }
                })
                .flatMap(request -> {
                    String action = request.getAction();
                    String symbol = request.getSymbol();
                    if (action == null || symbol == null) {
                        // Incomplete request: ignore it instead of failing the connection
                        // with a NullPointerException.
                        return Mono.<Void>empty();
                    }
                    switch (action) {
                        case "subscribe":
                            return handleSubscribe(session, symbol, activeSymbols);
                        case "unsubscribe":
                            return handleUnsubscribe(symbol, activeSymbols);
                        default:
                            return Mono.<Void>empty();
                    }
                })
                .doFinally(signal -> {
                    sessions.remove(sessionId);
                })
                .then();
    }

    /**
     * Starts sending one data point per second for {@code symbol}, for at most ten minutes.
     * A repeated subscribe for the same symbol replaces the running stream rather than
     * duplicating it.
     */
    private Mono<Void> handleSubscribe(WebSocketSession session, String symbol,
                                       Map<String, Object> activeSymbols) {
        // Unique token identifying this particular stream.
        Object token = new Object();
        activeSymbols.put(symbol, token);
        return Flux.interval(Duration.ofSeconds(1))
                // Stop as soon as this stream is no longer the registered one for the symbol.
                .takeWhile(tick -> activeSymbols.get(symbol) == token)
                .map(sequence -> generateLiveData(symbol, sequence))
                .flatMap(data -> sendLiveData(session, data))
                .take(Duration.ofMinutes(10)) // limit the stream duration
                .doFinally(signal -> activeSymbols.remove(symbol, token))
                .then();
    }

    /** Stops the stream for {@code symbol}, if any, by unregistering it. */
    private Mono<Void> handleUnsubscribe(String symbol, Map<String, Object> activeSymbols) {
        return Mono.fromRunnable(() -> {
            activeSymbols.remove(symbol);
            System.out.println("取消订阅: " + symbol);
        });
    }

    /** Generates a sample value: 100 plus random noise in [0, 50) plus a slow sine wave. */
    private LiveData generateLiveData(String symbol, long sequence) {
        double value = 100 + Math.random() * 50 + Math.sin(sequence * 0.1) * 10;
        return new LiveData(symbol, value, System.currentTimeMillis());
    }

    /**
     * Sends a data point to the session as a JSON text frame.
     *
     * @return completion of the send, or an error Mono if serialization fails
     */
    private Mono<Void> sendLiveData(WebSocketSession session, LiveData data) {
        try {
            String json = objectMapper.writeValueAsString(data);
            return session.send(Mono.just(session.textMessage(json)));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}
// WebSocket数据模型
/**
 * Chat message exchanged over the WebSocket chat endpoint.
 *
 * <p>Has a no-argument constructor so that bean-style JSON deserialization (no-arg
 * construction followed by setters) can create instances from incoming messages.
 */
class ChatMessage {

    private String user;
    private String content;
    private String type; // "user", "system", "error"
    private long timestamp;

    /** Creates an empty message stamped with the current time; fields are filled via setters. */
    public ChatMessage() {
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Creates a message stamped with the current time.
     *
     * @param user    author of the message
     * @param content message text
     * @param type    one of "user", "system", "error"
     */
    public ChatMessage(String user, String content, String type) {
        this.user = user;
        this.content = content;
        this.type = type;
        this.timestamp = System.currentTimeMillis();
    }

    // getters and setters
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
/**
 * Client request naming the topics to subscribe to.
 *
 * <p>Has a no-argument constructor so that bean-style JSON deserialization (no-arg
 * construction followed by setters) can create instances from incoming messages.
 */
class SubscribeRequest {

    private List<String> topics;

    /** Creates a request with no topics; topics are filled in via {@link #setTopics}. */
    public SubscribeRequest() {
        this.topics = Arrays.asList();
    }

    /**
     * @param topics topics to subscribe to; exposed as a fixed-size list view of this array
     */
    public SubscribeRequest(String... topics) {
        this.topics = Arrays.asList(topics);
    }

    // getters and setters
    public List<String> getTopics() { return topics; }
    public void setTopics(List<String> topics) { this.topics = topics; }
}
/**
 * Client request for the live-data endpoint: an action and the symbol it applies to.
 *
 * <p>Has a no-argument constructor so that bean-style JSON deserialization (no-arg
 * construction followed by setters) can create instances from incoming messages.
 */
class LiveDataRequest {

    private String action; // "subscribe", "unsubscribe"
    private String symbol;

    /** Creates an empty request; both fields are null until set. */
    public LiveDataRequest() {
    }

    /**
     * @param action "subscribe" or "unsubscribe"
     * @param symbol symbol the action applies to
     */
    public LiveDataRequest(String action, String symbol) {
        this.action = action;
        this.symbol = symbol;
    }

    // getters and setters
    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
}
/** Mutable payload for one live data point: symbol, value and the time it was produced. */
class LiveData {

    private String symbol;
    private double value;
    private long timestamp;

    /**
     * @param symbol    symbol the value belongs to
     * @param value     data value
     * @param timestamp time of the data point, as supplied by the caller
     */
    public LiveData(String symbol, double value, long timestamp) {
        this.symbol = symbol;
        this.value = value;
        this.timestamp = timestamp;
    }

    // --- read accessors ---

    public String getSymbol() {
        return this.symbol;
    }

    public double getValue() {
        return this.value;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    // --- write accessors ---

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
WebSocket客户端示例
<!DOCTYPE html>
<html>
<head>
    <!-- Declare the encoding explicitly so the non-ASCII UI text renders correctly
         even when the server sends no charset header. -->
    <meta charset="UTF-8">
    <title>WebSocket客户端示例</title>
    <style>
        .container { margin: 20px; padding: 10px; border: 1px solid #ccc; }
        .message { margin: 5px 0; padding: 5px; background: #f5f5f5; }
        .user-message { border-left: 3px solid #4CAF50; }
        .system-message { border-left: 3px solid #2196F3; }
        .error-message { border-left: 3px solid #f44336; }
        .connected { color: #4CAF50; }
        .disconnected { color: #f44336; }
    </style>
</head>
<body>
    <h1>WebSocket 演示</h1>
    <div class="container">
        <h3>聊天室</h3>
        <div>
            <span id="chat-status" class="disconnected">未连接</span>
            <button onclick="connectChat()">连接聊天室</button>
            <button onclick="disconnectChat()">断开连接</button>
        </div>
        <div>
            <input type="text" id="username" placeholder="用户名" value="用户1">
            <input type="text" id="message-input" placeholder="输入消息">
            <button onclick="sendChatMessage()">发送</button>
        </div>
        <div id="chat-messages"></div>
    </div>
    <div class="container">
        <h3>实时数据</h3>
        <div>
            <span id="data-status" class="disconnected">未连接</span>
            <button onclick="connectLiveData()">连接实时数据</button>
            <button onclick="disconnectLiveData()">断开连接</button>
        </div>
        <input type="text" id="data-symbol" placeholder="数据符号" value="STOCK1">
        <button onclick="subscribeData()">订阅</button>
        <div id="live-data"></div>
    </div>
    <script>
        // Current sockets; null means "not connected".
        let chatSocket = null;
        let dataSocket = null;

        // Message types that have a matching ".<type>-message" CSS class.
        const KNOWN_MESSAGE_TYPES = ['user', 'system', 'error'];

        // Appends <tag>text</tag> to parent. The value is assigned through textContent,
        // so text received over the socket is never parsed as HTML.
        function appendTextElement(parent, tag, text) {
            const child = document.createElement(tag);
            child.textContent = text;
            parent.appendChild(child);
            return child;
        }

        // Opens the chat connection (no-op when one already exists).
        function connectChat() {
            if (chatSocket) return;
            const socket = new WebSocket('ws://localhost:8080/ws/chat');
            chatSocket = socket;
            socket.onopen = function(event) {
                document.getElementById('chat-status').textContent = '已连接';
                document.getElementById('chat-status').className = 'connected';
                addChatMessage('系统', '连接成功', 'system');
            };
            socket.onmessage = function(event) {
                const message = JSON.parse(event.data);
                addChatMessage(message.user, message.content, message.type);
            };
            socket.onclose = function(event) {
                // The close event arrives asynchronously. If the user has reconnected in
                // the meantime, this stale event must not reset the newer connection.
                if (chatSocket !== null && chatSocket !== socket) return;
                document.getElementById('chat-status').textContent = '未连接';
                document.getElementById('chat-status').className = 'disconnected';
                addChatMessage('系统', '连接已断开', 'system');
                chatSocket = null;
            };
            socket.onerror = function(event) {
                console.error('WebSocket错误:', event);
                addChatMessage('系统', '连接错误', 'error');
            };
        }

        // Closes the chat connection if it is open.
        function disconnectChat() {
            if (chatSocket) {
                chatSocket.close();
                chatSocket = null;
            }
        }

        // Sends the text from the input box as a chat message of type 'user'.
        function sendChatMessage() {
            if (!chatSocket || chatSocket.readyState !== WebSocket.OPEN) {
                alert('请先连接WebSocket');
                return;
            }
            const username = document.getElementById('username').value;
            const message = document.getElementById('message-input').value;
            if (!username || !message) {
                alert('请输入用户名和消息');
                return;
            }
            const chatMessage = {
                user: username,
                content: message,
                type: 'user'
            };
            chatSocket.send(JSON.stringify(chatMessage));
            document.getElementById('message-input').value = '';
        }

        // Appends one chat line. user, content and type come from other participants and
        // are untrusted: they are inserted as text only, and type is used as a CSS class
        // only when it is one of the known values.
        function addChatMessage(user, content, type) {
            const messagesDiv = document.getElementById('chat-messages');
            const messageDiv = document.createElement('div');
            messageDiv.className = KNOWN_MESSAGE_TYPES.includes(type)
                ? `message ${type}-message`
                : 'message';
            appendTextElement(messageDiv, 'strong', `${user}:`);
            messageDiv.appendChild(document.createTextNode(` ${content} `));
            appendTextElement(messageDiv, 'small', new Date().toLocaleTimeString());
            messagesDiv.appendChild(messageDiv);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }

        // Opens the live-data connection (no-op when one already exists).
        function connectLiveData() {
            if (dataSocket) return;
            const socket = new WebSocket('ws://localhost:8080/ws/live-data');
            dataSocket = socket;
            socket.onopen = function(event) {
                document.getElementById('data-status').textContent = '已连接';
                document.getElementById('data-status').className = 'connected';
            };
            socket.onmessage = function(event) {
                const data = JSON.parse(event.data);
                displayLiveData(data);
            };
            socket.onclose = function(event) {
                // Ignore a stale close event from a socket that has already been replaced.
                if (dataSocket !== null && dataSocket !== socket) return;
                document.getElementById('data-status').textContent = '未连接';
                document.getElementById('data-status').className = 'disconnected';
                dataSocket = null;
            };
        }

        // Closes the live-data connection if it is open.
        function disconnectLiveData() {
            if (dataSocket) {
                dataSocket.close();
                dataSocket = null;
            }
        }

        // Sends a subscribe request for the symbol typed into the input box.
        function subscribeData() {
            if (!dataSocket || dataSocket.readyState !== WebSocket.OPEN) {
                alert('请先连接WebSocket');
                return;
            }
            const symbol = document.getElementById('data-symbol').value;
            const request = {
                action: 'subscribe',
                symbol: symbol
            };
            dataSocket.send(JSON.stringify(request));
        }

        // Appends one received data point, inserting the received values as text only.
        function displayLiveData(data) {
            const dataDiv = document.getElementById('live-data');
            const dataElement = document.createElement('div');
            dataElement.className = 'message';
            appendTextElement(dataElement, 'strong', `${data.symbol}:`);
            dataElement.appendChild(document.createTextNode(` ${data.value.toFixed(2)} `));
            appendTextElement(dataElement, 'small', new Date(data.timestamp).toLocaleTimeString());
            dataDiv.appendChild(dataElement);
        }
    </script>
</body>
</html>
4.5 响应式安全(Spring Security Reactive)
Spring Security Reactive配置
@Configuration
@EnableWebFluxSecurity
public class SecurityConfiguration {
/**
* Spring Security Reactive 配置
*/
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.authorizeExchange(exchanges -> exchanges
// 公开路径
.pathMatchers("/public/**", "/auth/login", "/auth/register").permitAll()
.pathMatchers("/ws/**").permitAll() // WebSocket通常需要单独配置
.pathMatchers("/sse/**").permitAll()
// 需要认证的路径
.pathMatchers("/api/admin/**").hasRole("ADMIN")
.pathMatchers("/api/user/**").hasRole("USER")
.pathMatchers("/api/**").authenticated()
// 其他请求
.anyExchange().authenticated()
)
.httpBasic(httpBasic -> httpBasic.disable())
.formLogin(formLogin -> formLogin
.loginPage("/auth/login")
.authenticationSuccessHandler(authenticationSuccessHandler())
.authenticationFailureHandler(authenticationFailureHandler())
)
.logout(logout -> logout
.logoutUrl("/auth/logout")
.logoutSuccessHandler(logoutSuccessHandler())
)
.csrf(csrf -> csrf.disable()) // 对于API通常禁用CSRF
.exceptionHandling(exceptionHandling -> exceptionHandling
.authenticationEntryPoint(authenticationEntryPoint())
.accessDeniedHandler(accessDeniedHandler())
)
.build();
}
/**
* 密码编码器
*/
@Bean
public PasswordEncoder passwordEncoder() {
return PasswordEncoderFactories.createDelegatingPasswordEncoder();
}
/**
* 响应式用户详情服务
*/
@Bean
public ReactiveUserDetailsService reactiveUserDetailsService() {
UserDetails user = User.withUsername("user")
.password("{bcrypt}$2a$10$dXJ3SW6G7P.XBLBvanJYv.M5.g.3tJ5f.Nu.ZBLBvanJYv.M5.g.3tJ5")
.roles("USER")
.build();
UserDetails admin = User.withUsername("admin")
.password("{bcrypt}$2a$10$dXJ3SW6G7P.XBLBvanJYv.M5.g.3tJ5f.Nu.ZBLBvanJYv.M5.g.3tJ5")
.roles("USER", "ADMIN")
.build();
return new MapReactiveUserDetailsService(user, admin);
}
// 各种处理器配置
@Bean
public ServerAuthenticationSuccessHandler authenticationSuccessHandler() {
return new WebFilterChainServerAuthenticationSuccessHandler();
}
@Bean
public ServerAuthenticationFailureHandler authenticationFailureHandler() {
return new RedirectServerAuthenticationFailureHandler("/auth/login?error");
}
@Bean
public ServerLogoutSuccessHandler logoutSuccessHandler() {
return new RedirectServerLogoutSuccessHandler();
}
@Bean
public ServerAuthenticationEntryPoint authenticationEntryPoint() {
return (exchange, exception) -> {
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
String body = "{"error": "未认证"}";
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(body.getBytes());
return exchange.getResponse().writeWith(Mono.just(buffer));
};
}
@Bean
public ServerAccessDeniedHandler accessDeniedHandler() {
return (exchange, exception) -> {
exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
exchange.getResponse().getHeaders().setContentType(MediaType.APPLICATION_JSON);
String body = "{"error": "权限不足"}";
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(body.getBytes());
return exchange.getResponse().writeWith(Mono.just(buffer));
};
}
}
/**
 * JWT authentication filter: when the request carries a valid {@code Bearer} token, loads the
 * user named in it and runs the rest of the chain with that user as the authenticated
 * principal. Requests without a valid token continue unauthenticated.
 */
@Component
public class JwtAuthenticationWebFilter implements WebFilter {

    private final JwtTokenProvider tokenProvider;
    private final ReactiveUserDetailsService userDetailsService;

    public JwtAuthenticationWebFilter(JwtTokenProvider tokenProvider,
                                      ReactiveUserDetailsService userDetailsService) {
        this.tokenProvider = tokenProvider;
        this.userDetailsService = userDetailsService;
    }

    /**
     * Authenticates the exchange from its bearer token, if any, and continues the chain.
     *
     * <p>The chain is continued in every case. In particular, when the token is valid but its
     * user can no longer be found, the request proceeds unauthenticated; previously the lookup
     * completed empty and the chain was never invoked, so the request ended without being
     * handled.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String token = resolveToken(exchange.getRequest());
        if (token != null && tokenProvider.validateToken(token)) {
            String username = tokenProvider.getUsernameFromToken(token);
            return userDetailsService.findByUsername(username)
                    .map(userDetails ->
                            new UsernamePasswordAuthenticationToken(
                                    userDetails,
                                    null,
                                    userDetails.getAuthorities()
                            )
                    )
                    // Continue the chain with the authentication placed in the security context.
                    .map(authentication -> chain.filter(exchange)
                            .contextWrite(ReactiveSecurityContextHolder
                                    .withAuthentication(authentication)))
                    // Unknown user: continue the chain without authentication. The continuation
                    // is carried as a value because chain.filter() itself completes empty and
                    // so cannot be told apart from "user not found" after flattening.
                    .switchIfEmpty(Mono.fromSupplier(() -> chain.filter(exchange)))
                    .flatMap(continuation -> continuation);
        }
        return chain.filter(exchange);
    }

    /**
     * Extracts the token from an {@code Authorization: Bearer <token>} header.
     *
     * @return the token text, or {@code null} when the header is missing or not a bearer header
     */
    private String resolveToken(ServerHttpRequest request) {
        String bearerToken = request.getHeaders().getFirst("Authorization");
        if (bearerToken != null && bearerToken.startsWith("Bearer ")) {
            return bearerToken.substring(7);
        }
        return null;
    }
}
/**
 * JWT token provider: creates signed tokens carrying a username and roles, and validates and
 * reads them back.
 *
 * <p>NOTE(review): the signing secret is a hard-coded placeholder. Anyone who knows it can
 * forge tokens; load a sufficiently long random secret from configuration before real use.
 */
@Component
public class JwtTokenProvider {

    private final String secretKey = "your-secret-key";
    private final long validityInMilliseconds = 3600000; // 1 hour

    /**
     * Creates an HS256-signed token valid for one hour from now.
     *
     * @param username stored as the token subject
     * @param roles    stored under the {@code roles} claim
     * @return the compact serialized token
     */
    public String createToken(String username, List<String> roles) {
        Claims claims = Jwts.claims().setSubject(username);
        claims.put("roles", roles);
        Date now = new Date();
        Date validity = new Date(now.getTime() + validityInMilliseconds);
        return Jwts.builder()
                .setClaims(claims)
                .setIssuedAt(now)
                .setExpiration(validity)
                .signWith(SignatureAlgorithm.HS256, secretKey)
                .compact();
    }

    /**
     * Returns the subject of the token.
     *
     * <p>Parsing problems surface as the exception thrown by the JWT parser; call
     * {@link #validateToken(String)} first.
     */
    public String getUsernameFromToken(String token) {
        return parseClaims(token).getSubject();
    }

    /**
     * Checks that the token parses with this provider's key and has not expired.
     *
     * @return {@code true} for a parseable token whose expiration lies in the future;
     *         {@code false} otherwise, including for a token carrying no expiration at all
     *         (which previously caused a {@code NullPointerException})
     */
    public boolean validateToken(String token) {
        try {
            Date expiration = parseClaims(token).getExpiration();
            return expiration != null && !expiration.before(new Date());
        } catch (JwtException | IllegalArgumentException e) {
            return false;
        }
    }

    /** Parses the signed token with this provider's key and returns its claims. */
    private Claims parseClaims(String token) {
        return Jwts.parser()
                .setSigningKey(secretKey)
                .parseClaimsJws(token)
                .getBody();
    }
}
/**
 * Authentication controller: login, registration and "who am I" endpoints under
 * {@code /auth}.
 */
@RestController
@RequestMapping("/auth")
public class AuthController {

    private final JwtTokenProvider tokenProvider;
    private final ReactiveAuthenticationManager authenticationManager;
    private final PasswordEncoder passwordEncoder;

    public AuthController(JwtTokenProvider tokenProvider,
                          ReactiveAuthenticationManager authenticationManager,
                          PasswordEncoder passwordEncoder) {
        this.tokenProvider = tokenProvider;
        this.authenticationManager = authenticationManager;
        this.passwordEncoder = passwordEncoder;
    }

    /**
     * Authenticates the submitted credentials and answers with a bearer token.
     *
     * @return 200 with the token on success; 401 with an empty body on any error
     */
    @PostMapping("/login")
    public Mono<ResponseEntity<AuthResponse>> login(@RequestBody LoginRequest loginRequest) {
        String username = loginRequest.getUsername();
        String password = loginRequest.getPassword();
        Authentication credentials = new UsernamePasswordAuthenticationToken(username, password);

        Mono<ResponseEntity<AuthResponse>> granted = authenticationManager.authenticate(credentials)
                .map(authenticated -> {
                    String jwt = tokenProvider.createToken(username, authorityNames(authenticated));
                    return ResponseEntity.ok(new AuthResponse(jwt, "Bearer"));
                });
        return granted.onErrorResume(failure ->
                Mono.just(ResponseEntity.status(HttpStatus.UNAUTHORIZED).build()));
    }

    /**
     * Registration, simplified: a real project would persist the user in a database.
     *
     * <p>NOTE(review): nothing is stored and the username is not checked against existing
     * accounts, yet a token for that username is issued — confirm this is acceptable outside
     * a demo.
     *
     * @return 200 with a bearer token for the given username and role {@code ROLE_USER}
     */
    @PostMapping("/register")
    public Mono<ResponseEntity<AuthResponse>> register(@RequestBody RegisterRequest registerRequest) {
        String username = registerRequest.getUsername();
        // Encoded as the persistence step would need it; not stored anywhere in this demo.
        String encodedPassword = passwordEncoder.encode(registerRequest.getPassword());

        // Create the user and generate the token.
        String jwt = tokenProvider.createToken(username, Arrays.asList("ROLE_USER"));
        AuthResponse payload = new AuthResponse(jwt, "Bearer");
        return Mono.just(ResponseEntity.ok(payload));
    }

    /**
     * Describes the currently authenticated user.
     *
     * @return 200 with name and authorities, or 401 when no security context is present
     */
    @GetMapping("/me")
    public Mono<ResponseEntity<UserInfo>> getCurrentUser() {
        Mono<UserInfo> currentUser = ReactiveSecurityContextHolder.getContext()
                .map(SecurityContext::getAuthentication)
                .map(authentication ->
                        new UserInfo(authentication.getName(), authorityNames(authentication)));
        return currentUser
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.status(HttpStatus.UNAUTHORIZED).build());
    }

    /** Returns the authority strings granted to the given authentication, in iteration order. */
    private List<String> authorityNames(Authentication authentication) {
        return authentication.getAuthorities().stream()
                .map(GrantedAuthority::getAuthority)
                .collect(Collectors.toList());
    }
}
// 认证相关数据模型
/** Request body for the login endpoint: username and password, both null until set. */
class LoginRequest {

    private String username;
    private String password;

    // --- read accessors ---

    public String getUsername() {
        return this.username;
    }

    public String getPassword() {
        return this.password;
    }

    // --- write accessors ---

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}
/**
 * Request body for the registration endpoint: username, password and email, all null until
 * set.
 *
 * <p>The accessors are required: the registration endpoint reads the username and password
 * through them, and JSON binding populates the fields through the setters.
 */
class RegisterRequest {

    private String username;
    private String password;
    private String email;

    // getters and setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}
/** Response body carrying an issued access token and its token type. */
class AuthResponse {

    private String accessToken;
    private String tokenType;

    /**
     * @param accessToken the issued token
     * @param tokenType   the token type to present it with
     */
    public AuthResponse(String accessToken, String tokenType) {
        this.accessToken = accessToken;
        this.tokenType = tokenType;
    }

    // --- read accessors ---

    public String getAccessToken() {
        return this.accessToken;
    }

    public String getTokenType() {
        return this.tokenType;
    }

    // --- write accessors ---

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setTokenType(String tokenType) {
        this.tokenType = tokenType;
    }
}
/** Response body describing a user: the username and the list of role names. */
class UserInfo {

    private String username;
    private List<String> roles;

    /**
     * @param username name of the user
     * @param roles    role names; the list is stored as given, not copied
     */
    public UserInfo(String username, List<String> roles) {
        this.username = username;
        this.roles = roles;
    }

    // --- read accessors ---

    public String getUsername() {
        return this.username;
    }

    public List<String> getRoles() {
        return this.roles;
    }

    // --- write accessors ---

    public void setUsername(String username) {
        this.username = username;
    }

    public void setRoles(List<String> roles) {
        this.roles = roles;
    }
}
推开窗,是别人的车马喧嚣;关上门,才是自家的月光皎洁。在属于自己的节奏里泡茶、读书、慢慢行走,力量,自会在不言中生长。 耐得寂寞蓄力气,终有一朝破土鸣。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...


