PG → StarRocks 数据同步问题治理记录

简单记录近期治理 PG 同步到 StarRocks 过程中遇到的各类数据问题及解决方案。


问题一:时间戳转换异常导致同步中断

问题现象

  • Flink Job 从 RUNNING 变为 RESTARTING
  • PG 的 WAL 日志持续堆积
  • 复制槽的 LSN 位点长时间不推进
  • Flink taskmanager 日志中出现异常

问题描述

当前项目中,因为需要将 PG 的时间转换为 SR 时间,不保留秒后面的部分,所以在 Flink Job 中通过
io.debezium.time.MicroTimestamp 承载了时间数据,将其解析为Java时间戳,在这个过程中,原有的代码没有考虑负数时间戳情况下的一个bug,造成了 Flink 内部解析异常。

当 PG 中存在负数微秒时间戳(如 1773-01-01 之前的历史日期)时,Flink 解析会抛出异常导致同步任务中断。

问题根因

原代码使用普通除法和取模运算处理微秒时间戳:

// 原代码(有问题)
Timestamp ts = TimestampData.fromEpochMillis(micro / 1000L,
    (int) (micro % 1000L * 1000L)).toTimestamp();

上述代码遇到时间小于1773年,且最后三位有值,不能被1000整除的情况下,就会报错, 比如0001-01-21 22:28:43.000001
但对于:

  • 0001-01-21 22:28:43.123000
  • 2026-01-21 22:30:52.000001
    均不会报错。

报错代码如下:

摘抄自:org.apache.flink.table.data.TimestampData

    // 源码(判断必须在0~999999之间)
    private TimestampData(long millisecond, int nanoOfMillisecond) {
        Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <= 
        999_999);
        this.millisecond = millisecond;
        this.nanoOfMillisecond = nanoOfMillisecond;
    }

    public static TimestampData fromEpochMillis(long milliseconds) {
        return new TimestampData(milliseconds, 0);
    }

micro 为负数时(如 -6213523500997900L),% 运算会产生负数结果,导致纳秒值为负数,程序异常。

解决方案

使用 Math.floorDivMath.floorMod 替代普通除法运算:

// 修复后
Timestamp ts = TimestampData.fromEpochMillis(Math.floorDiv(micro, 1_000L),
    (int) (Math.floorMod(micro, 1_000L) * 1_000L)).toTimestamp();

问题二:UPDATE 数据丢失

问题现象

  • PG 和 StarRocks 的表数据总行数一致
  • 但部分字段数据不一致,StarRocks 中某些字段的值是旧值
  • 手动在 PG 中更新该记录的任意字段后,StarRocks 能正确同步到最新数据
  • 问题高发于同一事务中先 INSERT 后 UPDATE 的场景

问题描述

当一个 PG 事务内先 INSERT 后 UPDATE 同一条记录时,UPDATE 的数据偶发性丢失。

问题场景

BEGIN;
INSERT INTO table_a (id, name) VALUES (1, 'old_name');
UPDATE table_a SET name = 'new_name' WHERE id = 1;
COMMIT;

现象:StarRocks 中 name 字段仍然是 old_name,UPDATE 没有生效。

问题根因

Flink 并行度导致消息顺序被打乱。

数据流程:

PG CDC (Debezium) → Flink (并行度8) → Kafka → StarRocks
  1. Debezium 解析阶段:单线程顺序解析,INSERT → UPDATE 顺序正确
  2. Flink 处理阶段:并行度=8,同一主键的 INSERT 和 UPDATE 被分发到不同 Task
  3. 不同 Task 处理速度不同:导致 UPDATE 可能比 INSERT 先写入 Kafka
  4. StarRocks 消费 Kafka:UPDATE 先到达但被后到达的 INSERT 覆盖

日志对比验证

通过添加追踪日志,对比同步成功和失败的案例,确认问题根因。

同步失败案例 (key=167301569)

阶段 cdcSeq Thread 说明
CDC-ORDER 659447872 Source (1/1) 第1条
CDC-ORDER 659468025 Source (1/1) 第2条
CDC-ORDER 659504986 Source (1/1) 第3条
KAFKA-SEND 659504986 Process (1/8) 第3条先发送
KAFKA-SEND 659447872 Process (6/8) 第1条后发送

失败原因:没有 keyBy,同一主键被随机分发到不同 Task(1/8 和 6/8),处理速度不同导致顺序错乱。

同步成功案例 (key=167315546)

阶段 cdcSeq Thread 说明
CDC-ORDER 644085015 Source (1/1) 第1条
CDC-ORDER 644127228 Source (1/1) 第2条
PROCESS 644085015 KeyedProcess (3/8) 第1条先处理
KAFKA-SEND 644085015 KeyedProcess (3/8) 第1条先发送
PROCESS 644127228 KeyedProcess (3/8) 第2条后处理
KAFKA-SEND 644127228 KeyedProcess (3/8) 第2条后发送

成功原因:加了 keyBy 后,同一主键的数据固定路由到同一个 Task(3/8),顺序得到保证。

结论

对比项 同步成功 同步失败
是否有 keyBy
同一主键的 Thread 同一个 (3/8) 不同 (1/8, 6/8)
顺序是否正确 正确 错乱

解决方案

核心修复:添加 keyBy(主键),确保同一主键的数据由同一个 Task 顺序处理。

// 关键修改:按主键 keyBy
SingleOutputStreamOperator<JsonRecord> keyedStream = dataStreamSource
    .keyBy(record -> {
        String key = record.getMessageKey();
        return key != null ? key : java.util.UUID.randomUUID().toString();
    })
    .process(new KeyedProcessFunction<>() { ... });

可选的额外保险:设置并行度为 1

性能大幅下降

// 更保守的方案:强制单线程处理
.setParallelism(1)

方案对比

方案 说明 推荐场景
只加 keyBy 同一主键固定到同一 Task,多 Task 并行处理不同主键 数据量大,需要高吞吐
keyBy + setParallelism(1) 所有数据单 Task 处理,绝对顺序保证 数据量小,追求绝对安全

修改后的数据流:

Source(1) → keyBy(主键) → KeyedProcess → SideOutput → Sink