车海快讯

多种数据同步方案在七猫的实践

admin 98
背景

伴随着七猫的发展,大数据团队针对不同业务场景和不同数据功能,调研和运用多种技术栈和数据库来来解决了各类数据存放和使用的问题。本文将七猫大数据团队在实践过程中的一些开发技巧和代码示例进行整理,一是用于技术的沉淀记录,二是希望通过该文章能帮助大家对各种数据库同步场景所有启发。

数据同步概览篇章1.离线数据同步

该篇章介绍七猫在各数据库中进行数据相互同步的实践例子。

MySQL2Hive

1.小数据量导入数仓
对于千万级别及以下的数据,为了简化开发,七猫选择了Sqoop来进行数据同步。

使用例子:

导入新starrocks集群invokeERROR:")()}}finally{try{()()}catch{caseex:Exception={println("step1先将数据从oss-hdfs同步至ossoss-hdfs不支持直接导入sr该命令如果分区相同则是覆盖写hadoopjar/opt/apps/JINDOSDK///tools/\--srcoss://xxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date}\--destxxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date}\--=xxxx\--=3280\--=60000000\--=xxxx\--=xxxx\--parallelism500\--jobBatch50000\--taskBatch2\--bandWidth100brokeload写入srload_path="oss://xxxx/user/hive/warehouse/${sr_db}.db/${sr_table}/dt=${input_date}/*"access_key_id="xxxx"access_key_secret="xxxx"_point="xxxx"load_info="LOADLABEL${label_name}(DATAINFILE('$load_path')INTOTABLE${sr_table}FORMATAS${sr_format}(--字段col1,col2,col3)--从路径中获取分区信息作为第一个字段COLUMNSFROMPATHAS(dt))WITHBROKER(''='${access_key_id}',''='${access_key_secret}',''='${_point}');"===========================导入状态检查=====================================循环获取导入状态直到finished结束或者达到一定次数结束while[x$finish_state!=x${get_state}];do如果超过10次则直接跳出获取状态的程序直接结束120次1个小时if[[${_count_condition}-gt120]];thenecho"导入失败,已超出导入时间"breakficasex$get_stateinxCANCELLED)echo"导入失败,已被取消"break;;x${finish_state})echo"导入成功"break;;esacsleep30doneecho"导入结束"
Hive2ClickHouse

通过Spark的jdbc功能,七猫开发通用工具进行数据导入。

工具代码:

_{DataFrame,SparkSession}{Logger,LoggerFactory}{privatevallogger:Logger=()privatevalckDriver=""defmain(args:Array[String]):Unit={valoptions:Options=("ckHost",true,"clickhouse-host")("ckPort",true,"clickhouse端口,默认8123")("ckUser",true,"clickhouse用户名")("ckPass",true,"clickhouse密码")("ckDb",true,"clickhouse数据库名")("ckTable",true,"clickhouse表名")("ckPartition",true,"clickhouse分区字段名")("ckPartitionValue",true,"clickhouse表分区")("hiveTable",true,"hive表名")("hiveColumns",true,"hive查询列")("hivePartition",true,"hive分区字段名")("hivePartitionValue",true,"hive分区值")("ckTableType",true,"clickhouse表类型,默认是非集群版,可填写cluster")("parallelize",true,"write并行度,默认是8")("batchSize",true,"每批写入条数,默认是100000")valparser=newPosixParservalline=(options,args)if(!("ckHost")){("缺少必要ck_host参数");(1)}elseif(!("ckUser")){("缺少必要ck_user参数");(1)}elseif(!("ckPass")){("缺少必要ck_pass参数");(1)}elseif(!("ckDb")){("缺少必要ck_db参数");(1)}elseif(!("ckTable")){("缺少必要ck_table参数");(1)}elseif(!("hiveTable")){("缺少必要hive_table参数");(1)}elseif(!("hiveColumns")){("缺少必要hive_columns参数");(1)}valckTableType=if(("ckTableType")"cluster".equals(("ckTableType")))"cluster"else"single"valwriteParallelize=if(("parallelize"))("parallelize").toIntelse8valbatchSize=if(("batchSize"))("batchSize").toIntelse100000valckHost=("ckHost")valckPort=if(("ckPort"))("ckPort")else"8123"valckUser=("ckUser")valckPass=("ckPass")valckDb=("ckDb")valckTb=("ckTable")valckUrl=s"jdbc:clickhouse://$ckHost:$ckPort/$ckDb"valckProp=newProperties()("driver",ckDriver)("user",ckUser)("password",ckPass)valhiveTable:String=("hiveTable")valhivePartition:String=("hivePartition")valhivePartitionValue:String=("hivePartitionValue")valcolumns:String=("hiveColumns")valckPartition:String=("ckPartition")valckPartitionValue:String=("ckPartitionValue")valspark=().enableHiveSupport().appName("zhuyitian-CommercialAdvMinutes").config("","").config("","10").getOrCreate()vardf:DataFrame=nullvarhiveCondition:String=nullvarckCondition:String=nullif((hivePartition)(ckPartitionValue)){//hive和ck都指定了分区,删除ck分区数据,倒入hive指定分区数据dropClickHousePartition(newClickHouseDataSource(ckUrl,ckProp),ckDb,ckTb,ckPartitionValue,ckTableType)df=(s"select$columnsfrom$hiveTablewhere$hivePartition='$hivePartitionValue'")println(s"select$columnsfrom$hiveTablewhere$hivePartition='$hivePartitionValue'")hiveCondition=s"$hivePartition='$hivePartitionValue'"ckCondition=s"$ckPartition='$ckPartitionValue'"}elseif((hivePartition)(ckPartitionValue)){//hive指定了分区,清空ck表,倒入指定分区数据truncateClickHouseTable(newClickHouseDataSource(ckUrl,ckProp),ckDb,ckTb,ckTableType)df=(s"select$columnsfrom$hiveTablewhere$hivePartition='$hivePartitionValue'")println(s"select$columnsfrom$hiveTablewhere$hivePartition='$hivePartitionValue'")hiveCondition=s"$hivePartition='$hivePartitionValue'"}else{//hive和ck均未指定分区,全表清空,全表覆盖truncateClickHouseTable(newClickHouseDataSource(ckUrl,ckProp),ckDb,ckTb,ckTableType)df=(s"select$columnsfrom$hiveTable")println(s"select$columnsfrom$hiveTable")}()(writeParallelize).(saveMode="app").option("batchsize",batchSize).option("isolationLevel","NONE")//设置事务.jdbc(ckUrl,s"$ckDb.$ckTb",ckProp)checkClickhouseDataNum(hiveTable,s"$ckDb.$ckTb",hiveCondition,ckCondition,spark,newClickHouseDataSource(ckUrl,ckProp))}privatedefdropClickHousePartition(chDatasource:ClickHouseDataSource,ckDb:String,ckTb:String,partition:String,ckTableType:String):Unit={try{valchConn=()valdropSQL=ckTableTypematch{case"single"=s"ALTERTABlE$ckDb.$ckTbDROPPARTITION'$partition'"case"cluster"=s"ALTERTABlE$ckDb.${ckTb}_localDROPPARTITION'$partition'"case_=s"ALTERTABlE$ckDb.$ckTbDROPPARTITION'$partition'"}println(dropSQL)valpsmt=(dropSQL)()(dropSQL)}catch{casee:Exception=()("删除clickhouse分区失败,程序退出")(1)}}privatedeftruncateClickHouseTable(chDatasource:ClickHouseDataSource,ckDb:String,ckTb:String,ckTableType:String):Unit={try{valchConn=()valtruncateSQL=ckTableTypematch{case"single"=s"TRUNCATETABlE$ckDb.$ckTb"case"cluster"=s"TRUNCATETABlE$ckDb.${ckTb}_local"case_=s"TRUNCATETABlE$ckDb.$ckTb"}println(truncateSQL)valpsmt=(truncateSQL)()(truncateSQL)}catch{casee:Exception=()("清空clickhouse表数据失败,程序退出")(1)}}/***检测Clickhouse的数据量是否和hive数据量一致*/privatedefcheckClickhouseDataNum(hiveTableName:String,ckTableName:String,hivePartition:String,ckPartition:String,spark:SparkSession,chDatasource:ClickHouseDataSource):Unit={("entercheckClickhouseDataNum")varhiveSql:String=s"SELECTcount(1)FROM$hiveTableName"varckSql:String=s"SELECTcount(1)FROM$ckTableName"if((hivePartition)){hiveSql=(s"WHERE$hivePartition")}if((ckPartition)){ckSql=(s"WHERE$ckPartition")}("hiveSql:{}",hiveSql)("ckSql:{}",ckSql)valdf=(hiveSql)valchConn==();valchResult=(ckSql)();if(().getLong(0).longValue()!=(1).longValue()){("checkClickhouseDataNumfailed,hive-count:{},ck-count:{}",().getLong(0),(1))(1)}("checkClickhouseDataNumsuccess,hive-count:{},ck-count:{}",().getLong(0),(1))}}
篇章2.实时数据任务

在七猫大数据开发中,优先使用FlinkSql作为实时任务的主要技术栈。该篇章会介绍一下FlinkSql的各种Source和Sink的写法与运用。通过不同的Source和Sink组合就可以实现各数据库之间的数据同步。

KafkaAsSource

例子:

kafka中数据(json格式):{"json_array_string":[{"id":"1","params":{"col1":"0","col2":"0"}}],"json_map_string":{"col3":"0","col4":"0"},"ts":0}
CREATETABLEkafka_source_table_bigdata_reader_smallproject(json_array_stringARRAYROWidSTRING,paramsMAPSTRING,STRING,json_map_stringMAPSTRING,STRING,tsbigint,--水位线的设置ts_ltzASTO_TIMESTAMP_LTZ(ts,0),WATERMARKFORts_ltzASts_ltz-INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='topic',''='ip1:port1,ip2:port2,ip3:port3',''='groupid',''='group-offsets',--''='1684771200000',''='180','format'='json',''='false',''='true');

基础参数介绍:
1.connector:指定使用的连接器,Kafka连接器使用'kafka'。


2.topic:当表用作source时读取数据的topic名。亦支持用分号间隔的topic列表,如'topic-1;topic-2'。注意,对source表而言,'topic'和'topic-pattern'两个选项只能使用其中一个。当表被用作sink时,该配置表示写入的topic名。注意sink表不支持topic列表。


3.topic-pattern:匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafkaconsumer订阅。


4.:逗号分隔的Kafkabroker列表。


5.:Kafkasource的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。


6.format:用来序列化或反序列化Kafka消息的格式。请参阅格式页面以获取更多关于格式的细节和相关配置项。注意:该配置项和''二者必需其一。


7.:序列化和反序列化Kafka消息体时使用的格式。


8.:Kafkaconsumer的启动模式。有效值为:'earliest-offset','latest-offset','group-offsets','timestamp'和'specific-offsets'。

MySQLCDCasSource

原理:通过FlinkSQL来监听MySQL的binlog,获取MySQL的数据。

例子:

CREATETABLEsource_table(--元数据写法database_nameSTRINGMETADATAVIRTUAL,--table_nameSTRINGMETADATAVIRTUAL,col1STRING,col2STRINGPRIMARYKEY(id)NOTENFORCED)WITH('connector'='mysql-cdc','hostname'='xxxx','port'='xxxx','username'='xxxx','password'='xxxx',''='10',''='true','server-id'='7100-7120','database-name'='xxxx','table-name'='xxxx_[0-9]+')

基础参数介绍:
1.connector:指定要使用的连接器,这里应该是'mysql-cdc'.


2.hostname:MySQL数据库服务器的IP地址或主机名。


3.username:连接到MySQL数据库服务器时要使用的MySQL用户的名称。


4.password:连接MySQL数据库服务器时使用的密码。


5.database-name:要监视的MySQL服务器的数据库名称。数据库名称还支持正则表达式。


6.table-name:需要监视的MySQL数据库的表名。表名支持正则表达式。


7.:增量快照是一种读取表快照的新机制,与旧的快照机制相比,增量快照有许多优点,包括:(1)在快照读取期间,Source支持并发读取,(2)在快照读取期间,Source支持进行chunk粒度的checkpoint,(3)在快照读取之前,Source不需要数据库锁权限。如果希望Source并行运行,则每个并行Readers都应该具有唯一的Serverid,所以Serverid必须是类似5400-6400的范围,并且该范围必须大于并行度。


8.server-id:读取数据使用的serverid,serverid可以是个整数或者一个整数范围,比如'5400'或'5400-5408',建议在''参数为启用时,配置成整数范围。因为在当前MySQL集群中运行的所有slave节点,标记每个salve节点的id都必须是唯一的。所以当连接器加入MySQL集群作为另一个slave节点(并且具有唯一id的情况下),它就可以读取binlog。默认情况下,连接器会在5400和6400之间生成一个随机数,但是我们建议用户明确指定Serverid。

支持的元数据:
table_name:当前记录所属的Mysql表名称。
database_name:当前记录所属的库名称。
op_ts:当前记录表在数据库中更新的时间。如果从表的快照而不是binlog读取记录,该值将始终为0。

MySQLasSource

使用场景:Mysql数据作为维度表使用。

例子:

CREATETABLEdim_topic_info(idBIGINT,is_delINT,`status`INT,tag_idsSTRING,analysis_tagSTRING,PRIMARYKEY(id)NOTENFORCED)WITH('connector'='mysql','hostname'='xxxx','port'='xxxx','username'='xxxx','password'='xxxx','database-name'='xxxx','table-name'='xxxx',''='ALL',''='5min')

基础参数介绍:
1.:
支持以下三种缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据。如果没有找到,则去物理维表中查找。使用该缓存策略时,必须配置参数。
ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。适用于远程表数据量小且MISSKEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。


2.:当选择LRU缓存策略后,必须设置缓存大小。当选择ALL缓存策略后,可以不设置缓存大小。


3.:的配置和有关,详情如下:
如果配置为None,则可以不配置,表示缓存不超时。
如果配置为LRU,则为缓存超时时间。默认不过期。
如果配置为ALL,则为缓存加载时间。默认不重新加载。
填写时请使用时间格式,例如1min或10s。

StarRocksAsSource

例子:

CREATETABLEflink_test(`id`INT,`name`STRING,`score`INT)WITH('connector'='starrocks','scan-url'='192.168.:8030','jdbc-url'='jdbc:mysql://192.168.:9030','username'='xxxxxx','password'='xxxxxx','database-name'='test','table-name'='score_board',''='1000',''='10',''='600',''='1073741824',''='1');

连接参数同MysqlSource
特殊参数介绍:
1.:FlinkConnector连接StarRocks集群的时间上限。单位:毫秒。默认值:1000。超过该时间上限,则数据读取任务会报错
2.:数据读取任务的保活时间,通过轮询机制定期检查。单位:分钟。默认值:10。建议取值大于等于5
3.:数据读取任务的超时时间,在任务执行过程中进行检查。单位:秒。默认值:600。如果超过该时间,仍未返回读取结果,则停止数据读取任务
4.:BE节点中单个查询的内存上限。单位:字节。默认值:1073741824(即1GB)
5.:数据读取失败时的最大重试次数。默认值:1。超过该数量上限,则数据读取任务报错

注:
1.仅支持使用部分SQL语句读取StarRocks中的数据,如SELECTFROMtable_nameWHERE。暂不支持除count以外的聚合函数。
2.使用SQL语句时,支持自动进行谓词下推。如过滤条件char_1'A'andint_1=-126,会下推到FlinkConnector中并转换成适用于StarRocks的语句后,再执行查询,不需要额外配置。
3.不支持LIMIT语句。
4.StarRocks暂时不支持Checkpoint机制。因此,如果读取任务失败,则无法保证数据一致性

RedisAsSource

使用场景:Redis数据作为维度表使用。

例子:

CREATETEMPORARYTABLEredis_dim(idSTRING,nameSTRING,PRIMARYKEY(id)NOTENFORCED--Redis中的RowKey字段。)WITH('connector'='redis','host'='xxxx','port'='xxxx','password'='xxxx','cache'='LRU');

基础参数介绍:
1.connector:固定值为redis。
2.host:RedisServer连接地址。
3.post:RedisServer连接端口。
4.password:Redis数据库密码。
5.dbNum:选择操作的数据库编号。
6.mode:对应Redis的数据结构。一般为string或者hash

维度独有参数:
1.cache:支持以下两种缓存策略:
None(默认值):无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
2.cacheSize:选择LRU缓存策略后,可以设置缓存大小。默认10000
3.cacheTTLMs:cacheTTLMs配置和cache有关:如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

注:
Redis维表必须声明且只能声明一个主键。
Redis维表仅支持声明两个字段,且字段类型必须为STRING。
Redis维表仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
Redis维表JOIN时,ON条件必须包含所有主键的等值条件。
Redis维表仅支持None和LRU两种缓存策略。

KafkaAsSink

例子:

CREATETABLEkafka_sink_table(col1string,col2string)WITH('connector'='kafka','topic'='topic',''='ip1:port1,ip2:port2,ip3:port3',''='60','format'='json',''='round-robin',''='10');

基础参数介绍:
1.:
default:使用Kafka默认的分区器对消息进行分区;
fixed:每个Flinkpartition最终对应最多一个Kafkapartition;
round-robin:Flinkpartition按轮循(round-robin)的模式对应到Kafkapartition;只有当未指定消息的消息键时生效。
自定义FlinkKafkaPartitioner的子类:例如''。
2.:定义Kafkasink算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

注:当Source源是mysql-cdc时获取的数据类型为Changelog格式,所以需要指定format=debezium-json

StarRocksAsSink

例子:

1.sink为默认csv格式(数据中无特殊字符建议使用)CREATETABLEsink_table(database_nameSTRING,table_nameSTRING,idbigint,PRIMARYKEY(col1)NOTENFORCED)WITH('connector'='starrocks','jdbc-url'='jdbc:mysql://node1:port1,node2:port2,node3:port3?characterEncoding=utf-8useSSL=false','load-url'='node1:port1,node2:port2,node3:port3?','database-name'='xxxx','table-name'='xxxx','username'='xxxx','password'='xxxx',''='500000',''='94371840',''='120000','_separator'='col_sep',''='3');2.sink为json格式(数据中可能出现特殊字符时建议使用)CREATETABLEsink_table(col1date,col2string,col3string)WITH('connector'='starrocks','jdbc-url'='jdbc:mysql://node1:port1,node2:port2,node3:port3?characterEncoding=utf-8useSSL=false','load-url'='node1:port1,node2:port2,node3:port3?','database-name'='xxxx','table-name'='xxxx','username'='xxxx','password'='xxxx',''='1000000',''='94371840',''='120000',''='3',''='5',''='json','_outer_array'='true','_json_size'='true');

注:参数代表StarRocks写入时的格式,默认为csv。当数据中有类似分隔符或者换行符这种特殊符号时,csv格式会将行和列解析错误,该情况建设设置成json格式使用(会加大资源消耗,按需调整资源大小)

sink独有参数介绍:
1.:积攒在内存的数据条数,达到该阈值后数据通过StreamLoad一次性导入StarRocks。取值范围:[64000,5000000]


2.:积攒在内存的数据大小,达到该阈值后数据通过StreamLoad一次性导入StarRocks。取值范围:[64MB,10GB]。将此参数设置为较大的值可以提高导入性能,但可能会增加导入延迟。该参数只在为at-least-once才会生效。为exactly-once,则只有Flinkcheckpoint触发时flush内存的数据,因此该参数不生效。


3.:数据发送的间隔,用于控制数据写入StarRocks的延迟,取值范围:[1000,3600000]。该参数只在为at-least-once才会生效。


4._separator:CSV数据的列分隔符。


5.:StreamLoad失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0,10]。该参数只在为V1才会生效。

json格式独有参数介绍:
1._outer_array:用于指定是否裁剪最外层的数组结构。取值范围:true和false。默认值:false。真实业务场景中,待导入的JSON数据可能在最外层有一对表示数组结构的中括号[]。这种情况下,一般建议您指定该参数取值为true,这样StarRocks会剪裁掉外层的中括号[],并把中括号[]里的每个内层数组都作为一行单独的数据导入


2._json_size:用于指定是否检查HTTP请求中JSONBody的大小。HTTP请求中JSONBody的大小默认不能超过100MB。如果JSONBody的大小超过100MB,会提示"Thesizeofthisbatchexceedthemaxsize[104857600]ofjsontypedatadata[8617627793].Setignore_json_sizetoskipcheck,althoughitmayleadhugememoryconsuming."错误。为避免该报错,可以在HTTP请求头中添加"ignore_json_size:true"设置,忽略对JSONBody大小的检查。

RedisAsSink

例子:

hash类型:CREATETABLEredis_sink(keySTRING,hash_keySTRING,hash_valueSTRING,PRIMARYKEY(key)NOTENFORCED--必填。)WITH('connector'='redis','host'='xxxx','dbNum'='xxxx','mode'='hashmap','expiration'='86400000');string类型:CREATETABLEredis_sink(source_uidSTRING,json_infoSTRING,PRIMARYKEY(source_uid)NOTENFORCED--必填。)WITH('connector'='redis','host'='xxxx','dbNum'='xxxx','mode'='string','expiration'='86400000');

基础参数参考ReidsAsSource

sink独有参数:
1.ignoreDelete:是否忽略Retraction消息。true:收到Retraction消息时,忽略Retraction消息。false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。
2.expiration:为写入数据对应的Key设置TTL。如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。

HiveAsSink

例子:

CREATECATALOGdlfcatalogWITH('type'='hive','default-database'='temp','hive-version'='3.1.2','hive-conf-dir'='xxxx');beginstatementset;__tableselectcol1,col2,col3FROMsource;;

FlinkSql需要通过YAML配置,使用Catalog接口和HiveCatalog连接到现有的Hive集群。

定义HiveCatalog时所支持的参数介绍:
1.type:Catalog的类型。创建HiveCatalog时,该参数必须设置为'hive'。
2.hive-conf-dir:指向包含目录的URI。该URI必须是Hadoop文件系统所支持的类型。如果指定一个相对URI,即不包含scheme,则默认为本地文件系统。如果该参数没有指定,我们会在classpath下查找。
3.default-database:当一个catalog被设为当前catalog时,所使用的默认当前database。
4.hive-version:HiveCatalog能够自动检测使用的Hive版本。

注:该模式只支持增量写入,不支持更新

总结

本篇文章只介绍了七猫在常用数据库下的数据同步。无论是Hbase、MongoDb、ES等其他数据库的使用,还是数据同步平台化的建设,七猫有有着一套完整的体系。通过这个文档希望大家能够对数据同步有所收获,也对七猫未来的文章有所期待。

作者:罗嗣挺

出处: