Skip to content

使用 R2DBC 进行数据访问 (Data Access with R2DBC)

R2DBC("Reactive Relational Database Connectivity")是一个由社区驱动的规范,旨在利用响应式模式(Reactive Patterns)标准化对 SQL 数据库的访问。

包层级结构 (Package Hierarchy)

Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:

  • core: org.springframework.r2dbc.core 包包含 DatabaseClient 类及各种相关类。参见 使用 R2DBC 核心类
  • connection: org.springframework.r2dbc.connection 包包含用于简化 ConnectionFactory 访问的工具类,以及各种简单的 ConnectionFactory 实现(主要用于测试和运行未修改的 R2DBC 代码)。参见 控制数据库连接

使用 R2DBC 核心类控制基础 R2DBC 处理和错误处理

本节介绍如何使用 R2DBC 核心类来控制基础逻辑,包括错误处理。

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心包中的中心类。它负责资源的创建和释放,避免诸如忘记关闭连接等常见错误。它执行核心流程(如语句创建和执行),而应用代码只需提供 SQL 并提取结果。

DatabaseClient 的主要功能:

  • 运行 SQL 查询。
  • 执行更新语句和存储过程调用。
  • Result 实例进行迭代。
  • 捕获 R2DBC 异常并将其翻译为 org.springframework.dao 包中定义的更通用的异常体系。

该客户端提供了一套基于响应式类型的函数式流式 API。

创建 DatabaseClient

最简单的方法是使用静态工厂方法:

java
DatabaseClient client = DatabaseClient.create(connectionFactory);
kotlin
val client = DatabaseClient.create(connectionFactory)

提示

目前支持的数据库包括:H2, MariaDB, Microsoft SQL Server, MySQL, Postgres。

执行语句

以下示例展示了创建一个新表的最小功能代码:

java
Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
		.then();
kotlin
// 使用协程
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
		.await()

查询 (SELECT)

你可以使用 fetch() 操作符来指定要消耗的数据量。

java
// 获取第一行数据
Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
		.fetch().first();

// 使用绑定变量
Mono<Map<String, Object>> joe = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
		.bind("fn", "Joe")
		.fetch().first();
kotlin
val first = client.sql("SELECT id, name FROM person")
		.fetch().awaitSingle()

数据消耗操作符:

  • first(): 返回整个结果的第一行(Flux -> Mono)。
  • one(): 返回确切的一行结果,如果结果包含多行则失败。
  • all(): 返回所有行。
  • rowsUpdated(): 返回受影响的行数(用于 INSERT/UPDATE/DELETE)。

更新 (INSERT, UPDATE, DELETE)

对于修改语句,通常使用 rowsUpdated() 来获取结果:

java
Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
		.bind("fn", "Joe")
		.fetch().rowsUpdated();
kotlin
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
		.bind("fn", "Joe")
		.fetch().awaitRowsUpdated()

控制数据库连接

使用 ConnectionFactory

Spring 通过 ConnectionFactory 获取 R2DBC 连接。这类似于 JDBC 的 DataSource

对于生产环境,建议使用第三方连接池实现,例如 R2DBC Pool (r2dbc-pool)。Spring 自带的简单实现主要用于测试。

使用 R2dbcTransactionManager

R2dbcTransactionManager 是针对单个 R2DBC ConnectionFactoryReactiveTransactionManager 实现。它将连接绑定到订阅者上下文(Subscriber Context)中。


补充教学

1. 为什么是 R2DBC 而不是 JDBC?

传统的 JDBC 是阻塞的(Blocking)。每一个数据库查询都会占用一个线程,直到数据库返回结果。在高并发场景下,这会导致线程耗尽。 R2DBC 是非阻塞、响应式的。它允许你在等待数据库响应时释放线程去处理其他请求。它是 Spring WebFlux 栈中实现全链路响应式(End-to-End Reactive)的关键环。

2. 绑定标记(Bind Markers)的差异

JDBC 统一使用 ?。但在 R2DBC 中,不同的数据库有不同的原生格式:

  • Postgres: 使用 $1, $2, $n
  • SQL Server: 使用 @name
  • MySQL/H2: 使用 ?

Spring 的优势DatabaseClient 允许你使用统一的 :name 命名参数语法,它会自动根据底层数据库类型转换为正确的绑定标记。这提高了代码的可移植性。

3. 处理 null 的限制

根据物理响应式流规范,不能发射 null。 如果数据库返回了 NULL 字段,在提取数据时必须小心。如果你使用 map(row -> row.get("field")) 且该字段为 NULL,由于 Reactor 不允许发射 NULL,会抛出异常。 最佳实践:始终将可能为 NULL 的值包装在 Optional 中,或者使用 fetch().first() 返回的 Mono<Map>(Map 的值可以为 null 引用,但发射的 Map 实例本身不是 null)。

Based on Spring Framework.