大数据 2022 年 8 月 28 日
Flinkx源码解析
对Flinkx中的几个主要功能的源码进行分析
Flinkx源码
本地模式
- 创建flink流式计算执行环境类:StreamExecutionEnvironment,配置参数来源于confProp参数的值
- 创建flink流式计算表环境类:StreamTableEnvironment,配置参数来源于confProp参数的值
- 配置StreamExecutionEnvironment
- 获取数据来源信息,创建来源的DataStream
- 获取数据流向信息,创建流向的DataStreamSink
重要类
DtInputFormatSourceFunction
实现整体的操作逻辑,使用BaseRichInputFormat对象进行对数据源的各种操作;
BaseRichInputFormat
由各个数据源实现BaseRichInputFormat,实现具体业务逻辑
DataSyncFactoryUtil
根据数据源名称,通过反射机制获取到对应的数据源工厂类SourceFactory
SourceFactory
每个数据源实现工厂类,通过createSource方法获取到数据源DataStream
SinkFactory
每个数据源实现工厂类,通过createSink方法获取到数据输出DataStreamSink
断点续传
flink checkpoint机制
断点续传和实时采集都依赖于flink的checkpoint机制

checkpoint机制
Checkpoint触发时,会向多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会随着Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可能已经缓存了一些比Barrier早到达Operator的数据记录(Outgoing Records),这时该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入,最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据。
flink状态管理
- 实现接口CheckpointedFunction
- 在任务启动时,会调用initializeState,获取到状态列表ListState
- 在flink触发checkpoint时,会自动调用snapshotState方法,将当前任务的状态放入状态列表ListState中进行持久化保存
- 在任务重启时,会调用initializeState,将状态列表ListState中的数据恢复到内存中
- inputFormat和outputFormat在开始执行时,从formatStateMap中获取到初始化的状态信息,在任务的执行过程中,保存当前任务的执行状态,在snapshotState方法调用时进行持久化
实时采集
Mysql Binlog实时采集采用Alibaba的开源框架canal,能够实时获取binlog日志内容并进行解析。
- 创建MysqlEventParser解析器,设置连接信息等参数
- 自定义解析类BinlogEventSink,将获取的数据转换成RowData
- InputFormat将RowData作为source
增量读取
如何获取起始位置?—配置json时必须指定startLocation的值
queryStartLocation的作用是什么?执行的sql长什么样?
通过设置递增的字段与初始的startLocation的值进行数据的筛选,获取大于startLocation 的数据,来进行增量数据的读取
脏值处理
流量控制
使用flink的累加器记录全局的累计读取条数与累计读取字节数的指标,根据这些指标实时更新本地的指标值,采用guava的RateLimit作为限流器
RateLimit
RateLimiter使用的是
令牌桶算法,就是以固定的频率生成令牌,线程执行之前先申请固定数量的令牌,令牌数量不足时会阻塞初始令牌设置1000row/s,每隔一秒根据指标值更新令牌的生成数量,也就是流量速度的控制。
根据累加器获取的指标计算当前子任务的流量:
每条数据的平均字节数=总的读取字节数/总的读取条数
子任务流量=总的读取条数/每条数据的平均字节数*子任务读取条数的占比
将计算出的流量作为新的令牌数量,来进行流量的控制
分片读取
继承flink的RichInputFormat抽象类,该类实现了InputFormat接口,通过实现createInputSplits方法,进行自定义的分片方式
kafka-connector
column与codec是如何起作用的?
版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
作者: OnlyWaitY 发表日期:2022 年 8 月 28 日