您的位置:首页 > 汽车 > 新车 > 网页设计的规格_我要自学网官方网站_电商培训机构排名_上海疫情最新消息

网页设计的规格_我要自学网官方网站_电商培训机构排名_上海疫情最新消息

2025/1/13 13:23:54 来源:https://blog.csdn.net/inori1256/article/details/143489679  浏览:    关键词:网页设计的规格_我要自学网官方网站_电商培训机构排名_上海疫情最新消息
网页设计的规格_我要自学网官方网站_电商培训机构排名_上海疫情最新消息

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取外部的MySQL是走的JDBC所以需要以下两个依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>1.13.6</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency>

读取HBase需要如下依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

我们首先在虚拟机的MySQL中随意找一张表,我以我这边的ebs库的customer表为例:

表内容如下:

在scala工程中建立一个类,内容如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject MysqlJdbc {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|gender string,|country string,|job string,|credit_type string,|customer_id int ,|first_name string,|last_name string,|email string,|address string,|`language` string,|credit_no string|)|WITH (|'connector' = 'jdbc',|'url' = 'jdbc:mysql://single01:3306/ebs',|'driver' = 'com.mysql.cj.jdbc.Driver',|'username' = 'root',|'password' = 'sakura20031204',|'table-name' = 'customer'|)|""".stripMargin)tabEnv.sqlQuery("""|SELECT * FROM person||""".stripMargin).execute().print()}
}

在flinkSQL中,我建立一张虚拟表person,然后在字段列表中,每一个字段名的数据类型都必须和MySQL中源数据的数据类型相匹配,否则会报错。

其中一些参数解释如下:

'connector' = 'jdbc'

指定连接器类型为jdbc。

'url' = 'jdbc:mysql://single01:3306/ebs'

指定所要连接的MySQL服务器地址以及库名。

'driver' = 'com.mysql.cj.jdbc.Driver'

指定MySQL驱动包。

'username' = 'root'

'password' = 'sakura20031204'

指定登录MySQL用户的账号和密码,我这边为了方便使用的是root用户,实际使用时不能使用root。要保证该用户有读取你指定的那张表的权限。

'table-name' = 'customer'

指定表的名字。

结果如下(截图只有部分内容,因为字段列表长):

同样的,在HBase中先找一张表,我这里以我这边的hbase_test:tranfer_from_mysql为例:

HBase shell中看表内容比较费劲,这张表大致内容有一个列族baseinfo 内容是:

<age INT, gender STRING,name STRING,phone STRING>

在scala工程中建立一个类,内容如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentobject HBase {def main(args: Array[String]): Unit = {val settings = EnvironmentSettings.newInstance().inStreamingMode().build()val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)tabEnv.executeSql("""|CREATE TABLE person(|id INT,|baseinfo ROW<age INT, gender STRING,name STRING,phone STRING>|) WITH (|'connector' = 'hbase-2.2',|'table-name' = 'hbase_test:tranfer_from_mysql',|'zookeeper.quorum' = 'single01:2181',|'zookeeper.znode.parent' = '/hbase'|)|""".stripMargin)tabEnv.sqlQuery("""|SELECT id, baseinfo.age, baseinfo.gender, baseinfo.name, baseinfo.phone|FROM person|WHERE baseinfo.gender = 'female'|and baseinfo.age > 20|""".stripMargin).execute().print()}
}

其中一些参数内容解释如下:

'connector' = 'hbase-2.2'

指定使用的连接器类型,这里是 HBase 的版本 2.2。

'table-name' = 'hbase_test:tranfer_from_mysql'

指定在 HBase 中要访问的表的名称。

'zookeeper.quorum' = 'single01:2181'

指定 HBase 使用的 ZooKeeper 集群的地址和端口。

'zookeeper.znode.parent' = '/hbase'

指定 ZooKeeper 中的父节点(znode)。

最终结果如下:

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com