您的位置:首页 > 健康 > 养生 > 无锡比较大的互联网公司_slava网页设计师_seo网站分析报告_百度投流

无锡比较大的互联网公司_slava网页设计师_seo网站分析报告_百度投流

2025/1/8 13:32:05 来源:https://blog.csdn.net/w776341482/article/details/144903121  浏览:    关键词:无锡比较大的互联网公司_slava网页设计师_seo网站分析报告_百度投流
无锡比较大的互联网公司_slava网页设计师_seo网站分析报告_百度投流

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

  • MyBatis 更新完毕
  • 目前开始更新 Spring,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(已更完)
  • 实时数仓(正在更新…)

章节内容

  • ODS
  • Lambda架构
  • Kappa架构

在这里插入图片描述

基本介绍

在 Kafka 中写入维度表(DIM)通常涉及将实时或批处理数据从 Kafka 主题(Topic)读取,并根据数据流中的信息更新维度表(DIM),这在数据仓库或数据湖的 ETL(提取、转换、加载)过程中非常常见。维度表(DIM)存储的是与业务数据相关的维度信息,例如客户、产品、地理位置等,用于支持 OLAP(联机分析处理)查询。

理解 Kafka 数据流

Kafka 是一个分布式流平台,用于高吞吐量的消息传递。在 ETL 过程中,Kafka 通常用作数据的消息队列或者流处理的来源。每当新数据生成时,它会被发布到 Kafka 中的某个主题(Topic),然后消费者(Consumer)可以从主题中获取数据进行处理。

设计维度表(DIM)

维度表通常包含业务实体的详细信息,如产品名称、客户信息、时间维度等。与事实表(Fact)不同,维度表的数据较为静态,但可能会随着时间更新(例如,客户地址变更或产品类别更新)。每个维度表通常有一个唯一的主键(如 customer_id 或 product_id)来标识记录。

Kafka 消费者(Consumer)

为了从 Kafka 中读取维度数据,需要创建一个消费者(Consumer),它会从 Kafka 的某个主题(Topic)中读取消息。这些消息通常是 JSON 格式,包含需要写入维度表的信息。消费者将从 Kafka 主题中获取数据,可能包括以下步骤:

  • 连接到 Kafka 集群。
  • 订阅一个或多个主题(Topics)。
  • 消费消息并将其传递给后续的处理逻辑。
  • 消费者的实现可以使用 Kafka 提供的客户端库,例如 Kafka 的 Java 客户端、Python 的 confluent-kafka 等。

数据处理和转换

在读取到 Kafka 消息后,消费者需要对数据进行必要的处理和转换。对于维度数据,处理逻辑可能包括:

  • 数据解析:将消息从 Kafka 中的格式(例如 JSON)解析成结构化数据。
  • 校验数据:检查数据是否符合业务规则,是否完整,是否有效。
  • 维度数据更新:如果 Kafka 中的消息包含的维度信息已经存在,则更新相关记录;如果是新维度,则插入新记录。

维度表的更新

维度表的更新通常有两种常见的方式:

  • 全量更新:每次从 Kafka 获取到新的数据时,都将其覆盖到维度表中。这种方式适用于数据变动较少或者可以接受重写的场景。
  • 增量更新:根据时间戳、有效性标志或版本号等信息,更新已有的维度记录。这种方式适用于数据会有更新(如地址或状态变更)的场
    景。

增量更新时,通常会执行以下操作:

  • 查找是否已有该维度记录(例如通过 dimension_id)。
  • 如果存在且数据发生变化,则更新该记录,同时更新 valid_to 时间,并插入一条新的记录,设置 valid_from 和 valid_to 时间。
  • 如果不存在该记录,则直接插入新的维度数据。

写入到目标存储(DIM)

在数据处理后,需要将更新后的维度数据写入目标存储。这通常是一个数据库(例如 MySQL、PostgreSQL 或 NoSQL 数据库)或数据仓库(例如 Snowflake、Google BigQuery、Redshift)中的维度表(DIM)。

数据存储更新(事务性考虑)

对于维度表的更新,通常需要确保数据的一致性。可以使用事务来确保数据在更新过程中的一致性,防止数据丢失或重复。例如,可以在事务中执行所有的更新和插入操作,确保如果操作失败,可以回滚。

TableObject

创建样例 TableObject

case class TableObject(database: String, tableName: String, typeInfo: String, dataInfo: String) extends Serializable

AreaInfo

case class AreaInfo(id: String,name: String,pid: String,sname: String,level: String,citycode: String,yzcode: String,mername: String,Lng: String,Lat: String,pinyin: String)

DataInfo

case class DataInfo(modifiedTime: String,orderNo: String,isPay: String,orderId: String,tradeSrc: String,payTime: String,productMoney: String,totalMoney: String,dataFlag: String,userId: String,areaId: String,createTime: String,payMethod: String,isRefund: String,tradeType: String,status: String
)

ConnHBase

class ConnHBase {def connToHbase:Connection ={val conf : Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum","h121.wzk.icu,h122.wzk.icu,h123.wzk.icu")conf.set("hbase.zookeeper.property.clientPort","2181")conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)val connection = ConnectionFactory.createConnection(conf)connection}
}

SinkHBase

class SinkHBase extends RichSinkFunction[util.ArrayList[TableObject]] {var connection : Connection = _var hbTable : Table = _override def open(parameters: Configuration): Unit = {connection = new ConnHBase().connToHbasehbTable = connection.getTable(TableName.valueOf("wzk_area"))}override def close(): Unit = {if (hbTable != null) {hbTable.close()}if (connection != null) {connection.close()}}override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = {value.forEach(x => {println(x.toString)val database: String = x.databaseval tableName: String = x.tableNameval typeInfo: String = x.typeInfoif ((database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_trade_orders"))) {if (typeInfo.equalsIgnoreCase("insert")) {value.forEach(x => {val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])insertTradeOrders(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("update")) {} else if (typeInfo.equalsIgnoreCase("delete")) {}}if (database.equalsIgnoreCase("dwshow") && tableName.equalsIgnoreCase("wzk_area")) {if (typeInfo.equalsIgnoreCase("insert")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])insertArea(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("update")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])insertArea(hbTable, info)})} else if (typeInfo.equalsIgnoreCase("delete")) {value.forEach(x => {val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])deleteArea(hbTable, info)})}}})}def insertTradeOrders(hbTable: Table, dataInfo: DataInfo): Unit = {val tableName = "wzk_trade_orders"val columnFamily = "f1"// 如果表不存在则创建createTableIfNotExists(connection, tableName, columnFamily)val put = new Put(dataInfo.orderId.getBytes)put.addColumn("f1".getBytes, "modifiedTime".getBytes, dataInfo.modifiedTime.getBytes())put.addColumn("f1".getBytes, "orderNo".getBytes, dataInfo.orderNo.getBytes())put.addColumn("f1".getBytes, "isPay".getBytes, dataInfo.isPay.getBytes())put.addColumn("f1".getBytes, "orderId".getBytes, dataInfo.orderId.getBytes())put.addColumn("f1".getBytes, "tradeSrc".getBytes, dataInfo.tradeSrc.getBytes())put.addColumn("f1".getBytes, "payTime".getBytes, dataInfo.payTime.getBytes())put.addColumn("f1".getBytes, "productMoney".getBytes, dataInfo.productMoney.getBytes())put.addColumn("f1".getBytes, "totalMoney".getBytes, dataInfo.totalMoney.getBytes())put.addColumn("f1".getBytes, "dataFlag".getBytes, dataInfo.dataFlag.getBytes())put.addColumn("f1".getBytes, "userId".getBytes, dataInfo.userId.getBytes())put.addColumn("f1".getBytes, "areaId".getBytes, dataInfo.areaId.getBytes())put.addColumn("f1".getBytes, "createTime".getBytes, dataInfo.createTime.getBytes())put.addColumn("f1".getBytes, "payMethod".getBytes, dataInfo.payMethod.getBytes())put.addColumn("f1".getBytes, "isRefund".getBytes, dataInfo.isRefund.getBytes())put.addColumn("f1".getBytes, "tradeType".getBytes, dataInfo.tradeType.getBytes())put.addColumn("f1".getBytes, "status".getBytes, dataInfo.status.getBytes())hbTable.put(put)}def insertArea(hbTable: Table, areaInfo: AreaInfo): Unit = {// val tableName = "wzk_area"// val columnFamily = "f1"// 如果表不存在则创建// createTableIfNotExists(connection, tableName, columnFamily)println(areaInfo.toString)val put = new Put(areaInfo.id.getBytes())put.addColumn("f1".getBytes(), "name".getBytes(), areaInfo.name.getBytes())put.addColumn("f1".getBytes(), "pid".getBytes(), areaInfo.pid.getBytes())put.addColumn("f1".getBytes(), "sname".getBytes(), areaInfo.sname.getBytes())put.addColumn("f1".getBytes(), "level".getBytes(), areaInfo.level.getBytes())put.addColumn("f1".getBytes(), "citycode".getBytes(), areaInfo.citycode.getBytes())put.addColumn("f1".getBytes(), "yzcode".getBytes(), areaInfo.yzcode.getBytes())put.addColumn("f1".getBytes(), "mername".getBytes(), areaInfo.mername.getBytes())put.addColumn("f1".getBytes(), "lng".getBytes(), areaInfo.Lng.getBytes())put.addColumn("f1".getBytes(), "lat".getBytes(), areaInfo.Lat.getBytes())put.addColumn("f1".getBytes(), "pinyin".getBytes(), areaInfo.pinyin.getBytes())hbTable.put(put)}def deleteArea(hbTable: Table, areaInfo: AreaInfo): Unit = {val delete = new Delete(areaInfo.id.getBytes)hbTable.delete(delete)}def createTableIfNotExists(connection: Connection, tableName: String, columnFamily: String): Unit = {val admin = connection.getAdmintry {val table = TableName.valueOf(tableName)// 检查表是否存在if (!admin.tableExists(table)) {val tableDescriptor = new HTableDescriptor(table)val columnDescriptor = new HColumnDescriptor(columnFamily.getBytes())tableDescriptor.addFamily(columnDescriptor)// 创建表admin.createTable(tableDescriptor)println(s"表 $tableName 创建成功")} else {println(s"表 $tableName 已存在")}} finally {admin.close()}}}

SourceKafka

class SourceKafka {def getKafkaSource(topicName: String) : FlinkKafkaConsumer[String] = {val props = new Properties()props.setProperty("bootstrap.servers", "h121.wzk.icu:9092")props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")props.setProperty("group.id", "hbase-test")props.setProperty("auto.offset.reset", "earliest")new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)}}

KafkaToHBase

object KafkaToHBase {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval kafkaConsumer = new SourceKafka().getKafkaSource("dwshow")kafkaConsumer.setStartFromLatest()val sourceStream = env.addSource(kafkaConsumer)val mapped: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => {val jsonObj: JSONObject = JSON.parseObject(x)val database: AnyRef = jsonObj.get("database")val table: AnyRef = jsonObj.get("table")val typeInfo: AnyRef = jsonObj.get("type")val objects = new util.ArrayList[TableObject]()jsonObj.getJSONArray("data").forEach(x => {objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))println(x.toString)})objects})mapped.addSink(new SinkHBase)env.execute()}
}

启动项目

我们对表进行修改:
在这里插入图片描述
可以看到控制台对饮输出了内容:
在这里插入图片描述
别的表也尝试修改一下:
在这里插入图片描述
查看 HBase 可以看到数据已经有了:
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com