您的位置:首页 > 房产 > 家装 > 进销存管理软件_福州制作网站提供商_中山百度推广公司_新产品推广方案怎么写

进销存管理软件_福州制作网站提供商_中山百度推广公司_新产品推广方案怎么写

2024/12/27 1:16:57 来源:https://blog.csdn.net/bluuusea/article/details/143996300  浏览:    关键词:进销存管理软件_福州制作网站提供商_中山百度推广公司_新产品推广方案怎么写
进销存管理软件_福州制作网站提供商_中山百度推广公司_新产品推广方案怎么写

flink输出至es,连接es7时使用的sink类是Elasticsearch7SinkBuilder,与es6链接方式不同,官网地址:fink连接es官方地址。

flink连接es7官方示例

依赖jar:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.16</version>
</dependency>

官方连接es7代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.util.HashMap;
import java.util.Map;DataStream<String> input = ...;input.sinkTo(new Elasticsearch7SinkBuilder<String>().setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered.setHosts(new HttpHost("127.0.0.1", 9200, "http")).setEmitter((element, context, indexer) ->indexer.add(createIndexRequest(element))).build());private static IndexRequest createIndexRequest(String element) {Map<String, Object> json = new HashMap<>();json.put("data", element);return Requests.indexRequest().index("my-index").id(element).source(json);
}

本地运行报错

java.lang.IllegalStateException: The elasticsearch emitter must be serializable.
错误提示很明确:emitter没有序列化,点击查看ElasticsearchEmitter的源码其实最终是有继承序列化接口的,那就猜测是使用lambda方式不能自己实现序列化,那就自己写个实现类ElasticsearchEmitter接口然后显示序列化一次。

修改后代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.util.HashMap;
import java.util.Map;DataStream<String> input = ...;input.sinkTo(new Elasticsearch7SinkBuilder<String>().setBulkFlushMaxActions(1) // Instructs the sink to emit after every element, otherwise they would be buffered.setHosts(new HttpHost("127.0.0.1", 9200, "http")).setEmitter((element, context, indexer) ->indexer.add(createIndexRequest(element))).build());private static IndexRequest createIndexRequest(String element) {Map<String, Object> json = new HashMap<>();json.put("data", element);return Requests.indexRequest().index("my-index").id(element).source(json);
}//自己新增类继承ElasticsearchEmitter,实现序列化接口static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {@Overridepublic void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {indexer.add(createIndexRequest(element));}}

启动后验证成功。

flink连接es7抽出完成工具类


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.io.Serializable;@Slf4j
public class Es7SinkFactory {public static Sink createSink(String host, Integer port, String schema, String user, String pwd, String index) {try {ElasticsearchSink<String> elasticsearchSink = new Elasticsearch7SinkBuilder<String>().setHosts(new HttpHost(host, port, schema)).setConnectionUsername(user).setConnectionPassword(pwd).setEmitter(new MyElasticsearchEmitter(index)).setBulkFlushMaxSizeMb(5)//最大缓冲大小.setBulkFlushInterval(5000)//刷新间隔.setBulkFlushMaxActions(1000)//批量插入最大条数.setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 5, 100).build();return elasticsearchSink;} catch (Exception e) {log.error("Es7SinkFactory.createSink error e=", e);}return null;}private static IndexRequest createIndexRequest(String element, String index) {return Requests.indexRequest().index(index).source(JSON.parseObject(element));}//匿名内部类/lamda表达式未集成序列化接口,需要实现序列化接口static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {private String index;MyElasticsearchEmitter(String index) {this.index = index;}@Overridepublic void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {indexer.add(createIndexRequest(element, index));}}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com