您的位置:首页 > 科技 > IT业 > 微信公众号小程序开发教程_公司运营管理方案_百度手机版_百度电视剧风云榜

微信公众号小程序开发教程_公司运营管理方案_百度手机版_百度电视剧风云榜

2024/11/14 16:07:32 来源:https://blog.csdn.net/m0_65850671/article/details/142333716  浏览:    关键词:微信公众号小程序开发教程_公司运营管理方案_百度手机版_百度电视剧风云榜
微信公众号小程序开发教程_公司运营管理方案_百度手机版_百度电视剧风云榜


import java.util.Properties

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}

object demo {

  /**
   * Flink streaming job: reads tab-separated lines from a Kafka topic,
   * extracts an (id, ts) pair from each line and indexes it into
   * Elasticsearch as a document keyed by `id`.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Uncomment to enable checkpointing with an HDFS-backed state backend.
    // env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))
    // env.enableCheckpointing(10 * 60 * 1000L)
    // env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)

    // Processing-time semantics: no watermarks, events stamped on arrival.
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val props = new Properties
    props.setProperty("bootstrap.servers", "host:6667") // some clusters expose 9092 instead
    props.setProperty("group.id", "groupId")
    props.setProperty("retries", "10")
    props.setProperty("retries.backoff.ms", "100")
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")
    // If the cluster requires authentication, additionally set:
    // props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")
    // props.setProperty("security.protocol", "SASL_PLAINTEXT")
    // props.setProperty("sasl.mechanism", "PLAIN")

    val myConsumer = new FlinkKafkaConsumer[String]("topicName", new SimpleStringSchema(), props)
      .setStartFromEarliest() // start from the earliest available offset

    // Parse each tab-separated line into a Demo record.
    // NOTE(review): assumes every record has at least 3 tab-separated fields
    // and that fields 1 and 2 hold id and timestamp; a malformed record would
    // throw and fail the job — confirm the upstream format guarantees this.
    val stream = env.addSource(myConsumer)
      .map(line => {
        val fields = line.split("\t")
        val id = fields(1)
        val ts = fields(2)
        Demo(id, ts)
      })

    // Project-provided list of Elasticsearch HTTP hosts.
    val httpHosts = CP.getESConf

    val esSinkBuilder = new ElasticsearchSink.Builder[Demo](
      httpHosts,
      new ElasticsearchSinkFunction[Demo] {

        // Convert one Demo record into an ES index request.
        override def process(element: Demo, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {

          val json = new java.util.HashMap[String, String]

          json.put("@timestamp", element.ts)
          json.put("id", element.id)

          val rqst: IndexRequest = Requests.indexRequest
            // Explicit document id; omit .id(...) to let ES auto-generate one.
            .id(element.id)
            .index("indexName")
            .source(json)
            .opType(DocWriteRequest.OpType.INDEX)

          indexer.add(rqst)
        }
      }
    )

    setESConf(esSinkBuilder, 50000)

    stream.addSink(esSinkBuilder.build())
      .uid("write-to-es")
      .name("write-to-es")

    env.execute("demo")
  }

  /**
   * Applies bulk-flush and HTTP timeout settings to an Elasticsearch sink builder.
   *
   * @param esSinkBuilder the builder to configure (mutated in place)
   * @param numMaxActions flush a bulk request once this many actions are buffered
   * @tparam T element type written by the sink
   */
  def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int): Unit = {
    esSinkBuilder.setBulkFlushMaxActions(numMaxActions)
    esSinkBuilder.setBulkFlushMaxSizeMb(10)
    esSinkBuilder.setBulkFlushInterval(10000)
    // Back off and retry when ES rejects a bulk request (e.g. bulk queue full).
    esSinkBuilder.setBulkFlushBackoff(true)
    esSinkBuilder.setBulkFlushBackoffDelay(2)
    esSinkBuilder.setBulkFlushBackoffRetries(3)

    esSinkBuilder.setRestClientFactory(new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
          override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {
            requestConfigBuilder.setConnectTimeout(12000) // ms to establish a connection
            requestConfigBuilder.setSocketTimeout(90000)  // ms to wait on an open socket
          }
        })
      }
    })
  }
}

/** Record extracted from one Kafka line: a document id and its timestamp string. */
final case class Demo(id: String, ts: String)
 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com