APP推广合作
联系“鸟哥笔记小乔”
传统金融业务场景下Flink实时计算的探索与实践? by鸣宇淳
2021-01-27 21:21:34


超长文警告!本文7000字,含架构图和各种解决方案的尝试,以及详细代码。,最后还有电子书和各种分享ppt下载,请在wifi下观看。土豪随意


我是鸣宇淳,一个大数据架构师。今天给大家分享一下我在传统金融业务场景中,用Flink的特性,帮助业务解决实际问题的案例。


Flink 有很好的支持事件时间、有状态的计算、可以灵活设置窗口等特点,受到业界的广泛追捧。我所在的公司并不是典型的互联网公司,而是偏传统的金融公司,我也想探索一下怎么能利用 Flink 这个好的工具来服务公司的业务,所以对 Flink 的应用场景做了一番尝试。

    下面我将探索的过程记录下来,一是分享下自己的经历,二是希望跟同行交流,还望轻拍。




业务背景



我公司业务场景跟市面上大部分 Flink 使用场景不太一样的地方在于以下几点:


1. 表单类数据比较多

要处理的数据里,日志类型的数据很少,基本上都是表单类的业务数据,这类数据会经常变化。


2. 数据更新周期长

一条业务数据可能需要几天或者几个月后,才有一条更新操作,所以不能更好地利用 Flink 的窗口,而 Flink 更适合于那种插入后很快就会有更新的情况。


3. 维表数据变化时,历史统计结果数据也要跟着变

维表数据如果变了,统计结果数据都要跟着变,这也是传统业务中常见要求。


4. 数据准确度要求非常高

因为是金融业务,多数场景都和金钱相关,所以要求计算出的数据准确。

以上几条都是我们传统金融业务中比较特别的场景。

另外,由于现有人员多数是精通 SQL 的开发人员,而精通 Java、Scala 开发的并不多,所以决定尽量不使用 Flink API 的方式开发 Flink 实时计算。所以自研了一个基于 Flink 的实时计算平台,主要功能有使用 Flink SQL 创建各类 Source和Sink的 Flink 表、创建 Flink 任务、提交 Flink 任务到 Yarn 上执行、任务监控等,另外也可以基于 Flink API 开发,将 jar 表上传到实时计算平台上执行。


以上是大致背景。接下来是我们具体的探索和实践过程。




实时数据处理流程



我们根据业务需求,决定把Flink作为实时计算平台。前面通过Kafka收集和传输数据,通过Flink计算平台处理好之后,把数据扔到MySQL、ES、HBase等存储落地,或者继续扔到Kafka中等待下一步计算。

以此,我们就可以画出实时数据处理的整体流程:

  1. 数据来源有两类:一类是业务数据,绝大部分来源于 MySQL,这部分数据通过 Canal 将 Binlog 数据实时写入 Kafka中;另外一类是少量的日志数据,这些数据通过 Flume 实时采集后,也写入 Kafka中。

  2. 在实时计算平台上,通过 Flink SQL 定义好 source 表、sink 表,指定 Kafka 来源和要写入的目标库。

  3. 或者是将开发的 Flink 程序打成 jar 包后上传到实时计算平台上。

  4. 在实时计算平台上创建任务,运行各种业务计算。

  5. 如果计算结果为最终数据,就 sink 到 MySQL、ES、HBase 等存储中用于展示,如果计算结果数据还需要进行下一步计算,那么就 sink 到 Kafka 中。




应用场景一:基于单表计算


0. 需求

业务背景:以金融业务中常见的还款计划表为例。比如有一张客户还款计划表,里面每一条数据是一期要还款的记录,每个单号有多行数据,主要字段有:主键、单号、期号、计划还款日期、实际还款日期、应还金额、是否逾期、还款状态等。


具体业务需求是:通过实时计算,统计出每个单号的首期月供、首期还款日期、首次逾期日期、当前期号、实际已还金额、剩余金额等统计数据,每个单号一行结果。

大体需求如下:

1.实现

这个需求使用 Flink Java API 来实现,程序大体流程如下:

简单来说:

1. 一共设计了两张 HBase 表:保存所有明细数据的 HBase 表、保存实时统计结果的 HBase 表。

2. 对于 Kafka 传递过来的每一条 binlog 日志,不管是 insert 还是 update,都在窗口内保存到 Flink 的状态里,并进行合并,只保留每个单子最新一条 binlog 日志的全量字段值。

3. 当窗口关闭的时候,从明细 HBase 表中读取相应单号的数据,和 Flink 状态中的最新数据进行合并,并将合并后的最新数据更新到 HBase 明细表中,完成实时同步。

4. 同时对这个窗口 Flink 状态中涉及到的单号,使用最新的数据进行统计,然后将统计结果 Sink 到 HBase 结果表中。


代码总体逻辑如下:

    public static void main(String[] args) throws Exception {

        //定义一个Flink Stream执行环境对象

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //设置checkpoint

        env.enableCheckpointing(600 * 1000);


        //设置状态后端

        String chkpointPath = "hdfs://ha/xxx/checkpoint";

        StateBackend backend = new RocksDBStateBackend(chkpointPath);

        env.setStateBackend(backend);


        //设置事件时间        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        //Kafka的ip和要消费的topic

        String kafkaIPs = "********";

        String topic = "plan_topic";


        //Kafka设置

        Properties props = new Properties();

        props.setProperty("bootstrap.servers", kafkaIPs);

        props.setProperty("group.id", "group.cyb.1");

        FlinkKafkaConsumer>> consumer =

                new FlinkKafkaConsumer>>(topic, new KafkaDeserializationScheme(), props);


        //设置水位线        AssignerWithPeriodicWatermarks>> watermarks =

                new PeriodicTsAndWmarks<>();

        consumer.assignTimestampsAndWatermarks(watermarks);


        consumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>>(Time.seconds(5)) {

            @Override

            public long extractTimestamp(KafkaData> element) {

                return element.getTs();

            }

        });

        //传入参数,指定从哪个时间戳开始消费Kafka

        long startTs = Long.parseLong(args[0]);

        //设置从指定时间戳开始消费

        consumer.setStartFromTimestamp(startTs);


        SingleOutputStreamOperator ds = env

                //指定Source

                .addSource(consumer)

                //筛选数据,只是保留指定表的数据

                .filter(f -> f.getTable().toLowerCase().equals("plan_cust") && !f.getIsDdl())

                //因为kafka里的一条数据可能包括多条insert、update、delete记录,所以binlog日志是一对多的结构

                //这里进行打平,一条kafka日志输出多条记录

                .flatMap(new DataFlatMap())

                //根据单号进行分组

                .keyBy(RepayPlanCustOut::getApply_no)

                //定义窗口分配器,根据EventTime进行滚动。

                .window(TumblingEventTimeWindows.of(Time.seconds(20)))

                //增量聚合

                //.aggregate(new CustAggregateFunction())

                //延迟指定时间后,窗口才释放

                //.allowedLateness(Time.seconds(5))

                //对每个窗口内的所有元素执行函数操作

                .process(new CustProcessWindowFun())

                //指定并行度

                .setParallelism(15)

                //因为使用了lamada表达式,所以需要指定返回类型

                .returns(new TypeHint() {

                });


        //输出到自定义的Sink

        ds.writeUsingOutputFormat(new SinkOutFormat())

                //指定并行度

                .setParallelism(15);


        //提交执行Flink任务,指定任务名称

        env.execute("planMain-1");

    }


2. 数据初始化和数据测试

数据初始化和数据测试的流程如下:

数据初始化

不像常见 Flink 实时计算场景那样,上线时不用考虑历史数据问题,我这个业务场景中,在程序首次上线或者迭代上线的时候,需要将历史数据也做初始化,初始化工作包括将历史数据同步到 HBase 明细表中、将统计结果数据初始化到 HBase 统计结果表中。

数据初始化采取的方案是:使用 Hive 中的历史数据同步到 HBase 表中。

一是创建一个 HBase Hive 外部表,将明细数据 Insert 到 HBase 明细表中;

二是使用 Hive 写同样的 统计脚本,将统计结果 Insert 到 HBase 统计结果表中;

数据测试

对于统计结果的测试,使用一个 Python 脚本,分别读取 MySQL 业务备库中的数据、HBase 统计结果中的数据,然后进行自动对比。

以上就是对于单表实时计算的介绍,下面来介绍更为复杂的多表关联场景。




应用场景二:基于多表关联计算


0. 需求

存储在 MySQL 业务库中的好几张业务表,包括:合同主表、**表、产品方案表等等,基于这些表,需要实时进行关联,得到的结果如下图所示:

1. 探索:方案一

首先想到的最简单的方式就是这样,涉及到的每个从表都使用 Flink 做实时同步到 HBase 中,关联键为 RowKey。然后 Flink 对于主表的每一条 binlog 日志进行读取后,关联 HBase 中的每个从表,得到的结果写入 HBase 结果表或者传递给下游 Kafka。

这个方案优点是逻辑简单,缺点是维表数据有延迟、要创建很多 HBase表,尤其是有多种关联方式时。

2. 探索:方案二

方案一中使用 HBase 当维表是因为当时使用的 Flink 1.5 中不支持 MySQL 当维表,后来实时计算平台的 Flink 升级到了 1.10后,可以使用 MySQL 当维表,抛弃了 HBase 那就简单多了,程序处理流程变为了上面的样子。


(1) 只有主表实时接入到Topic。

(2) Flink读取主表Topic数据,实时关联MySQL中维表。

(3) 将关联结果写入MySQL结果表中。

(4) 可以同时将关联结果写入Kafka,用做下一步计算。


Flink 实时计算程序直接读取 MySQL 主表和维表进行计算,这样维表的更新没有了延迟。


但是后来发现有个问题,就是当维表数据更新时,不能触发计算,不能更新结果。

3. 探索:方案三

由于方案二中有个致命问题,就是由于 Flink 是事件驱动的,只有主表更新时才触发计算,而维表数据变化时,也需要更新历史老数据(这个估计是传统业务场景中的特有的要求),所以形成了方案三,如上图所示:

(1) 所有表都实时接入到 Topic 。

(2) 每个表的接入都要触发计算,用结果表当维表。

(3) 将关联结果写入 MySQL 结果表中。

(4) 可以同时将关联结果写入 Kafka ,用做下一步计算。


这样主表、维表数据更新都可以触发计算,但是还有个缺点,就是结果表每次都是对全部字段读取后,再全部字段更新,效率低。

4. 探索:方案四


怎么解决方案三中效率低的问题呢,然后就有了方案四:


(1) 所有表都实时接入到 Topic 。

(2) 每个表的接入都要触发计算,用结果表当维表关联,更新结果表。只更新各自相关字段。

(3) 将关联结果写入 MySQL 结果表中。


这样主表、维表数据更新都可以触发计算,提高了执行效率,但是呢也是有问题的,就是步骤较多,程序复杂。




总结


Flink总体使用体感还是很舒服的,速度很快,跟其他组件对接也很顺畅,尤其是对于事件时间、有状态的计算等特性非常好用。

但是我们的一些场景也不太符合Flink的特性,比如数据更新周期特别长、数据准确的要求非常高等,所以我们也只是在部分场景进行了尝试性的探索。

对于单表计算场景,我们用canal实时读取MySQL的BinLog,经由Kafka输入到Flink中进行计算,最后写入HBase的结果表中。这个体验还是非常舒服的。

对于多表关联计算场景,我们先后尝试了4种解决方案,各有利弊。

最早的方案是所有需要关联的表都在Flink中做实时关联,然后存到HBase中。当时用的是Flink1.5,不支持MySQL当维表,所以还用Flink实时同步数据到HBase,做成实时维度。这个方案看上去简单,但是维表数据经常延迟,关联不上。

然后我们把Flink更新到1.10,直接用MySQL的表当维表,比早期的方案更简化了,维表也没有延迟的问题。但是随之发现维表更新的时候,不能触发计算。这是因为Flink是事件驱动,只有主表更新时才会触发,维表更新的时候就不动了,这个比较恼火。

因为这个缺陷严重影响业务,所以我们又做了一次尝试,把所有表都放到Kafka中,然后再到Flink中进行多表关联。这样每个表更新都可以触发计算。这样基本上需求就都能满足了。

但是上面的方案依然有缺点,结果表每次都会读取全面字段后再进行更新,效率不够高。

于是我们再次调整方案,依然是把结果表当维表关联,但是每个任务只更新自己相关的字段,这样就规避了前面说的效率问题。


整个过程看下来,Flink能解决咱的业务场景的问题,实时性也非常好。但是一旦到了某个具体的场景中,就会有无数问题等着你,解决一个,又来一个,考验的不仅是你对Flink的熟悉程度,还有遇到问题解决问题的能力,更考验架构的能力。

这不正如我们的人生么?总是有一个有一个的挑战等着我们。只要你持续精进,终归是能找到解决办法的。加油!


我是大数据架构师鸣宇淳,欢迎你跟我一起聊大数据,我的微信号:chybin001。


扩展阅读:1本Flink电子书+9份分享ppt,公众号“大数据架构师”后台回复“Flink”即可下载。

    

配合以下文章享受更佳








干货 | 一口气讲完数据仓库建模方法


干货 | 如何搭建一个数据仓库


【资料包】 实时数仓架构选型资料包


【实战】 手摸手搭建一个实时数据仓库


【干货】 数仓到底要分多少层?



我需要你的转发,小小的满足一下我的虚荣心

大数据架构师
分享到朋友圈
收藏
收藏
评分

综合评分:

我的评分
Xinstall 15天会员特权
Xinstall是专业的数据分析服务商,帮企业追踪渠道安装来源、裂变拉新统计、广告流量指导等,广泛应用于广告效果统计、APP地推与CPS/CPA归属统计等方面。
20羽毛
立即兑换
一书一课30天会员体验卡
领30天VIP会员,110+门职场大课,250+本精读好书免费学!助你提升职场力!
20羽毛
立即兑换
顺丰同城急送全国通用20元优惠券
顺丰同城急送是顺丰推出的平均1小时送全城的即时快送服务,专业安全,准时送达!
30羽毛
立即兑换
大数据架构师
大数据架构师
发表文章272
历任多家公司大数据总监、大数据架构师,专注于数字化转型领域。
确认要消耗 羽毛购买
传统金融业务场景下Flink实时计算的探索与实践? by鸣宇淳吗?
考虑一下
很遗憾,羽毛不足
我知道了

我们致力于提供一个高质量内容的交流平台。为落实国家互联网信息办公室“依法管网、依法办网、依法上网”的要求,为完善跟帖评论自律管理,为了保护用户创造的内容、维护开放、真实、专业的平台氛围,我们团队将依据本公约中的条款对注册用户和发布在本平台的内容进行管理。平台鼓励用户创作、发布优质内容,同时也将采取必要措施管理违法、侵权或有其他不良影响的网络信息。


一、根据《网络信息内容生态治理规定》《中华人民共和国未成年人保护法》等法律法规,对以下违法、不良信息或存在危害的行为进行处理。
1. 违反法律法规的信息,主要表现为:
    1)反对宪法所确定的基本原则;
    2)危害国家安全,泄露国家秘密,颠覆国家政权,破坏国家统一,损害国家荣誉和利益;
    3)侮辱、滥用英烈形象,歪曲、丑化、亵渎、否定英雄烈士事迹和精神,以侮辱、诽谤或者其他方式侵害英雄烈士的姓名、肖像、名誉、荣誉;
    4)宣扬恐怖主义、极端主义或者煽动实施恐怖活动、极端主义活动;
    5)煽动民族仇恨、民族歧视,破坏民族团结;
    6)破坏国家宗教政策,宣扬邪教和封建迷信;
    7)散布谣言,扰乱社会秩序,破坏社会稳定;
    8)宣扬淫秽、色情、赌博、暴力、凶杀、恐怖或者教唆犯罪;
    9)煽动非法集会、结社、游行、示威、聚众扰乱社会秩序;
    10)侮辱或者诽谤他人,侵害他人名誉、隐私和其他合法权益;
    11)通过网络以文字、图片、音视频等形式,对未成年人实施侮辱、诽谤、威胁或者恶意损害未成年人形象进行网络欺凌的;
    12)危害未成年人身心健康的;
    13)含有法律、行政法规禁止的其他内容;


2. 不友善:不尊重用户及其所贡献内容的信息或行为。主要表现为:
    1)轻蔑:贬低、轻视他人及其劳动成果;
    2)诽谤:捏造、散布虚假事实,损害他人名誉;
    3)嘲讽:以比喻、夸张、侮辱性的手法对他人或其行为进行揭露或描述,以此来激怒他人;
    4)挑衅:以不友好的方式激怒他人,意图使对方对自己的言论作出回应,蓄意制造事端;
    5)羞辱:贬低他人的能力、行为、生理或身份特征,让对方难堪;
    6)谩骂:以不文明的语言对他人进行负面评价;
    7)歧视:煽动人群歧视、地域歧视等,针对他人的民族、种族、宗教、性取向、性别、年龄、地域、生理特征等身份或者归类的攻击;
    8)威胁:许诺以不良的后果来迫使他人服从自己的意志;


3. 发布垃圾广告信息:以推广曝光为目的,发布影响用户体验、扰乱本网站秩序的内容,或进行相关行为。主要表现为:
    1)多次发布包含售卖产品、提供服务、宣传推广内容的垃圾广告。包括但不限于以下几种形式:
    2)单个帐号多次发布包含垃圾广告的内容;
    3)多个广告帐号互相配合发布、传播包含垃圾广告的内容;
    4)多次发布包含欺骗性外链的内容,如未注明的淘宝客链接、跳转网站等,诱骗用户点击链接
    5)发布大量包含推广链接、产品、品牌等内容获取搜索引擎中的不正当曝光;
    6)购买或出售帐号之间虚假地互动,发布干扰网站秩序的推广内容及相关交易。
    7)发布包含欺骗性的恶意营销内容,如通过伪造经历、冒充他人等方式进行恶意营销;
    8)使用特殊符号、图片等方式规避垃圾广告内容审核的广告内容。


4. 色情低俗信息,主要表现为:
    1)包含自己或他人性经验的细节描述或露骨的感受描述;
    2)涉及色情段子、两性笑话的低俗内容;
    3)配图、头图中包含庸俗或挑逗性图片的内容;
    4)带有性暗示、性挑逗等易使人产生性联想;
    5)展现血腥、惊悚、残忍等致人身心不适;
    6)炒作绯闻、丑闻、劣迹等;
    7)宣扬低俗、庸俗、媚俗内容。


5. 不实信息,主要表现为:
    1)可能存在事实性错误或者造谣等内容;
    2)存在事实夸大、伪造虚假经历等误导他人的内容;
    3)伪造身份、冒充他人,通过头像、用户名等个人信息暗示自己具有特定身份,或与特定机构或个人存在关联。


6. 传播封建迷信,主要表现为:
    1)找人算命、测字、占卜、解梦、化解厄运、使用迷信方式治病;
    2)求推荐算命看相大师;
    3)针对具体风水等问题进行求助或咨询;
    4)问自己或他人的八字、六爻、星盘、手相、面相、五行缺失,包括通过占卜方法问婚姻、前程、运势,东西宠物丢了能不能找回、取名改名等;


7. 文章标题党,主要表现为:
    1)以各种夸张、猎奇、不合常理的表现手法等行为来诱导用户;
    2)内容与标题之间存在严重不实或者原意扭曲;
    3)使用夸张标题,内容与标题严重不符的。


8.「饭圈」乱象行为,主要表现为:
    1)诱导未成年人应援集资、高额消费、投票打榜
    2)粉丝互撕谩骂、拉踩引战、造谣攻击、人肉搜索、侵犯隐私
    3)鼓动「饭圈」粉丝攀比炫富、奢靡享乐等行为
    4)以号召粉丝、雇用网络水军、「养号」形式刷量控评等行为
    5)通过「蹭热点」、制造话题等形式干扰舆论,影响传播秩序


9. 其他危害行为或内容,主要表现为:
    1)可能引发未成年人模仿不安全行为和违反社会公德行为、诱导未成年人不良嗜好影响未成年人身心健康的;
    2)不当评述自然灾害、重大事故等灾难的;
    3)美化、粉饰侵略战争行为的;
    4)法律、行政法规禁止,或可能对网络生态造成不良影响的其他内容。


二、违规处罚
本网站通过主动发现和接受用户举报两种方式收集违规行为信息。所有有意的降低内容质量、伤害平台氛围及欺凌未成年人或危害未成年人身心健康的行为都是不能容忍的。
当一个用户发布违规内容时,本网站将依据相关用户违规情节严重程度,对帐号进行禁言 1 天、7 天、15 天直至永久禁言或封停账号的处罚。当涉及欺凌未成年人、危害未成年人身心健康、通过作弊手段注册、使用帐号,或者滥用多个帐号发布违规内容时,本网站将加重处罚。


三、申诉
随着平台管理经验的不断丰富,本网站出于维护本网站氛围和秩序的目的,将不断完善本公约。
如果本网站用户对本网站基于本公约规定做出的处理有异议,可以通过「建议反馈」功能向本网站进行反馈。
(规则的最终解释权归属本网站所有)

我知道了
恭喜你~答对了
+5羽毛
下一次认真读哦
成功推荐给其他人
+ 10羽毛
评论成功且进入审核!审核通过后,您将获得10羽毛的奖励。分享本文章给好友阅读最高再得15羽毛~
(羽毛可至 "羽毛精选" 兑换礼品)
好友微信扫一扫
复制链接