您的位置:首页 > 娱乐 > 明星 > 绵阳疫情最新情况_dreamweaver网页设计期末考试_全网热搜榜_京东seo搜索优化

绵阳疫情最新情况_dreamweaver网页设计期末考试_全网热搜榜_京东seo搜索优化

2025/1/7 17:27:51 来源:https://blog.csdn.net/weixin_39555954/article/details/144921196  浏览:    关键词:绵阳疫情最新情况_dreamweaver网页设计期末考试_全网热搜榜_京东seo搜索优化
绵阳疫情最新情况_dreamweaver网页设计期末考试_全网热搜榜_京东seo搜索优化

目录

一、读数据

二、写数据

1、写数据

2、数据刷到磁盘过程

3、数据合并过程

三、Api的使用

四、Phoenix

1、安装Phoenix

2、简单使用

2.1、查看表信息

2.2、创建表

2.3、插入数据

2.4、查看数据

2.5、修改数据

2.6、删除数据

2.7、退出

2.8、导入数据

3、API的使用

五、整合SpringBoot+MyBatis

六、参考


一、读数据

  1. Client先访问zookeeper,请求获取Meta表信息。从上面图可以看到Zookeeper中存储了HBase的元数据信息。
  2. Zookeeper返回meta数据信息。
  3. 根据namespace、表名和rowkey在meta表的信息去获取数据所在的RegionServer和Region
  4. 找到这个region对应的regionserver;
  5. 查找对应的region;
  6. 先从MemStore找数据,如果没有,再到BlockCache里面读;
  7. BlockCache还没有,再到StoreFile上读(为了读取的效率);
  8. 如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。

二、写数据

1、写数据

写数据和读数据前几步的过程都是一样的,首先从Zookeeper获取Meta表信息,根据meta表信息确认RegionServer 和Region

  1. Client向RegionServer发送写请求。
  2. RegionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复(避免数据在内存还没刷到磁盘就宕机丢失数据)。
  3. RegionServer将数据写到内存(MemStore),此时就认为写数据完成了,至于什么时候数据写进HDFS,它不关心。
  4. 反馈Client写成功。

2、数据刷到磁盘过程

当MemStore数据达到阈值(默认是128M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog(Write Ahead Log)中的历史数据;并将数据存储到HDFS中;在HLog中做标记点。

3、数据合并过程

  1. 当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并。
  2. 当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理。
  3. 当RegionServer宕机后,将RegionServer上的hlog拆分,然后分配给不同的RegionServer加载,修改.META。
  4. HLog的数据会同步到HDFS,保证数据的可靠性。

三、Api的使用

package com.xiaojie.hadoop.utils;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;import java.io.IOException;
import java.util.List;/*** @author 熟透的蜗牛* @version 1.0* @description: hbase工具类* @date 2025/1/3 9:27*/
@Slf4j
public class HBaseUtil {private static Connection connection;static {Configuration configuration = HBaseConfiguration.create();//设置端口号configuration.set("hbase.zookeeper.property.clientPort", "2181");//设置zk连接configuration.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");//创建连接try {connection = ConnectionFactory.createConnection(configuration);} catch (IOException e) {throw new RuntimeException(e);}}/*** @param namespace* @description: 创建namespace* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 11:03*/public static boolean createNameSpace(String namespace) {try {Admin admin = connection.getAdmin();NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();admin.createNamespace(namespaceDescriptor);log.info(">>>>>>>>>>>>创建namespace成功");return true;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param namespace* @description: 删除ns* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 11:05*/public static boolean deleteNameSpace(String namespace) {try {Admin admin = connection.getAdmin();admin.deleteNamespace(namespace);log.info(">>>>>>>>>>>>删除namespace成功");return true;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param tableName      表名* @param columnFamilies 列族* @description: 创建表* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 9:35*/public static boolean createTable(String tableName, List<String> columnFamilies, String nameSpace) throws IOException {Admin admin = connection.getAdmin();boolean exists = admin.tableExists(TableName.valueOf(tableName));//创建表if (!exists) {//如果namespace是空值则会使用default,作为命名空间TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(nameSpace, tableName));columnFamilies.forEach(cf -> {ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf));columnFamilyDescriptorBuilder.setMaxVersions(1);ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);});admin.createTable(tableDescriptorBuilder.build());return true;} else {log.info("table exists>>>>>>>>");return false;}}/*** @param tableName 表名* @description: 删除表* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 10:16*/public static boolean deleteTable(String tableName) {try {Admin admin = connection.getAdmin();//先禁用表admin.disableTable(TableName.valueOf(tableName));//再删除admin.deleteTable(TableName.valueOf(tableName));return true;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param tableName        表名* @param rowKey           rowkey* @param columnFamilyName 列族* @param qualifier        列标识* @param value            数据* @description: 插入数据* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 16:46*/public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, String qualifier,String value) {Table table = null;try {table = connection.getTable(TableName.valueOf(nameSpace, tableName));Put put = new Put(Bytes.toBytes(rowKey));put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(qualifier), Bytes.toBytes(value));table.put(put);log.info(">>>>>>>插入数据成功");return true;} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}}/*** @param nameSpace* @param tableName* @param rowKey* @param columnFamilyName* @param pairList         键值对集合* @description: 插入数据* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 17:32*/public static boolean putRow(String nameSpace, String tableName, String rowKey, String columnFamilyName, List<Pair<String, String>> pairList) {Table table = null;try {table = connection.getTable(TableName.valueOf(nameSpace, tableName));Put put = new Put(Bytes.toBytes(rowKey));pairList.forEach(pair -> {put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(pair.getFirst()), Bytes.toBytes(pair.getSecond()));});table.put(put);log.info(">>>>>>>插入数据成功");return true;} catch (IOException e) {throw new RuntimeException(e);} finally {if (table != null) {try {table.close();} catch (IOException e) {throw new RuntimeException(e);}}}}/*** @param nameSpace* @param tableName* @param rowKey* @description:根据Rowkey查询* @return: org.apache.hadoop.hbase.client.Result* @author 熟透的蜗牛* @date: 2025/1/3 17:42*/public static Result getRowByRowKey(String nameSpace, String tableName, String rowKey) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Result result = table.get(new Get(Bytes.toBytes(rowKey)));return result;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param nameSpace* @param tableName* @description: 查询所有数据, 和范围查询* @return: org.apache.hadoop.hbase.client.ResultScanner* @author 熟透的蜗牛* @date: 2025/1/3 18:12*/public static ResultScanner getAll(String nameSpace, String tableName) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Scan scan = new Scan();scan.setCacheBlocks(true);//设置读缓存scan.withStartRow(Bytes.toBytes("1002")); //rowkey的起始值scan.withStopRow(Bytes.toBytes("1003"));  //rowkey的结束值,返回结果不包含该值ResultScanner scanner = table.getScanner(scan);return scanner;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param nameSpace* @param tableName* @param filterList* @description: 查找过滤* @return: org.apache.hadoop.hbase.client.ResultScanner* @author 熟透的蜗牛* @date: 2025/1/3 20:47*/public static ResultScanner getScanner(String nameSpace, String tableName, FilterList filterList) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Scan scan = new Scan();scan.setFilter(filterList);ResultScanner scanner = table.getScanner(scan);return scanner;} catch (IOException e) {throw new RuntimeException(e);}}/*** @param tableName* @param rowKey* @param columnFamily 列族* @param qualifier    限定符* @description: 获取指定cell数据* @return: java.lang.String* @author 熟透的蜗牛* @date: 2025/1/3 20:59*/public static String getCell(String nameSpace, String tableName, String rowKey, String columnFamily, String qualifier) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Get get = new Get(Bytes.toBytes(rowKey));if (!get.isCheckExistenceOnly()) {get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));Result result = table.get(get);byte[] resultValue = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));return Bytes.toString(resultValue);}} catch (IOException e) {throw new RuntimeException(e);}return null;}/*** @param nameSpace* @param tableName* @param rowKey* @description: 删除一行数据* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 21:34*/public static boolean deleteRow(String nameSpace, String tableName, String rowKey) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Delete delete = new Delete(Bytes.toBytes(rowKey));table.delete(delete);} catch (IOException e) {e.printStackTrace();}return true;}/*** @param nameSpace* @param tableName* @param rowKey* @param familyName* @param qualifier* @description: 删除指定列* @return: boolean* @author 熟透的蜗牛* @date: 2025/1/3 21:34*/public static boolean deleteColumn(String nameSpace, String tableName, String rowKey, String familyName,String qualifier) {try {Table table = connection.getTable(TableName.valueOf(nameSpace, tableName));Delete delete = new Delete(Bytes.toBytes(rowKey));delete.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(qualifier));table.delete(delete);table.close();} catch (IOException e) {e.printStackTrace();}return true;}}

四、Phoenix

Phoenix是 HBase 的开源 SQL 中间层,它允许你使用标准 JDBC 的方式来操作 HBase 上的数据。

1、安装Phoenix

#解压文件
tar -zxf phoenix-hbase-2.6-5.2.1-bin.tar.gz #移动文件
mv phoenix-hbase-2.6-5.2.1-bin /usr/local/#复制jar到regionserver 和master的lib
cp phoenix-server-hbase-2.6.jar /usr/local/hbase-2.6.1/lib #分发到其他服务器
xsync /usr/local/hbase-2.6.1/#重启hbase
./hbase.sh stop
./hbase.sh start#启动phoenix
/usr/local/phoenix-hbase-2.6-5.2.1-bin/bin/sqlline.py hadoop1,hadoop2,hadoop3

2、简单使用

2.1、查看表信息

#查看表信息
!tables

2.2、创建表

CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL,city VARCHAR NOT NULL,population BIGINTCONSTRAINT my_pk PRIMARY KEY (state, city));

2.3、插入数据

#插入数据
UPSERT INTO us_population VALUES('NY','New York',8143197);

2.4、查看数据

select * from us_population;

2.5、修改数据

UPSERT INTO us_population VALUES('NY','New York',999999);

2.6、删除数据

DELETE FROM us_population WHERE city='New York';#删除表
drop table us_population;

2.7、退出

!quit

2.8、导入数据

us_population.csv

NY,New York,8143197
CA,Los Angeles,3844829
IL,Chicago,2842518
TX,Houston,2016582
PA,Philadelphia,1463281
AZ,Phoenix,1461575
TX,San Antonio,1256509
CA,San Diego,1255540
TX,Dallas,1213825
CA,San Jose,912332

us_population.sql

CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL,city VARCHAR NOT NULL,population BIGINTCONSTRAINT my_pk PRIMARY KEY (state, city));
#导入数据
./psql.py hadoop1,hadoop2,hadoop3 us_population.sql us_population.csv#查看数据
select * from us_population;#查询数据
SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC;

3、API的使用

package com.xiaojie;import java.sql.*;/*** @author 熟透的蜗牛* @version 1.0* @description: 测试phoenix* @date 2025/1/3 23:08*/
public class PhoenixApi {public static void main(String[] args) throws ClassNotFoundException, SQLException {//1、加载驱动Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");//2、创建连接String url = "jdbc:phoenix:hadoop1,hadoop2,hadoop3";Connection connect = DriverManager.getConnection(url);//3、创建查询PreparedStatement preparedStatement = connect.prepareStatement("SELECT * FROM us_population");//4、遍历结果ResultSet resultSet = preparedStatement.executeQuery();while (resultSet.next()) {System.out.println(resultSet.getString("city") + " "+ resultSet.getInt("population"));}//5、关闭资源preparedStatement.close();resultSet.close();connect.close();}
}

五、整合SpringBoot+MyBatis

package com.xiaojie.hadoop.hbase.phoenix.dao;import com.xiaojie.hadoop.hbase.phoenix.bean.USPopulation;
import org.apache.ibatis.annotations.*;import java.util.List;@Mapper
public interface PopulationDao {@Select("SELECT * from us_population")List<USPopulation> queryAll();@Insert("UPSERT INTO us_population VALUES( #{state}, #{city}, #{population} )")void save(USPopulation USPopulation);@Select("SELECT * FROM us_population WHERE state=#{state} AND city = #{city}")USPopulation queryByStateAndCity(String state, String city);@Delete("DELETE FROM us_population WHERE state=#{state} AND city = #{city}")void deleteByStateAndCity(String state, String city);}
server:port: 9999
spring:application:name: hadoopdatasource:#zookeeper地址url: jdbc:phoenix:hadoop1,hadoop2,hadoop3driver-class-name: org.apache.phoenix.jdbc.PhoenixDrivertype: com.zaxxer.hikari.HikariDataSourcehikari:# 池中维护的最小空闲连接数minimum-idle: 10# 池中最大连接数,包括闲置和使用中的连接maximum-pool-size: 20# 此属性控制从池返回的连接的默认自动提交行为。默认为trueauto-commit: true# 允许最长空闲时间idle-timeout: 30000# 此属性表示连接池的用户定义名称,主要显示在日志记录和JMX管理控制台中,以标识池和池配置。 默认值:自动生成pool-name: custom-hikari#此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟max-lifetime: 1800000# 数据库连接超时时间,默认30秒,即30000connection-timeout: 30000# 连接测试sql 这个地方需要根据数据库方言差异而配置 例如 oracle 就应该写成  select 1 from dualconnection-test-query: SELECT 1
wssnail:hdfs:url: hdfs://hadoop1:8020user-name: root
# mybatis 相关配置
mybatis:configuration:# 是否打印sql语句 调试的时候可以开启log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

六、参考

https://phoenix.apache.org/Phoenix-in-15-minutes-or-less.html

完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com