Flinkx源码

本地模式

  1. 创建flink流式计算执行环境类:StreamExecutionEnvironment,配置参数来源于confProp参数的值
  2. 创建flink流式计算表环境类:StreamTableEnvironment,配置参数来源于confProp参数的值
  3. 配置StreamExecutionEnvironment
  4. 获取数据来源信息,创建来源的DataStream
  5. 获取数据流向信息,创建流向的DataStreamSink

重要类

DtInputFormatSourceFunction 实现整体的操作逻辑,使用BaseRichInputFormat对象进行对数据源的各种操作;
BaseRichInputFormat 由各个数据源实现BaseRichInputFormat,实现具体业务逻辑
DataSyncFactoryUtil 根据数据源名称,通过反射机制获取到对应的数据源工厂类SourceFactory
SourceFactory 每个数据源实现工厂类,通过createSource方法获取到数据源DataStream
SinkFactory 每个数据源实现工厂类,通过createSink方法获取到数据输出DataStreamSink

断点续传

断点续传和实时采集都依赖于flink的checkpoint机制
checkpoint机制
checkpoint机制
Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。

flink状态管理

  1. 实现接口CheckpointedFunction
  2. 在任务启动时,会调用initializeState,获取到状态列表ListState
  3. 在flink触发checkpoint时,会自动调用snapshotState方法,将当前任务的状态放入状态列表ListState中进行持久化保存
  4. 在任务重启时,会调用initializeState,将状态列表ListState中的数据恢复到内存中
  5. inputFormat和outputFormat在开始执行时,从formatStateMap中获取到初始化的状态信息,在任务的执行过程中,保存当前任务的执行状态,在snapshotState方法调用时进行持久化

实时采集

Mysql Binlog实时采集采用Alibaba的开源框架canal,能够实时获取binlog日志内容并进行解析。
  1. 创建MysqlEventParser解析器,设置连接信息等参数
  2. 自定义解析类BinlogEventSink,将获取的数据转换成RowData
  3. InputFormat将RowData作为source

增量读取

如何获取起始位置?—配置json时必须指定startLocation的值

queryStartLocation的作用是什么?执行的sql长什么样?

通过设置递增的字段与初始的startLocation的值进行数据的筛选,获取大于startLocation 的数据,来进行增量数据的读取

脏值处理

流量控制

使用flink的累加器记录全局的累计读取条数与累计读取字节数的指标,根据这些指标实时更新本地的指标值,采用guava的RateLimit作为限流器

RateLimit

RateLimiter使用的是令牌桶算法,就是以固定的频率生成令牌,线程执行之前先申请固定数量的令牌,令牌数量不足时会阻塞
初始令牌设置1000row/s,每隔一秒根据指标值更新令牌的生成数量,也就是流量速度的控制。
根据累加器获取的指标计算当前子任务的流量: 每条数据的平均字节数=总的读取字节数/总的读取条数 子任务流量=总的读取条数/每条数据的平均字节数*子任务读取条数的占比
将计算出的流量作为新的令牌数量,来进行流量的控制

分片读取

继承flink的RichInputFormat抽象类,该类实现了InputFormat接口,通过实现createInputSplits方法,进行自定义的分片方式

kafka-connector

column与codec是如何起作用的?