简单记录近期治理 PG 同步到 StarRocks 过程中遇到的各类数据问题及解决方案。
问题一:时间戳转换异常导致同步中断
问题现象
- Flink Job 从 RUNNING 变为 RESTARTING
- PG 的 WAL 日志持续堆积
- 复制槽的 LSN 位点长时间不推进
- Flink taskmanager 日志中出现异常
问题描述
当前项目中,因为需要将 PG 的时间转换为 SR 时间,不保留秒后面的部分,所以在 Flink Job 中通过
io.debezium.time.MicroTimestamp 承载了时间数据,将其解析为Java时间戳,在这个过程中,原有的代码没有考虑负数时间戳情况下的一个bug,造成了 Flink 内部解析异常。
当 PG 中存在负数微秒时间戳(如 1773-01-01 之前的历史日期)时,Flink 解析会抛出异常导致同步任务中断。
问题根因
原代码使用普通除法和取模运算处理微秒时间戳:
// 原代码(有问题)
Timestamp ts = TimestampData.fromEpochMillis(micro / 1000L,
(int) (micro % 1000L * 1000L)).toTimestamp();
上述代码遇到时间小于1773年,且最后三位有值,不能被1000整除的情况下,就会报错, 比如0001-01-21 22:28:43.000001 。
但对于:
0001-01-21 22:28:43.1230002026-01-21 22:30:52.000001
均不会报错。
报错代码如下:
摘抄自:org.apache.flink.table.data.TimestampData
// 源码(判断必须在0~999999之间)
private TimestampData(long millisecond, int nanoOfMillisecond) {
Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <=
999_999);
this.millisecond = millisecond;
this.nanoOfMillisecond = nanoOfMillisecond;
}
public static TimestampData fromEpochMillis(long milliseconds) {
return new TimestampData(milliseconds, 0);
}
当 micro 为负数时(如 -6213523500997900L),% 运算会产生负数结果,导致纳秒值为负数,程序异常。
解决方案
使用 Math.floorDiv 和 Math.floorMod 替代普通除法运算:
// 修复后
Timestamp ts = TimestampData.fromEpochMillis(Math.floorDiv(micro, 1_000L),
(int) (Math.floorMod(micro, 1_000L) * 1_000L)).toTimestamp();
问题二:UPDATE 数据丢失
问题现象
- PG 和 StarRocks 的表数据总行数一致
- 但部分字段数据不一致,StarRocks 中某些字段的值是旧值
- 手动在 PG 中更新该记录的任意字段后,StarRocks 能正确同步到最新数据
- 问题高发于同一事务中先 INSERT 后 UPDATE 的场景
问题描述
当一个 PG 事务内先 INSERT 后 UPDATE 同一条记录时,UPDATE 的数据偶发性丢失。
问题场景:
BEGIN;
INSERT INTO table_a (id, name) VALUES (1, 'old_name');
UPDATE table_a SET name = 'new_name' WHERE id = 1;
COMMIT;
现象:StarRocks 中 name 字段仍然是 old_name,UPDATE 没有生效。
问题根因
Flink 并行度导致消息顺序被打乱。
数据流程:
PG CDC (Debezium) → Flink (并行度8) → Kafka → StarRocks
- Debezium 解析阶段:单线程顺序解析,INSERT → UPDATE 顺序正确
- Flink 处理阶段:并行度=8,同一主键的 INSERT 和 UPDATE 被分发到不同 Task
- 不同 Task 处理速度不同:导致 UPDATE 可能比 INSERT 先写入 Kafka
- StarRocks 消费 Kafka:UPDATE 先到达但被后到达的 INSERT 覆盖
日志对比验证
通过添加追踪日志,对比同步成功和失败的案例,确认问题根因。
同步失败案例 (key=167301569)
| 阶段 | cdcSeq | Thread | 说明 |
|---|---|---|---|
| CDC-ORDER | 659447872 |
Source (1/1) | 第1条 |
| CDC-ORDER | 659468025 |
Source (1/1) | 第2条 |
| CDC-ORDER | 659504986 |
Source (1/1) | 第3条 |
| KAFKA-SEND | 659504986 |
Process (1/8) | 第3条先发送 |
| KAFKA-SEND | 659447872 |
Process (6/8) | 第1条后发送 |
失败原因:没有 keyBy,同一主键被随机分发到不同 Task(1/8 和 6/8),处理速度不同导致顺序错乱。
同步成功案例 (key=167315546)
| 阶段 | cdcSeq | Thread | 说明 |
|---|---|---|---|
| CDC-ORDER | 644085015 |
Source (1/1) | 第1条 |
| CDC-ORDER | 644127228 |
Source (1/1) | 第2条 |
| PROCESS | 644085015 |
KeyedProcess (3/8) | 第1条先处理 |
| KAFKA-SEND | 644085015 |
KeyedProcess (3/8) | 第1条先发送 |
| PROCESS | 644127228 |
KeyedProcess (3/8) | 第2条后处理 |
| KAFKA-SEND | 644127228 |
KeyedProcess (3/8) | 第2条后发送 |
成功原因:加了 keyBy 后,同一主键的数据固定路由到同一个 Task(3/8),顺序得到保证。
结论
| 对比项 | 同步成功 | 同步失败 |
|---|---|---|
| 是否有 keyBy | 有 | 无 |
| 同一主键的 Thread | 同一个 (3/8) | 不同 (1/8, 6/8) |
| 顺序是否正确 | 正确 | 错乱 |
解决方案
核心修复:添加 keyBy(主键),确保同一主键的数据由同一个 Task 顺序处理。
// 关键修改:按主键 keyBy
SingleOutputStreamOperator<JsonRecord> keyedStream = dataStreamSource
.keyBy(record -> {
String key = record.getMessageKey();
return key != null ? key : java.util.UUID.randomUUID().toString();
})
.process(new KeyedProcessFunction<>() { ... });
可选的额外保险:设置并行度为 1
性能大幅下降
// 更保守的方案:强制单线程处理
.setParallelism(1)
方案对比
| 方案 | 说明 | 推荐场景 |
|---|---|---|
只加 keyBy |
同一主键固定到同一 Task,多 Task 并行处理不同主键 | 数据量大,需要高吞吐 |
keyBy + setParallelism(1) |
所有数据单 Task 处理,绝对顺序保证 | 数据量小,追求绝对安全 |
修改后的数据流:
Source(1) → keyBy(主键) → KeyedProcess → SideOutput → Sink