一种基于多数据源的ETL方法及装置与流程

专利2022-05-10  5


一种基于多数据源的etl方法及装置
技术领域
1.本发明涉及数据处理技术领域,具体涉及一种基于多数据源的etl方法及装置。


背景技术:

2.目前大数据etl(数据抽取、转换和加载)的应用越来越广泛,例如电商领域,金融领域和安防领域等。现有技术中,数据接入和存储大多数都为一种数据来源和一个目标数据源定制开发一个etl程序,但是随着数据源越来越多,数据分析的维度也越来越多,所需要开发的程序也越来越多,进而造成开发成本和运维成本直线上升。


技术实现要素:

3.针对现有技术中存在的缺陷,本发明实施例的目的在于提供一种基于多数据源的etl方法及装置。
4.为实现上述目的,第一方面,本发明实施例提供了一种基于多数据源的etl方法,包括:
5.配置自定义数据;
6.选择输出的目的表和字段的清洗规则;其中,所述配置和选择均通过在前端web页面中操作所得;
7.根据所述配置和选择操作生成对应的消息格式,并写入到kafka消息队列;
8.利用spark流式计算框架处理所述kafka消息队列中的数据后,再将处理后的数据入库。
9.作为本技术的一种具体实施方式,所述清洗规则为封装完成的函数;所述清洗规则包括:
10.字段内容清除,删除空格;
11.空值替换,指定值替换为null。
12.作为本技术的一种具体实施方式,所述spark流式计算框架的处理包括以下步骤:
13.根据数据量和实时性的要求设置批处理的时间;
14.将接收的kafka数据,按照预先定义好的数据格式封装为javabean;
15.使用spark的dstream的filter算子根据所述数据格式里的目标数据库字段分为不同的dstream流,再对每个流遍历进行入库操作。
16.作为本技术的一种具体实施方式,对同一批流的数据按照目标表字段分组,使用rdd的groupby算子计算出目标表的数量,然后对目标表们进行遍历,再次使用rdd的filter算子把相同表的数据筛选到一块,每一个rdd都是同一类的目标表;
17.当同一类目标表的数据都在一起后,统一进行清洗操作,取出数组对应位置的字段值,再取出对应清洗字段的值,用清洗规则的方法名反射清洗规则,对字段进行清洗。
18.进一步地,作为本技术的一种优选实施方式,采用sparksql里的dataframe进行入库。
19.第二方面,本发明实施例还提供了一种基于多数据源的etl装置,包括:
20.配置模块,用于配置自定义数据;
21.选择模块,用于选择输出的目的表和字段的清洗规则;其中,所述配置和选择均通过在前端web页面中操作所得;
22.封装模块,用于根据所述配置和选择操作生成对应的消息格式,并写入到kafka消息队列;
23.处理模块,用于利用spark流式计算框架处理所述kafka消息队列中的数据后,再将处理后的数据入库。
24.作为本技术的一种具体实施方式,所述清洗规则为封装完成的函数;所述清洗规则包括:
25.字段内容清除,删除空格;
26.空值替换,指定值替换为null。
27.作为本技术的一种具体实施方式,所述spark流式计算框架的处理包括以下步骤:
28.根据数据量和实时性的要求设置批处理的时间;
29.将接收的kafka数据,按照预先定义好的数据格式封装为javabean;
30.使用spark的dstream的filter算子根据所述数据格式里的目标数据库字段分为不同的dstream流,再对每个流遍历进行入库操作。
31.作为本技术的一种具体实施方式,对同一批流的数据按照目标表字段分组,使用rdd的groupby算子计算出目标表的数量,然后对目标表们进行遍历,再次使用rdd的filter算子把相同表的数据筛选到一块,每一个rdd都是同一类的目标表;
32.当同一类目标表的数据都在一起后,统一进行清洗操作,取出数组对应位置的字段值,再取出对应清洗字段的值,用清洗规则的方法名反射清洗规则,对字段进行清洗。
33.作为本技术的一种具体实施方式,采用sparksql里的dataframe进行入库。
34.实施本发明实施例,主要有益效果如下:
35.(1)使用web页面的方式简化了运维开发人员接数据的操作,不用执行命令,不用修改配置文件,不用执行程序就可以轻松完成多种复杂数据的迁移和接入,降低了学习成本和运维难度;
36.(2)利用制定了数据接入的通用规则,不同库不同表的数据都可以写成此格式,所以可以用这一套流程代码来完成所有的不同源的数据etl操作,大大减少了开发量。
附图说明
37.为了更清楚地说明本发明具体实施方式或现有技术中的技术方案,下面将对具体实施方式或现有技术描述中所需要使用的附图作简单地介绍。
38.图1是本发明实施例所提供的一种基于多数据源的etl方法的流程图;
39.图2是本发明实施例所提供的一种配置示意图;
40.图3是本发明实施例所提供的一种选择输出的目的表的示意图;
41.图4是本发明实施例中所提供的一种数据处理的流程示意图;
42.图5是本发明实施例所提供的一种基于多数据源的etl装置的结构示意图。
具体实施方式
43.下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
44.应当理解,当在本说明书和所附权利要求书中使用时,术语“包括”和“包含”指示所描述特征、整体、步骤、操作、元素和/或组件的存在,但并不排除一个或多个其它特征、整体、步骤、操作、元素、组件和/或其集合的存在或添加。
45.需要说明的是,文中的专业术语为计算机领域技术人员通常所理解的含义。
46.如图1至图4所示,本发明实施例提供了一种基于多数据源的etl方法,所述方法包括:
47.s101,配置自定义数据。
48.具体地,用户在web页面自定义选择数据源表,选字段,数据类型,数据长度和精度等。
49.s102,选择输出的目的表和字段的清洗规则;其中,所述配置和选择均通过在前端web页面中操作所得;
50.所述选择输出的目的表,即表示这个表最终要写到哪个表里去。
51.具体地,用户在web页面选择数据清洗规则,目标的数据库,目标的数据表和目标的数据类型。
52.其中,所述清洗规则为封装完成的函数;所述清洗规则包括:
53.字段内容清除,删除空格;
54.空值替换,指定值替换为null。
55.应用时,对于清洗规则部分,大数据开发人员需要一次开发好可能用到的清洗规则,封装成函数,提供给前端展示供用户选择,这样用户就可以动态的对各种字段进行清洗,开发人员只需要在添加新的清洗规则的时候更新一下代码。
56.s103,根据所述配置和选择操作生成对应的消息格式,并写入到kafka消息队列。
57.具体地,用户在前述步骤通过页面操作会生成对应的消息格式从前端传到后端,后端根据用户给出的数据库地址和身份信息去对相应的库的数据进行采集,并根据用户指定的字段名和类型封装成固定的消息格式,从数据源库传数据到kafka消息队列;kafka消息队列,等待大数据的程序来取数据;
58.设计数据格式(即消息格式)好处是一条kafka数据可以带着多条源数据,节省了网络io,并且这种数据格式完全和业务脱离,只需要在spark数据处理程序中取值就可以,spark程序不会关注字段具体的名字和字段类型等,只会根据设计好的模式处理数据。
59.s104,利用spark流式计算框架处理所述kafka消息队列中的数据后,再将处理后的数据入库。
60.具体地,所述spark流式计算框架的处理包括以下步骤:
61.根据数据量和实时性的要求设置批处理的时间;
62.将接收的kafka数据,按照预先定义好的数据格式封装为javabean;
63.使用spark的dstream的filter算子根据所述数据格式里的目标数据库字段分为
不同的dstream流,再对每个流遍历进行入库操作;
64.即,在spark数据清洗阶段,根据数据量和对实时性的要求设置好批处理的时间,间隔太短对产生大量小文件。接到kafka数据以后,按照预先定义好的数据格式封装为javabean。由于kafka里各种数据都有,第一步我们要先把不同的目标数据库分开,使用spark的dstream的filter算子根据数据格式里的目标数据库字段分为不同的dstream流,因为目标数据库不会太多,所以在这里只需要对每个流遍历进行入库操作。
65.进一步地,对同一批流的数据按照目标表字段分组,使用rdd的groupby算子计算出目标表的数量,然后对目标表们进行遍历,再次使用rdd的filter算子把相同表的数据筛选到一块,每一个rdd都是同一类的目标表;
66.当同一类目标表的数据都在一起后,统一进行清洗操作,取出数组对应位置的字段值,再取出对应清洗字段的值,用清洗规则的方法名反射清洗规则,对字段进行清洗。
67.当目标库相同的数据都在一个流里以后,就是入不同的表了。由于我们不确定有多少张表,所以第一步操作是对这一批流的数据按照目标表字段分组,可以使用rdd的groupby算子,把总共有多少张目标表计算出来。然后对目标表们进行遍历,再次使用rdd的filter算子把相同的表的数据筛选到一块。假设分组出7个表来,那么就对7个表遍历,使用了7次filter算子,筛出了7个rdd,每一个rdd都是同一类的目标表。
68.当同一类目标表的数据都在一起以后,就可以统一进行清洗操作了,取出数组对应位置的字段值,再取出对应清洗字段的值,用清洗规则的方法名反射清洗规则对字段进行清洗,这里采用了反射,避免了繁琐的代码,并把清洗规则抽成单独的一个类的不同方法,更加便于维护。
69.最后,清洗完以后,数据就可以入库了,相比于使用标准sql进行入库,这里我们采用的是sparksql里的dataframe进行入库。相比于jdbc方式写文件,sparksql采用直接向hdfs写文件的方式,比jdbc那种通过基于tcp的thrift方式更快一些,减少了解包等操作。使用dataframe方式需要构建schema和row。根据用户传过来的消息格式,按顺序取出字段名和字段类型,构建成schema,根据用户传过来的字段值,构建成row。然后根据schema和row构建成dataframe,调用dataframe的方法就可以把数据写到目标数据库里。
70.支持jdbc连接方式的都可以使用dataframe方式写数据入库,不支持jdbc方式的只能通过各自通用的方式入库。
71.另外还需说明的是,需要提前在hive里建好表(即在数据仓库里建表,由于数据仓库有很多种类,目前我们使用的是hive数据库和雷霆数据库,此方法也可以使用其他种类的数据仓库),把每次报错的地方数据和报错类型收集起来,写到hive的错误表里,定期对错误进行统计并进行评审,以便于可以定期对错误数据评审,发现代码中隐藏的bug,从而修改或优化代码,减少类似错误的发生,并且把报错的数据收集起来,防止关键性的数据因为代码bug而造成数据丢失,从而可以保证数据不会丢失。
72.上述方案,通过使用web页面的方式简化了运维开发人员接数据的操作,不用执行命令,不用修改配置文件,不用执行程序就可以轻松完成多种复杂数据的迁移和接入,降低了学习成本和运维难度;
73.利用制定了数据接入的通用规则,不同库不同表的数据都可以写成此格式,所以可以用这一套流程代码来完成所有的不同源的数据etl操作,大大减少了开发量。
74.基于相同的发明构思,参见图5,本发明实施例还提供了一种基于多数据源的etl装置,由于这些装置解决问题的原理与一种基于多数据源的etl方法相似,因此这些装置的具体实施可以参见方法的实施步骤,重复之处不再赘述。
75.该装置包括:
76.配置模块,用于配置自定义数据;
77.选择模块,用于选择输出的目的表和字段的清洗规则;其中,所述配置和选择均通过在前端web页面中操作所得;所述清洗规则为封装完成的函数;所述清洗规则包括:
78.字段内容清除,删除空格;
79.空值替换,指定值替换为null。
80.封装模块,用于根据所述配置和选择操作生成对应的消息格式,并写入到kafka消息队列;
81.后端根据用户给出的数据库地址和身份信息去对相应的库的数据进行采集,并根据用户指定的字段名和类型封装成固定的消息格式,从数据源库传数据到kafka消息队列;kafka消息队列,以等待大数据的程序来取数据。
82.处理模块,用于利用spark流式计算框架处理所述kafka消息队列中的数据后,再将处理后的数据入库。
83.即,根据数据量和实时性的要求设置批处理的时间;
84.将接收的kafka数据,按照预先定义好的数据格式封装为javabean;
85.使用spark的dstream的filter算子根据所述数据格式里的目标数据库字段分为不同的dstream流,再对每个流遍历进行入库操作;
86.对同一批流的数据按照目标表字段分组,使用rdd的groupby算子计算出目标表的数量,然后对目标表们进行遍历,再次使用rdd的filter算子把相同表的数据筛选到一块,每一个rdd都是同一类的目标表;
87.当同一类目标表的数据都在一起后,统一进行清洗操作,取出数组对应位置的字段值,再取出对应清洗字段的值,用清洗规则的方法名反射清洗规则,对字段进行清洗;同时,采用sparksql里的dataframe进行入库。
88.上述方案,应用在存在多个数据源的情况下,代替传统开发多个程序对相应的数据进行清洗和入库,不再需要多套程序的开发和部署了,即可实现与多套程序取得同样效果,并且在对某些数据清洗规则需要做改变的情况下,也可以使用本发明的方法代替传统的修改代码的方法,大大减少了开发量;
89.使用web页面的方式简化了运维开发人员接数据的操作,不用执行程序就可以轻松完成多种复杂数据的迁移和接入,降低了学习成本和运维难度。
90.以上所述,仅为本发明的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明揭露的技术范围内,可轻易想到各种等效的修改或替换,这些修改或替换都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应以权利要求的保护范围为准。
转载请注明原文地址: https://doc.8miu.com/read-1350349.html

最新回复(0)