使用 R2DBC 进行数据访问 (Data Access with R2DBC)
R2DBC("Reactive Relational Database Connectivity")是一个由社区驱动的规范,旨在利用响应式模式(Reactive Patterns)标准化对 SQL 数据库的访问。
包层级结构 (Package Hierarchy)
Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:
- core:
org.springframework.r2dbc.core包包含DatabaseClient类及各种相关类。参见 使用 R2DBC 核心类。 - connection:
org.springframework.r2dbc.connection包包含用于简化ConnectionFactory访问的工具类,以及各种简单的ConnectionFactory实现(主要用于测试和运行未修改的 R2DBC 代码)。参见 控制数据库连接。
使用 R2DBC 核心类控制基础 R2DBC 处理和错误处理
本节介绍如何使用 R2DBC 核心类来控制基础逻辑,包括错误处理。
使用 DatabaseClient
DatabaseClient 是 R2DBC 核心包中的中心类。它负责资源的创建和释放,避免诸如忘记关闭连接等常见错误。它执行核心流程(如语句创建和执行),而应用代码只需提供 SQL 并提取结果。
DatabaseClient 的主要功能:
- 运行 SQL 查询。
- 执行更新语句和存储过程调用。
- 对
Result实例进行迭代。 - 捕获 R2DBC 异常并将其翻译为
org.springframework.dao包中定义的更通用的异常体系。
该客户端提供了一套基于响应式类型的函数式流式 API。
创建 DatabaseClient
最简单的方法是使用静态工厂方法:
DatabaseClient client = DatabaseClient.create(connectionFactory);val client = DatabaseClient.create(connectionFactory)提示
目前支持的数据库包括:H2, MariaDB, Microsoft SQL Server, MySQL, Postgres。
执行语句
以下示例展示了创建一个新表的最小功能代码:
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.then();// 使用协程
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
.await()查询 (SELECT)
你可以使用 fetch() 操作符来指定要消耗的数据量。
// 获取第一行数据
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
.fetch().first();
// 使用绑定变量
Mono<Map<String, Object>> joe = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
.bind("fn", "Joe")
.fetch().first();val first = client.sql("SELECT id, name FROM person")
.fetch().awaitSingle()数据消耗操作符:
first(): 返回整个结果的第一行(Flux -> Mono)。one(): 返回确切的一行结果,如果结果包含多行则失败。all(): 返回所有行。rowsUpdated(): 返回受影响的行数(用于 INSERT/UPDATE/DELETE)。
更新 (INSERT, UPDATE, DELETE)
对于修改语句,通常使用 rowsUpdated() 来获取结果:
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().rowsUpdated();val affectedRows = client.sql("UPDATE person SET first_name = :fn")
.bind("fn", "Joe")
.fetch().awaitRowsUpdated()控制数据库连接
使用 ConnectionFactory
Spring 通过 ConnectionFactory 获取 R2DBC 连接。这类似于 JDBC 的 DataSource。
对于生产环境,建议使用第三方连接池实现,例如 R2DBC Pool (r2dbc-pool)。Spring 自带的简单实现主要用于测试。
使用 R2dbcTransactionManager
R2dbcTransactionManager 是针对单个 R2DBC ConnectionFactory 的 ReactiveTransactionManager 实现。它将连接绑定到订阅者上下文(Subscriber Context)中。
补充教学
1. 为什么是 R2DBC 而不是 JDBC?
传统的 JDBC 是阻塞的(Blocking)。每一个数据库查询都会占用一个线程,直到数据库返回结果。在高并发场景下,这会导致线程耗尽。 R2DBC 是非阻塞、响应式的。它允许你在等待数据库响应时释放线程去处理其他请求。它是 Spring WebFlux 栈中实现全链路响应式(End-to-End Reactive)的关键环。
2. 绑定标记(Bind Markers)的差异
JDBC 统一使用 ?。但在 R2DBC 中,不同的数据库有不同的原生格式:
- Postgres: 使用
$1,$2,$n。 - SQL Server: 使用
@name。 - MySQL/H2: 使用
?。
Spring 的优势:DatabaseClient 允许你使用统一的 :name 命名参数语法,它会自动根据底层数据库类型转换为正确的绑定标记。这提高了代码的可移植性。
3. 处理 null 的限制
根据物理响应式流规范,不能发射 null 值。 如果数据库返回了 NULL 字段,在提取数据时必须小心。如果你使用 map(row -> row.get("field")) 且该字段为 NULL,由于 Reactor 不允许发射 NULL,会抛出异常。 最佳实践:始终将可能为 NULL 的值包装在 Optional 中,或者使用 fetch().first() 返回的 Mono<Map>(Map 的值可以为 null 引用,但发射的 Map 实例本身不是 null)。