您的位置:首页 > 科技 > 能源 > 【用户投稿】使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

【用户投稿】使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

2024/12/23 9:54:24 来源:https://blog.csdn.net/weixin_54625990/article/details/140527411  浏览:    关键词:【用户投稿】使用 SeaTunnel 进行 HTTP 同步到 Doris 实战经验分享

需求背景

由于我司的项目中需要接入不同的数据源的数据到数仓中,在选择了众多的产品中最后选择了Apache SeaTunnel,对比参考

目前我这边使用的接口,暂时没有接口认证,如果需要接口认证的方式接入数据,再做讨论及测试

实际使用

Apache SeaTunnel版本: 2.3.4

话不多说,先贴最终的运行文件,由于我使用的jsonrest-api提交方式,所以结果如下图所示:

使用restconf的区别就在于job执行的环境不同,conf使用的是ClientJobExecutionEnvironment(经测试也支持json格式),而rest方式则使用的是RestJobExecutionEnvironment

接口返回的数据格式

{"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "许可证02","taskType": "许可证"}]}
}
// 实际数据分页的很多,以上是示例

接入配置

{"env": {"job.mode": "BATCH","job.name": "SeaTunnel_Job"},"source": [{"result_table_name": "Table13367210156032","plugin_name": "Http","url": "http://*.*.*.*:*/day_plan_repair/page","method": "GET",  // Http请求方式 只支持GET和POST两种方式"format": "json", // 默认值是text 只支持json和text两种方式"json_field": {   // 可以看看作是从上述接口返回的数据中取数据的路径和key的映射关系,value则是取值的JsonPath"id": "$.data.records[*].id","taskContent": "$.data.records[*].taskContent","taskType": "$.data.records[*].taskType"},// "pageing": {//   "page_field": "current", // 当前页的key,就是分页接口的请求参数中的当前页的key,//   "batch_size": 10         // 每页取多少数据// },"schema": {"fields": {"id": "BIGINT", // 主键列问题,详见下面的问题"taskContent": "STRING","taskType": "STRING"}}}],"transform": [{"field_mapper": { // key是source中的schema.field中的值,value是sink中使用的值,例如下面的save_mode_create_template里的${rowtype_fields}使用的就是value,可以更改value作为sink的新命名列"id": "id", "taskContent": "task_content","taskType": "task_type"},"result_table_name": "Table13367210156033","source_table_name": "Table13367210156032","plugin_name": "FieldMapper"}],"sink": [{"source_table_name": "Table13367210156033","plugin_name": "Doris","fenodes ": "*.*.*.*:*","database": "test","password": "****","username": "****","table": "ods_day_plan","sink.label-prefix": "test-ods_day_plan", // Stream Load 导入使用的标签前缀。在 2pc 场景下,需要全局唯一性来保证 SeaTunnel 的 EOS 语义"sink.enable-2pc": false, // 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。Doris的二阶段提交详见https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual"data_save_mode": "APPEND_DATA", // 数据保存模式 DROP_DATA、APPEND_DATA、CUSTOM_PROCESSING、ERROR_WHEN_DATA_EXISTS官方提供了四种,我使用保留数据库结构,追加数据,可以详见源码中的DataSaveMode枚举"schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST", // Scheme保存模式 RECREATE_SCHEMA、CREATE_SCHEMA_WHEN_NOT_EXIST、ERROR_WHEN_SCHEMA_NOT_EXIST 我使用的是当Schema不存在时创建;具体释义详见SchemaSaveMode枚举"save_mode_create_template": "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n ${rowtype_fields}\n ) ENGINE=OLAP\n UNIQUE KEY (id)\n DISTRIBUTED BY HASH (id)\n PROPERTIES (\n \"replication_allocation\" = \"tag.location.default: 1\",\n \"in_memory\" = \"false\",\n  \"storage_format\" = \"V2\",\n \"disable_auto_compaction\" = \"false\"\n )","sink.enable-delete": true, //是否启用删除,此配置只有Doris的表模型是Unique模型,同时需要Doris表开启批量删除功能(默认开启 0.15+ 版本)"doris.config": {"format": "json","read_json_by_line": "true"}}]
}

实际使用中遇到的问题

Handle save mode failed

具体的报错日志中包含
Caused by: java.sql.SQLException: errCode = 2, detailMessage = Syntax error in line 21:UNIQUE KEY ()^
Encountered: )
Expected: IDENTIFIER
解决方案:详见链接[issue](https://github.com/apache/seatunnel/issues/6646)使用了上述配置文件中的`save_mode_create_template`字段解决,目标中值的可以自行根据业务配置。

NoSuchMethodError

java.lang.NoSuchMethodError: retrofit2.Retrofit$Builder.client(Lshaded/okhttp3/OkHttpClient;)Lretrofit2/Retrofit$Builder;at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:179) ~[connector-influxdb-2.3.4.jar:2.3.4]at org.influxdb.impl.InfluxDBImpl.<init>(InfluxDBImpl.java:120) ~[connector-influxdb-2.3.4.jar:2.3.4]at org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient.getInfluxDB(InfluxDBClient.java:72) ~[connector-influxdb-2.3.4.jar:2.3.4]
在使用influxdb的连接时,遇到了**jar包冲突**的问题,最终发现在创建http链接的时候,`retrofit2`的依赖与`datahub`连接器中的存在版本冲突,我这里没有使用到`datahub`,所以**删除datahub的连接器**即可解决问题

Apache Doris BIGINT类型精度丢失问题

详见帖子

配置主键

Doris配置save_mode_create_template包含主键时,主键类型必须是数字或日期类型。

上面的source配置的schema中的id,接口返回的实际类型是字符串类型,但是是雪花算法的全数字类型,所以使用BIGINT类型自动转换

原因是Sink配置中的save_mode_create_templateUNIQUE KEY使用的id作为主键,Doris要求主键列类型必须是数字或者日期类型!!

个人经验

  1. 当sink、source、transform只有一个时,可以省略result_table_name、source_table_name配置项
  2. 下载源码,修改源码,在源码中增加log日志,并打包替换SeaTunnel运行时的jar,以方便根据日志得到自己想知道的结果或者方便理解代码
  3. 根据1的运用,熟知代码后可以进行二次开发,例如需要token认证的接口该怎么处理,值得深思。
  4. 另外source配置中的json_field中的value的JsonPath值,不支持 列表中复杂类型取值的问题Array或Map<String, Object>。也可以考虑二开解决
    // 举例:
    {"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "许可证02","taskType": "许可证","region_list": [ // 此格式中的region_list无法解析和同步 $.data.records[*].region_list[*].id 会报数据和总数不匹配的错误{"id":"1","name": "11"},{"id":"1","name": "11"}]}]}
    }

    附上 测试代码 (使用的是JDK17)

        private static final Option[] DEFAULT_OPTIONS = {Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL};private JsonPath[] jsonPaths;private final Configuration jsonConfiguration = Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);@Testpublic void test5() {String data = """{"code": "0000","msg": "成功","data": {"records": [{"id": "1798895733824393218","taskContent": "12312312313"}]}}""";Map<String, String> map = new HashMap<>();map.put("id", "$.data.records[*].id");map.put("taskContent", "$.data.records[*].taskContent");JsonField jsonField = JsonField.builder().fields(map).build();initJsonPath(jsonField);data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();log.error(data);}// 以下代码都是HttpSourceReader中的代码private void initJsonPath(JsonField jsonField) {jsonPaths = new JsonPath[jsonField.getFields().size()];for (int index = 0; index < jsonField.getFields().keySet().size(); index++) {jsonPaths[index] =JsonPath.compile(jsonField.getFields().values().toArray(new String[] {})[index]);}}private List<Map<String, String>> parseToMap(List<List<String>> datas, JsonField jsonField) {List<Map<String, String>> decodeDatas = new ArrayList<>(datas.size());String[] keys = jsonField.getFields().keySet().toArray(new String[] {});for (List<String> data : datas) {Map<String, String> decodeData = new HashMap<>(jsonField.getFields().size());final int[] index = {0};data.forEach(field -> {decodeData.put(keys[index[0]], field);index[0]++;});decodeDatas.add(decodeData);}return decodeDatas;}private List<List<String>> decodeJSON(String data) {ReadContext jsonReadContext = JsonPath.using(jsonConfiguration).parse(data);List<List<String>> results = new ArrayList<>(jsonPaths.length);for (JsonPath path : jsonPaths) {List<String> result = jsonReadContext.read(path);results.add(result);}for (int i = 1; i < results.size(); i++) {List<?> result0 = results.get(0);List<?> result = results.get(i);if (result0.size() != result.size()) {throw new HttpConnectorException(HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,String.format("[%s](%d) and [%s](%d) the number of parsing records is inconsistent.",jsonPaths[0].getPath(),result0.size(),jsonPaths[i].getPath(),result.size()));}}return dataFlip(results);}private List<List<String>> dataFlip(List<List<String>> results) {List<List<String>> datas = new ArrayList<>();for (int i = 0; i < results.size(); i++) {List<String> result = results.get(i);if (i == 0) {for (Object o : result) {String val = o == null ? null : o.toString();List<String> row = new ArrayList<>(jsonPaths.length);row.add(val);datas.add(row);}} else {for (int j = 0; j < result.size(); j++) {Object o = result.get(j);String val = o == null ? null : o.toString();List<String> row = datas.get(j);row.add(val);}}}return datas;}

    以上是我的一些经验分享,希望对大家有帮助!

    本文由 白鲸开源科技 提供发布支持!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com