您的位置:首页 > 新闻 > 会展 > 深圳优化怎么做搜索_企业怎么做推广_自己的网站_抖音seo关键词优化排名

深圳优化怎么做搜索_企业怎么做推广_自己的网站_抖音seo关键词优化排名

2024/10/11 18:49:43 来源:https://blog.csdn.net/m0_74070923/article/details/142730064  浏览:    关键词:深圳优化怎么做搜索_企业怎么做推广_自己的网站_抖音seo关键词优化排名
深圳优化怎么做搜索_企业怎么做推广_自己的网站_抖音seo关键词优化排名

基础配置

创建虚拟机,后首先测试是否ping通

ping www.baidu.com

1.修改服务器名称

  修改主机名:

$ hostnamectl set-hostname master01  #临时改名,重启还原bash

 永久修改:

修改配置文件

vi /etc/hostname 将内容改为master,保存退出

vi /etc/sysconfig/network

NETWORKING=yesHOSTNAME=master  修改为新名

 使每一台机器都能通过名字识别另一个机器【配置 hosts实现主机名和ip地址映射】

vi /etc/hosts

192.168.222.171 Competition2024master Competition2024master.root192.168.222.172 Competition2022node1 Competition2022node1.root192.168.222.173 Competition2022node2 Competition2022node2.root

2.大数据集群网络配置

2.1编辑虚拟网络

首先以管理员身份进入,在页面左上角寻找“编辑”,点击进入后选择“虚拟网络编辑器”,点击“添加网络”,选择”VMnet8”,将”仅主机模式”改为“NAT模式”,

第一步:

子网IP:192.168.222.0        #子网的最后一位一定是0

子网掩码:255.255.255.0

第二步:DHCP设置                                                        

起始IP地址:192.168.222.128

结束IP地址:192.168.222.254

第三步:NAT设置

网关IP:192.168.222.2

以上三步做完点击“应用”,再点击“确定”

现在把所有虚拟机都打开设置里面的IP

2.2虚拟机网络配置与远程连接

网络配置

一.先对master进行操作

1.cd /etc/sysconfig/network-scripts

2.vi ifcfg-ens33

3.BOOTPROTO=dhcp   改为BOOTPROTO=static  #静态的意思

4.ONBOOT=no     改为ONBOOT=yes

5.再添加5条记录

IPADDR=192.168.222.171GATEWAY=192.168.222.2NETMASK=255.255.255.0DNS1=192.168.222.2DNS2=8.8.8.8

#IPADDR 是IP地址的意思

#GATEWAY 是网关的意思

#NETWORK是子网掩码的意思

6.网络服务重启

  service network restart

关闭防火墙

systemctl stop firewalld

       克隆虚拟机--

二.再对其他三个虚拟机进行操作

以上操作相同不同点是

IPADDR分别为

IPADDR=192.168.222.172IPADDR=192.168.222.173IPADDR=192.168.222.174

集群时间同步

时间同步的前提:网络配置完成

时间同步的分工

master 作为时间同步的服务器,slave1,slave2和masterbak向master同步

准备工作:挑选时区

tzselect 5 9 1 1

可在profile 文件中添加命令,使时区保存

vi /etc/profile 最低部添加 TZ='Asia/Shanghai'; export TZ

安装时间同步的软件ntp

【注】三台机器都要安装:

yum install -y ntp 直接下载失效,首先配置

禁用镜像列表(mirrorlist)并启用直接链接(baseurl)

方法一:

到网页

mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup

上传Centos-7.repo

mv /etc/yum.repos.d/Centos-7.repo /etc/yum.repos.d/CentOS-Base.repo

方法二

sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.reposed -i s/^#.*baseurl=http/baseurl=https/g /etc/yum.repos.d/*.reposed -i s/^mirrorlist=http/#mirrorlist=https/g /etc/yum.repos.d/*.repo

方法三:

sudo wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 显示wget找不到命令sudo yum clean allsudo yum makecache

3.修改ntp 配置文件

ntpq -c version

ntpq 4.2.6p5@1.2349-o Tue Jun 23 15:38:21 UTC 2020 (1)

只需要在master中做:

vi /etc/ntp.conf

在末尾追加

server 127.0.0.1fudge 127.0.0.1 stratum 10

注意先关闭防火墙-----一般没关

systemctl stop firewalldsudo ntpdate pool.ntp.org 手动同步时间sudo firewall-cmd --list-all 查看状态

4.重启时间服务

systemctl  restart  ntpd.service

5.在slave1和slave2上去更新

   ntpdate master     #以master为标准进行时间的校对【无用】

SSH免密登录

1.三台机器上生成公钥-私钥

ssh-keygen -t dsa      #注意事项 ssh-keygen空格-t空格dsa

2.分节点先切换到.ssh目录下:

cd /root/.ssh         #注意事项 不要忽略ssh前面的点(“.”)

3.将公钥传递过去:

scp id_dsa.pub root@master:/root/.ssh/s1.pub 

#s1.pub一定要加,意思是传递过去 之前名为id_dsa.pub,和master中公钥名称相同

       s2.pub,mb.pub 操作同上

4.在master中将三个公钥合并成一个公钥包

先切换到/root/.ssh目录下面:

cd /root/.ssh

将三个公钥合并成一个公钥包:

cat id_dsa.pub mb.pub s1.pub s2.pub >> authorized_keys

5.将master中合成的公钥包分别传递到slave1和slave2的/root/.ssh下:

scp authorized_keys root@slave1:/root/.sshscp authorized_keys root@slave2:/root/.ssh scp authorized_keys root@masterbak:/root/.ssh

#传递过去后不用另起别名了,因为/root/.ssh下面没有叫authorized_keys的

6.现在可以免密登录了

假设现在在master中

ssh slave1   #免密登录到slave1

exit        #退出slave1,重回master中

Java配置

将JDK安装到特定的目录下:

mkdir -p /usr/java       #创建一个目录用来安装jdk

5.安装:

tar   -zxvf  jdk-8u17-linux-x64.tar.gz -C /usr/java     #这里的C是大写

                   名称

Java -version : jdk1.8.0_171

6.vi /etc/profile

export JAVA_HOME=/usr/java/jdk1.8.0_212export CLASSPATH=.:$JAVA_HOME/libexport PATH=$PATH:$JAVA_HOME/bin

从打红色箭头的一行开始编辑,一共编辑以上四行,然后退出

7.让环境变量生效

source /etc/profile

大数据组件-Hadoop

  1. Hadoop 完全分布式搭建

创建/usr/Hadoop

tar -zxvf -C 解压文件到/usr/hadoop中

1.配置环境变量

vi /etc/profile

#hadoopexport HADOOP_HOME=/usr/hadoop/hadoop-3.1.3export CLASSPATH=$CLASSPATH:$HADOOP_HOME/libexport PATH=$PATH:$HADOOP_HOME/bin

使生效 source /etc/profile

4.3 hadoop集群配置

在hadoop-3.1.3 位置下有 etc/hadoop 位置

cd  etc/hadoop 进入

1.vi hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

2.vi core-site.xml

<property><name>fs.default.name</name><value>hdfs://master:9820</value></property><property><name>hadoop.temp.dir</name><value>/usr/hadoop/hadoop-3.1.3/tmp</value></property>

3.vi yarn-site.xml

<property><name>yarn.resourcemanager.hostname</name><value>master</value></property><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name><value>org.apache.hadoop.mapred.ShuffleHandler</value></property>

4.vi workers【关键步骤】

填入

master

slave1

slave2

5.vi hdfs-site.xml

<property><name>dfs.replication</name>  #Block副本数,默认3<value>2</value></property><property><name>dfs.namenode.name.dir</name><value>file:/usr/hadoop/hadoop-3.1.3/hdfs/name</value><final>true</final></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/hadoop/hadoop-3.1.3/hdfs/data</value><final>true</final></property><property><name>dfs.namenode.secondary.http-address</name><value>master:9001</value></property><property><name>dfs.namenode.http-address</name><value>192.168.222.171:9870</value></property><property><name>dfs.permissions</name><value>false</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property>

C:\Windows\System32\drivers\etc\hosts  修改好对应配置,否则hdfs Web端无法预览文件

6.vi mapred-site.xml

<property><name>mapreduce.framework.name</name><value>yarn</value></property>

HDFS系统

分发:

scp -r /usr/hadoop root@slave1:/usrscp -r /usr/Hadoop root@slave2:/usr

hadoop namenode -format 格式化 

下面对应hdfs文件夹直接删除才可重新

master:

开:

hdfs --daemon start namenodeyarn --daemon start resourcemanagerhdfs --daemon start secondarynamenode

关:

hdfs --daemon stop namenodeyarn --daemon stop resourcemanagerhdfs --daemon stop secondarynamenode

slaves:

开:

hdfs --daemon start datanodeyarn --daemon start nodemanager

关:

hdfs --daemon stop datanodeyarn --daemon stop nodemanager

注:不可start-all一次性启动了

jps查看进程

即成功

http://192.168.222.171:50070/ 浏览器查看namenode信息 

还有9001 查看2nn 信息

2nn信息显示不出来,解决方案:

cd etc/hadoop

find /usr/hadoop/hadoop-3.1.3/ -name dfs-dust.js

vi /usr/hadoop/hadoop-3.1.3/share/hadoop/hdfs/webapps/static/dfs-dust.js

:se nu显示行数

61行修改为:return new Date(Number(v)).toLocaleString();

清除网页缓存 刷新后显示

打开网站之前注意关闭防火墙(否则连不上)systemctl stop firewalld

关闭安全模式:

hdfs dfsadmin -safemode enter/leave

  1. HDFS 基本操作命令、Java-API 编写

2.1 HDFS 基本操作命令

  1. 上传文件到 HDFS

hdfs dfs -put /path/to/local/file /path/to/hdfs/directory

  1. 从 HDFS 下载文件

hdfs dfs -get /path/to/hdfs/file /path/to/local/directory

  1. 列出 HDFS 目录

hdfs dfs -ls /path/to/hdfs/directory

  1. 删除 HDFS 中的文件或目录

hdfs dfs -rm -r /path/to/hdfs/directory

  1. 查看文件内容

hdfs dfs -cat /path/to/hdfs/file

2.2 HDFS Java API 编写

使用 Hadoop Java API 可以操作 HDFS。以下是一个简单的 HDFS 操作示例,包括文件上传、下载和读取。

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import java.io.InputStream;import java.net.URI;public class HDFSExample {public static void main(String[] args) throws Exception {// 1. 创建 HDFS 配置Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://master:9000");// 2. 获取 HDFS 文件系统对象FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), conf, "hadoop");// 3. 上传文件到 HDFSfs.copyFromLocalFile(new Path("/local/path/file.txt"), new Path("/hdfs/path/file.txt"));System.out.println("File uploaded to HDFS.");// 4. 读取 HDFS 文件内容InputStream in = fs.open(new Path("/hdfs/path/file.txt"));byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = in.read(buffer)) > 0) {System.out.write(buffer, 0, bytesRead);}// 5. 下载文件到本地fs.copyToLocalFile(new Path("/hdfs/path/file.txt"), new Path("/local/path/downloaded.txt"));System.out.println("File downloaded from HDFS.");// 6. 关闭 HDFS 文件系统fs.close();}}

3. MapReduce 编程、数据预处理、指标计算

3.1 MapReduce 基本概念

MapReduce 是 Hadoop 的分布式计算框架,用于大规模数据处理。它包括两个阶段:

  • Map 阶段:将输入数据分割成键值对,并执行处理。
  • Reduce 阶段:对 Map 阶段的输出进行聚合或汇总。

3.2 MapReduce 编程示例

以下是一个简单的 WordCount 示例:

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.util.StringTokenizer;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

3.3 数据预处理和指标计算

使用 MapReduce 可以对原始数据进行预处理和计算常见指标,比如去重、排序、聚合等。你可以通过自定义 Mapper 和 Reducer 实现这些操作。

4. 任务提交、优化、排错

4.1 提交 MapReduce 任务

将编写的 MapReduce 任务打包为 JAR 文件,然后提交到集群上运行:

hadoop jar WordCount.jar WordCount /input/path /output/path

4.2 优化

  1. 调优 Mapper 和 Reducer 数量

增加 mapreduce.job.maps 和 mapreduce.job.reduces 的数量,以提高并行度。

  1. 压缩 Map 输出

启用 Map 输出压缩来减少网络传输:

mapreduce.map.output.compress=true

  1. 使用合适的数据格式

使用更高效的文件格式(如 Parquet、ORC)以减少 I/O 开销。

  1. 数据本地化

确保计算尽可能靠近数据存储节点,减少网络传输。

4.3 排错

  1. 检查日志文件

Hadoop 任务的日志可以帮助调试。日志通常存储在 HDFS 中的 logs 目录或本地机器的 /tmp/hadoop-logs 目录中。

  1. YARN UI

通过 YARN 的 web UI (http://master:8088) 可以查看任务的执行状态、日志和资源消耗。

  1. 使用 jobhistory 工具

通过 Hadoop Job History 可以查看任务执行的详细信息。

大数据组件-HBase

1. HBase 集群搭建

1.hbase安装:master上

1.准备将windows系统中的hbase上传到/opt/soft:

cd /opt/soft   #由于之前创建过了这里直接切换到/opt/soft目录

2.点击上传按钮,将hbase上传

3.将hbase安装到特定的目录下面:

mkdir -p  /usr/hbase     #创建一个目录用来安装hbase

4.安装:

tar -zxvf  /opt/soft/hbase-1.2.4-bin.tar.gz -C /usr/hbase

配置conf/hbase-env.sh

/usr/hbase/hbase-2.4.11/conf

export HBASE_MANAGES_ZK=falseexport JAVA_HOME=/usr/java/jdk1.8.0_212export HBASE_CLASSPATH=/usr/hadoop/hadoop-3.1.3/etc/Hadoop

配置conf/hbase-site.xml

(5项)注意:机器名:端口号 与Hadoop core-site.xml主机一致

<property><name>hbase.rootdir</name><value>hdfs://master:9820/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><property><name>hbase.master</name><value>hdfs://master:6000</value></property><property><name>hbase.zookeeper.quorum</name><value>master,slave1,slave2</value></property><property><name>hbase.zookeeper.property.dataDir</name><value>/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata</value></property>

如果zookeeper的zoo.cfg中port:2181则不必配置下面一项。

<property>

<name>hbase.zookeeper.property.clientPort</name>

<value>12181</value>

</property>

#----------------------

#说明clientPort需要与zookeeper一致

#-----------------------------

配置conf/regionservers

(仅master)

slave1

slave2

masterbak

5.cp hadoop的两个配置文件过来

cp  hadoop的hdfs-site.xml和core-site.xml到hbase的conf下

cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /usr/hbase/hbase-2.4.11/conf/

cp /usr/hadoop/hadoop-3.1.3/etc/hadoop/core-site.xml /usr/hbase/hbase-2.4.11/conf/

6.分发到其他三台机器

scp -r /usr/hbase root@slave1:/usr

scp -r /usr/hbase root@slave2:/usr

scp -r /usr/hbase root@masterbak:/usr

配置环境变量:(所有机器)

#my hbase

export HBASE_HOME=/usr/hbase/hbase-2.4.11

export PATH=$PATH:$HBASE_HOME/bin

生效环境变量(4台机器)

master运行hbase【先启动zookeeper,hadoop】

hbase2.4.11:   bin/start-hbase.sh

jps

192.168.222.171:16010

7.hbase shell

启动hbase shell

list

显示还没有任何表

退出:quit

>status     查看状态

2. HBase Shell 操作

进入 HBase Shell:

$HBASE_HOME/bin/hbase shell

DDL 操作

  1. 创建表

create 'mytable', 'cf'

    • 'mytable' 是表名。
    • 'cf' 是列族名(列族是HBase表的基本存储单元)。
  1. 查看表

list

  1. 查看表的详细描述

describe 'mytable'

  1. 删除表

禁用表:

disable 'mytable'

删除表:

drop 'mytable'

DML 操作

  1. 插入数据

put 'mytable', 'row1', 'cf:column1', 'value1'

    • 'mytable' 是表名。
    • 'row1' 是行键(Row Key)。
    • 'cf:column1' 是列族和列名。
    • 'value1' 是要插入的值。
  1. 读取数据

get 'mytable', 'row1'

  1. 扫描表数据

scan 'mytable'

  1. 删除数据

delete 'mytable', 'row1', 'cf:column1'

  1. 计数表中的行数

count 'mytable'

3. Java-API 编写

步骤 1: 准备 HBase 和依赖 JAR

手动下载和添加 HBase、Hadoop 和 Zookeeper 的 JAR 文件。

步骤 2: 创建 Eclipse Java 项目

  1. 打开 Eclipse,选择 File -> New -> Java Project。
  2. 输入项目名称(例如:HBaseExample),并点击 Finish。

步骤 3: 手动添加 JAR 文件

  1. 右键点击项目(HBaseExample),选择 Build Path -> Configure Build Path。
  2. 在 Libraries 选项卡中点击 Add External JARs。
  3. 导航到 HBase、Hadoop、Zookeeper 的 lib 目录,手动添加所有 JAR 文件。
    • HBase JAR:添加 hbase-client.jar、hbase-server.jar 以及 hbase-common.jar 等。
    • Hadoop JAR:添加 hadoop-common.jar、hadoop-hdfs.jar 等。
    • Zookeeper JAR:添加 zookeeper.jar。
  4. 确保所有必要的 JAR 文件都已添加,并点击 Apply and Close。

步骤 4: 编写 HBase Java API 代码

  1. 右键点击 src 文件夹,选择 New -> Class,输入类名(如 HBaseExample),并勾选 public static void main(String[] args),点击 Finish。
  2. 编写 HBase Java API 代码。以下是示例代码:
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.*;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;public class HBaseExample {public static void main(String[] args) throws IOException {// 创建 HBase 配置Configuration config = HBaseConfiguration.create();config.set("hbase.zookeeper.quorum", "master,slave1,slave2");config.set("hbase.zookeeper.property.clientPort", "2181");// 创建 HBase 连接try (Connection connection = ConnectionFactory.createConnection(config)) {Admin admin = connection.getAdmin();// 创建表TableName tableName = TableName.valueOf("mytable");if (!admin.tableExists(tableName)) {TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build();admin.createTable(tableDescriptor);System.out.println("Table created: " + tableName);}// 插入数据try (Table table = connection.getTable(tableName)) {Put put = new Put(Bytes.toBytes("row1"));put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"), Bytes.toBytes("value1"));table.put(put);System.out.println("Data inserted.");// 读取数据Get get = new Get(Bytes.toBytes("row1"));Result result = table.get(get);byte[] value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1"));System.out.println("Retrieved value: " + Bytes.toString(value));}} catch (IOException e) {e.printStackTrace();}}}

步骤 5: 运行代码

  1. 确保 HBase 集群已启动

启动 HBase:

$HBASE_HOME/bin/start-hbase.sh

使用 jps 确认 HMaster 和 HRegionServer 是否正常运行。

  在 Eclipse 中运行 Java 项目

右键点击 HBaseExample.java 文件,选择 Run As -> Java Application。

Eclipse 将编译并运行你的代码,控制台会显示程序的输出。

步骤 6: 验证输出

执行成功后,控制台会输出类似如下信息:

Table created: mytable

Data inserted.

Retrieved value: value1

这表明程序已成功连接到 HBase,创建了表 mytable,并执行了数据插入和读取操作。

大数据组件-Hive

  1. Hive 搭建

步骤 1: 安装 Hadoop

Hive 依赖 Hadoop,所以首先需要安装 Hadoop。如果 Hadoop 已经安装,请确保其配置正确。

步骤 2: 安装 Hive

(slave1)【Hadoop先启动好 --- 防火墙一定得关上】

1 上传安装包

       # cd /opt/soft

上传apache-hive-3.1.2-bin.tar.gz压缩包

mkdir /usr/hive

2 解压缩安装包

 tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /usr/hive

3 修改hive的文件夹名称

cd /usr/hive

mv apache-hive-3.1.2-bin hive

4 添加hive的环境变量

vi /etc/profile

              =======添加内容如下======

# ----HIVE_HOME

export HIVE_HOME=/usr/hive/hive

export PATH=$PATH:$HIVE_HOME/bin

soruce /etc/profile

cd $HIVE_HOME

5  拷贝MySQL驱动包

cd /opt/soft

上传mysql驱动包  mysql-connector-java-5.1.37.jar

将mysql驱动包拷贝到hive中

cp mysql-connector-java-5.1.37.jar $HIVE_HOME/lib

在slave1:apache-hive/conf

cp  hive-env.sh.template  hive-env.sh

vi  hive-env.sh

修改如下内容:

# Set HADOOP_HOME to point to a specific hadoop

HADOOP_HOME=/usr/hadoop/hadoop-3.1.3

slave1中

cp  hive-default.xml.template  hive-site.xml

vi  hive-site.xml

Shift+g跳到最后

添加内容:

<property><name>hive.metastore.warehouse.dir</name> <value>/warehousedir/home</value> </property><property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://slave2:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value></property><property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value></property><property><name>javax.jdo.option.ConnectionUserName</name><value>root</value></property><property><name>javax.jdo.option.ConnectionPassword</name><value>123456</value></property><property><name>hive.metastore.schema.verification</name><value>false</value></property><property><name>datanucleus.schema.autoCreateAll</name><value>true</value></property><property><name>hive.exec.scratchdir</name><value>/tmp/hive</value></property><property><name>hive.exec.local.scratchdir</name><value>/tmp/hive/local</value></property><property><name>hive.downloaded.resources.dir</name><value>/tmp/hive/resources</value></property>

slave1上启动hive服务

执行初始化Hive元数据库命令

schematool -initSchema -dbType mysql -verbos

先启动HADOOP

在/hive/ 输入命令: bin/hive (注意hive-site.xml配置是否正确,否则无法通过

启动hive

# cd /usr/hive/hive/

# bin/hive

hive>show databases;

hive> use default;

hive>show tables;

hive>quit;

  1. 基于Linux的MySQL安装、元数据存储到MySQL配置

步骤 1: 安装 MySQL(slave2)

MySQL安装包上传(在slave2上)

cd /opt/soft

1.上传到该目录mysql-5.7.28-1.el7.x86_64.rpm-bundle.tar

2.解压缩第一层包(在slave2cd上)

cd /opt/soft

tar -xf mysql-5.7.28-1.el7.x86_64.rpm-bundle.tarsudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpmsudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm --force --nodepssudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm --force --nodepssudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpmsudo yum install -y libaiosudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm --force --nodeps

 3.删除配置文件(在slave2上)

       查看mysql所安装的目录(查看datadir的目录结果)

vi /etc/my.cnf    就是/var/lib/mysql这里

       删除datadir指向的目录所有文件内容

cd /var/lib/mysql

sudo rm -rf ./*

4 初始化数据库

        sudo mysqld --initialize --user=mysql

5 查看初始化密码( -localhost后面)

        sudo cat /var/log/mysqld.log

6 启动MySQL的服务

        sudo systemctl start mysqld

7 登录MySQL数据库

        mysql -u root -p

              Enter password:输入mysqld.log中的密码

8 修改数据库密码

        mysql>set password = password("123456");

9 修改数据库任意连接

        mysql>update mysql.user set host='%' where user='root';

        mysql>flush privileges;

        mysql>quit;

10 测试mysql数据库

mysql -u root -p

       Enter password:123456

mysql>quit;

show databases;

创建一个数据库名为Hive,并设置编码为latin1,用于存储Hive的元数据

create database Hive default character set latin1;

创建一个特定的数据库用户hive,密码设置为123456,并将步骤3中创建的Hive库授权给该用户

create user 'hive'@'localhost' identified by '123456';

grant create,alter,drop,select,insert,update,delete on Hive.user_info to hive@localhost identified by '123456';

  1. DDL、DML、查询(基本查询、分组、join语句、排序)

Hive 支持常见的 SQL DDL 和 DML 操作。以下是常用的操作示例。

DDL 操作

  1. 创建数据库

CREATE DATABASE mydb;

  1. 创建表
CREATE TABLE students (id INT,name STRING,age INT,gpa FLOAT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
  1. 删除表

DROP TABLE students;

DML 操作

  1. 插入数据

INSERT INTO students VALUES (1, 'Alice', 21, 3.5);

  1. 加载数据

LOAD DATA INPATH '/path/to/data' INTO TABLE students;

查询操作

  1. 基本查询

SELECT * FROM students;

  1. 分组查询

SELECT age, COUNT(*) FROM students GROUP BY age;

  1. 排序查询

SELECT * FROM students ORDER BY gpa DESC;

  1. Join 查询
SELECT a.name, b.course_nameFROM students aJOIN courses bON a.id = b.student_id;

4. 函数(单行、聚合、炸裂、窗口)、自定义函数UDF

单行函数

  • 字符串函数

SELECT UPPER(name), LOWER(name) FROM students;

  • 日期函数

SELECT CURRENT_DATE();

  • 数学函数

SELECT ABS(-10), ROUND(3.1415, 2);

聚合函数

  • 求和、平均、计数

SELECT SUM(gpa), AVG(gpa), COUNT(*) FROM students;

炸裂函数(explode)

  • 用于将数组或嵌套数据展开为多行。

SELECT name, explode(array(1, 2, 3)) AS num FROM students;

窗口函数

  • ROW_NUMBER():返回行号。
SELECT name, gpa, ROW_NUMBER() OVER (PARTITION BY age ORDER BY gpa DESC) AS rank FROM students;

自定义函数UDF

Hive 允许用户编写自定义函数(UDF)来扩展其功能。以下是如何编写和使用自定义UDF的步骤。

编写 UDF 示例

  1. 编写一个简单的 Hive UDF:
import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.hadoop.io.Text;public class UpperCaseUDF extends UDF {public Text evaluate(Text input) {if (input == null) {return null;}return new Text(input.toString().toUpperCase());}}
  1. 编译 UDF:
    • 使用 Hadoop 和 Hive 的 jar 文件编译这个 Java 类。

javac -cp $(hadoop classpath):$(hive classpath) UpperCaseUDF.java

jar -cf UpperCaseUDF.jar UpperCaseUDF.class

  1. 将 UDF 注册到 Hive 中:
    • 将 UpperCaseUDF.jar 复制到 Hive 集群,并通过 Hive CLI 注册该 UDF。

ADD JAR /path/to/UpperCaseUDF.jar;

CREATE TEMPORARY FUNCTION to_upper AS 'UpperCaseUDF';

  1. 使用 UDF:

SELECT to_upper(name) FROM students;

大数据组件-Spark

  1. Spark三种搭建方式

1.1 Standalone模式

Standalone模式是Spark自带的集群管理器,适合快速测试或简单的生产环境。你不需要任何外部资源管理工具,Spark的Master节点充当资源调度器,Task节点作为工作节点。

优点

  • 简单易用,适合小型集群或测试环境。
  • 无需外部的集群管理工具。

搭建步骤

  1. 下载并解压Spark:

上传spark源码包到/opt/soft

mkdir /usr/spark

cd /opt/soft

tar -zxvf spark-3.1.1-bin-hadoop3.2.tgz -C /usr/spark

  1. 启动Spark Master和Worker:

./sbin/start-master.sh

./sbin/start-worker.sh spark://<master-ip>:7077

  1. 提交任务:

./bin/spark-submit --master spark://<master-ip>:7077 <your_spark_job.py>

安装scala [master]

[root@master ~]# mkdir /usr/scala

上传scala-2.11.12.tgz 到/opt/soft

yum install unzip

unzip -o -d /usr/scala scala-2.12.11.zip

[root@master soft]# tar -zxvf scala-2.11.12.tgz -C /usr/scala

配置环境变量

[root@master scala-2.11.12]# vi /etc/profile

加入如下三行:

#scala

export SCALA_HOME=/usr/scala/scala-2.12.11

export PATH=$PATH:$SCALA_HOME/bin

保存退出

[root@master scala-2.11.12]# source /etc/profile

[root@master scala-2.11.12]# scala -version

Scala code runner version 2.12.11 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.

分发到其他服务器,并配置环境变量

scp -r /usr/scala/ root@slave1:/usr

scp -r /usr/scala/ root@slave2:/usr

[我自己的Standalone]配置spark

进入spark安装目录下的conf

cd /usr/spark/spark-3.1.1-bin-hadoop3.2/conf/

cp workers.template slaves  并修改

vi进入:

最后一行

删掉localhost

填上

master

slave1

slave2

cp spark-env.sh.template spark-env.sh

# spark-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

export SCALA_HOME=/usr/scala/scala-2.12.11

export SPARK_WORKER_MEMORY=1g

export HADOOP_CONF_DIR=/usr/hadoop/hadoop-3.1.3/etc/hadoop

cp spark-defaults.conf.template spark-site.conf

# Example:

spark.master  spark://master:7077

这两个文件暂时不必修改

cp fairscheduler.xml.template fairscheduler.xml

cp log4j.properties.template log4j.properties

分发到其他机器,并修改环境变量

scp -r /usr/spark root@slave1:/usr

scp -r /usr/spark root@slave2:/usr

vi /etc/profile

#spark

export SPARK_HOME=/usr/spark/spark-3.1.1-bin-hadoop3.2/

export PATH=$PATH:SPARK_HOME/bin

启动spark

在spark安装目录下:

sbin/start-all.sh

jps

spark安装目录下执行:bin/spark-shell

ctrl-z 退出

1.2 YARN模式

YARN(Yet Another Resource Negotiator)是Hadoop生态系统中的资源管理器。使用YARN可以让Spark与Hadoop无缝集成,允许它在Hadoop集群中运行。

优点

  • 与Hadoop集群完美结合,利用HDFS、MapReduce等资源。
  • 支持动态资源调度,适合大规模集群。

搭建步骤

  1. 确保Hadoop和YARN集群已配置和运行。
  2. 启动YARN Resource Manager:

start-yarn.sh

  1. 提交Spark任务到YARN:

./bin/spark-submit --master yarn <your_spark_job.py>

1.3 Mesos模式

Mesos是一个通用的集群管理系统,允许Spark与其他服务共享集群资源。

优点

  • 适用于需要多个框架共享集群资源的环境。
  • 支持多种类型的任务调度。

搭建步骤

  1. 安装并配置Mesos。
  2. 启动Mesos Master和Slave。
  3. 使用Spark提交任务到Mesos:

./bin/spark-submit --master mesos://<mesos-master-ip>:5050 <your_spark_job.py>

2. Spark Core

Spark Core是Spark的基础模块,负责内存管理、调度、任务分发、容错和与存储系统的交互。Spark Core包含以下核心概念:

2.1 RDD (Resilient Distributed Dataset)

RDD是Spark的核心抽象,代表一个不可变的分布式数据集。通过对RDD进行变换(transformation)或行动(action)来执行计算。

  • Transformation:转换操作,惰性执行。例如,map()、filter()。
  • Action:执行操作,立即触发计算。例如,count()、collect()。

示例代码:RDD的基本操作(scala)

import org.apache.spark.{SparkConf, SparkContext}// 配置和初始化 Sparkval conf = new SparkConf().setAppName("Spark Core Example").setMaster("local")val sc = new SparkContext(conf)// 创建 RDDval data = Array(1, 2, 3, 4, 5)val rdd = sc.parallelize(data)// Transformation: map 将每个元素乘以 2val rdd2 = rdd.map(x => x * 2)// Action: collect 收集结果并打印val result = rdd2.collect()result.foreach(println)// 关闭 Sparksc.stop()

3. Spark SQL

Spark SQL 提供了在Spark上使用结构化数据的API。它允许用户通过SQL查询数据,并且无缝集成到Spark的批处理和流处理任务中。Spark SQL使用DataFrame和Dataset API进行操作。

示例代码:使用DataFrame和SQL查询(scala)

import org.apache.spark.sql.SparkSession// 初始化 SparkSessionval spark = SparkSession.builder().appName("Spark SQL Example").master("local").getOrCreate()// 创建 DataFrameval df = spark.read.json("examples/src/main/resources/people.json")// 显示 DataFrame 内容df.show()// 使用 SQL 查询df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT name, age FROM people WHERE age > 30")sqlDF.show()// 关闭 SparkSessionspark.stop()

优点:

  • 支持标准的SQL查询。
  • 可以直接与结构化数据(如JSON、Parquet、Hive表)集成。
  • 动态查询优化(Catalyst引擎)提高性能。

4. Spark Streaming

Spark Streaming 是用于实时数据处理的组件。它可以通过微批处理将实时数据流分批处理,每一小批数据都以RDD的形式进行处理。

示例代码:Spark Streaming 处理Socket数据流

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))// 从 TCP Socket 接收数据val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))// 统计单词数val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)// 输出结果wordCounts.print()// 启动 StreamingContextssc.start()ssc.awaitTermination()

优点:

  • 支持多种数据源:Kafka、Flume、HDFS等。
  • 适合处理实时流数据。
  • 支持窗口化操作和故障恢复。

5. Spark MLlib

MLlib 是 Spark 中的机器学习库,包含常见的机器学习算法和工具,如分类、回归、聚类和协同过滤。它基于Spark的分布式架构,能够对大规模数据集进行高效的机器学习任务。

示例代码:使用MLlib进行线性回归

import org.apache.spark.ml.regression.LinearRegressionimport org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName("LinearRegressionExample").getOrCreate()// 加载数据val data = spark.read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")// 初始化线性回归模型val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)// 训练模型val lrModel = lr.fit(data)// 打印模型系数和截距println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}"// 评估模型val trainingSummary = lrModel.summaryprintln(s"RMSE: ${trainingSummary.rootMeanSquaredError}")// 关闭 SparkSessionspark.stop()

MLlib 支持的主要功能:

  • 分类:逻辑回归、决策树、随机森林等。
  • 回归:线性回归、岭回归等。
  • 聚类:KMeans、Gaussian Mixture等。
  • 降维:PCA、SVD等。

大数据组件-Zookeeper

  1. Zookeeper 集群安装

1.准备将windows系统中的zookeeper上传到/opt/soft:

cd /opt/soft   #由于之前创建过了这里直接切换到/opt/soft目录

2.点击上传按钮,将zookeeper上传

3.将zookeeper安装到特定的目录下面:

mkdir -p /usr/zookeeper     #创建一个目录用来安装zookeeper

4.安装:

tar  -zxvf   zookeeper-3.4.10.tar.gz  -C  /usr/zookeeper

2.Zookeeper配置(所有机器都需要)

1.切换到相关目录

cd  /usr/zookeeper/ zookeeper-3.4.10/conf    #接下来的步骤我们在这个目录下进行

2.ls      #查看该目录下面有什么

3.我们需要配置文件,但是从“ls”可知,只给了我们一个模板,需要我们复制一份,然后再配置文件

4.cp  zoo_sample.cfg  zoo.cfg  #zoo.cfg是它的新名字,内容和zoo_sample.cfg一样

5.编辑zoo.cfg

 vi zoo.cfg

dataDir后面

改为

dataDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdata

dataLogDir=/usr/zookeeper/apache-zookeeper-3.5.7-bin/zkdatalog

server.1=192.168.222.171:2888:3888

server.2=192.168.222.172:2888:3888

server.3=192.168.222.173:2888:3888

6.返回上一级位置

cd ..

7.创建我们在第5步要创建的文件

mkdir zkdata

mkdir zkdatalog

8.进入到zkdata目录下创建一个新文件并编辑它

cd zkdata

vi myid

编辑的内容和虚拟机有关系,如果是master,里面写入1,如果是slave1里面写入2,如果是slave2,里面写入3,如果是masterbak,里面写入4

9.接下来我们还要配置另外两台虚拟机,为了方便,我们直接远用scp,将master的zookeeper整个文件夹远程拷贝给slave1,slave2,masterbak,这样就不需要配置了

scp   -r  /usr/zookeeper  root@slave1:/usr  #这里考的是文件夹所以是scp -r 命令

scp   -r  /usr/zookeeper  root@slave2:/usr    #同样拷一份到slave2中去

scp   -r  /usr/zookeeper  root@masterbak:/usr

10.拷贝过来的内容仍然需要做一些修改

到slave1中

cd /usr/zookeeper/zookeeper-3.4.10/zkdata   #切换到该目录下

11.对myid 文件中的内容进行修改,因为不同的虚拟机,里面的内容不一样

   vi myid

 将里面的内容改为2

12.slave2同上10,11步骤,不同的是myid文件中,内容改为3

3.配置zookeeper的环境变量(所有都需要)

1.进入到指定的文件夹

vi /etc/profile

 zookeeper的环境变量放在Java的环境变量下面

#Zookeeper------

export ZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-bin/

export PATH=$PATH:$ZOOKEEPER_HOME

2. 使环境变量生效

source /etc/profile

3.运行zookeeper

必须在zookeeper安装目录下

cd /usr/zookeeper/apache-zookeeper-3.5.7-bin/

bin/zkServer.sh start       #这一步必须所有机器同时运行

可能会出现错误:关闭防火墙再看状态即可(systemctl stop firewalld)

端点占用:sudo yum install net-tools

netstat -apn | grep 2181

kill -9 xx

bin/zkServer.sh status   #这一步选出谁是领导者谁是跟随者

jps 查看进程

4. Zookeeper 启停脚本编写

为了简化启动和停止 Zookeeper 集群的操作,我们可以编写一个脚本,通过 SSH 登录集群节点并启动或停止 Zookeeper。

启停脚本示例:zk_manage.sh

#!/bin/bashZOOKEEPER_HOME=/usr/zookeeper/apache-zookeeper-3.5.7-binZOOKEEPER_NODES="node01 node02 node03"case $1 in"start")for node in $ZOOKEEPER_NODESdoecho "Starting Zookeeper on $node"ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh start"done;;"stop")for node in $ZOOKEEPER_NODESdoecho "Stopping Zookeeper on $node"ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh stop"done;;"status")for node in $ZOOKEEPER_NODESdoecho "Checking Zookeeper status on $node"ssh "$node" "$ZOOKEEPER_HOME/bin/zkServer.sh status"done;;*)echo "Usage: $0 {start|stop|status}"exit 1;;esac

说明:

  • 该脚本读取集群节点列表(ZOOKEEPER_NODES),并通过SSH连接到每个节点执行启动、停止或状态检查命令。

使用方式:

    • 启动:./zk_manage.sh start
    • 停止:./zk_manage.sh stop
    • 查看状态:./zk_manage.sh status

设置可执行权限:

chmod +x zk_manage.sh

5. 客户端命令行操作

Zookeeper 提供了命令行客户端工具 zkCli.sh,可以用来连接到 Zookeeper 服务并执行命令,如创建、删除 Znodes,查询节点数据等。

连接 Zookeeper:

在任意节点上使用 zkCli.sh 连接到 Zookeeper 服务:

/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkCli.sh -server node01:2181

常用命令:

1.创建节点

create /myNode "hello zookeeper"

2.查询节点数据

get /myNode

3.查看子节点

ls /

4.修改节点数据

set /myNode "new data"

5.删除节点

delete /myNode

6.退出客户端

quit

6. Java-API编写

步骤 1: 创建 Java 项目

  1. 创建新项目
    • 打开Eclipse,选择 File -> New -> Java Project。
    • 输入项目名称(例如:ZookeeperExample)。
    • 点击 Finish 完成项目创建。

步骤 2: 下载 Zookeeper JAR 文件

  1. 下载Zookeeper的JAR包:解压Zookeeper的JAR包

通常解压后会看到 zookeeper-<version>/lib 目录,其中包含了多个依赖库(包括Zookeeper的核心库和其他所需依赖)。

  1. 主要的JAR文件

需要包括 zookeeper-<version>.jar(位于解压目录的根目录)和 lib 文件夹中的所有JAR文件。

步骤 3 在 Eclipse 中添加 Zookeeper 库

  1. 将 Zookeeper 的 JAR 文件添加到项目中

右键点击你的项目(例如:ZookeeperExample),选择 Build Path -> Configure Build Path。

选择 Libraries 选项卡,然后点击 Add External JARs。

导航到你下载并解压的Zookeeper目录,选择 zookeeper-<version>.jar 和 lib 目录中的所有JAR文件。

点击 Apply and Close,这将把Zookeeper的依赖库添加到你的项目中。

步骤 4: 编写 Zookeeper Java API 代码

创建 Java 类

右键点击 src 目录,选择 New -> Class。

将类命名为 ZookeeperExample,并勾选 public static void main(String[] args)。

点击 Finish。

编写 Zookeeper 示例代码

在 ZookeeperExample.java 中,输入以下代码:

package com.example;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;public class ZookeeperExample {private static ZooKeeper zooKeeper;private static ZooKeeperConnection conn;public static void main(String[] args) throws IOException, KeeperException, InterruptedException {// 连接到 Zookeeperconn = new ZooKeeperConnection();zooKeeper = conn.connect("localhost");  // 假设 Zookeeper 在本地运行,端口为默认的 2181// 创建 ZnodeString path = "/myNode";byte[] data = "Hello Zookeeper".getBytes();create(path, data);// 获取 Znode 数据System.out.println("Data of Znode: " + new String(getData(path)));// 修改 Znode 数据byte[] newData = "New data".getBytes();setData(path, newData);// 获取修改后的 Znode 数据System.out.println("Updated data of Znode: " + new String(getData(path)));// 删除 Znodedelete(path);// 关闭连接conn.close();}// 创建 Znodepublic static void create(String path, byte[] data) throws KeeperException, InterruptedException {zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}// 获取 Znode 数据public static byte[] getData(String path) throws KeeperException, InterruptedException {Stat stat = zooKeeper.exists(path, true);return zooKeeper.getData(path, false, stat);}// 修改 Znode 数据public static void setData(String path, byte[] data) throws KeeperException, InterruptedException {zooKeeper.setData(path, data, zooKeeper.exists(path, true).getVersion());}// 删除 Znodepublic static void delete(String path) throws KeeperException, InterruptedException {zooKeeper.delete(path, zooKeeper.exists(path, true).getVersion());}}class ZooKeeperConnection {private ZooKeeper zoo;// 连接到 Zookeeper 服务器public ZooKeeper connect(String host) throws IOException, InterruptedException {zoo = new ZooKeeper(host, 5000, we -> {});return zoo;}// 关闭连接public void close() throws InterruptedException {zoo.close();}}

步骤 5: 运行代码

  1. 确保 Zookeeper 服务已经启动

在本地或服务器上启动Zookeeper服务:

/usr/zookeeper/apache-zookeeper-3.5.7-bin/bin/zkServer.sh start

  1. 在 Eclipse 中运行 Java 项目

右键点击 ZookeeperExample.java,选择 Run As -> Java Application。

控制台将会显示程序的输出,执行成功后,你将看到Znode被创建、读取、更新和删除的操作。

步骤 6: 验证输出

在Eclipse的控制台中,你应该看到类似如下的输出:

Data of Znode: Hello Zookeeper

Updated data of Znode: New data

这表明程序已经成功连接到Zookeeper,创建了Znode /myNode,并对其数据进行了读写操作。

大数据组件-Fink

1. Flink Yarn 模式搭建

在大数据环境中,Flink可以通过YARN进行集群部署管理。YARN作为资源管理器,负责分配和管理Flink作业的资源。

Step 1: 下载和解压Flink

软件包 放入集群 解压 /usr/flink

cd /usr/flink/flink-1.14.0/

Step 2: 配置Flink

1.修改配置文件:进入Flink安装目录,并修改 flink-conf.yaml 文件。

vi conf/flink-conf.yaml

在flink-conf.yaml中添加或修改以下配置:

jobmanager.memory.process.size: 1024m

taskmanager.memory.process.size: 2048m

taskmanager.numberOfTaskSlots: 4

parallelism.default: 2

2. YARN配置:确保Hadoop配置路径在Flink中可见,并设置HADOOP_CONF_DIR。

export HADOOP_CONF_DIR=/usr/hadoop/Hadoop-3.1.3/etc/hadoop

Step 3: 启动YARN集群

确保你的Hadoop集群已经启动并运行:

start-dfs.sh

start-yarn.sh

Step 4: 通过YARN启动Flink

你可以通过以下命令启动Flink的JobManager和TaskManager,交由YARN管理:

./bin/yarn-session.sh -d -n 3 -s 4 -jm 1024m -tm 2048m

  • -n 表示启动的TaskManager数量
  • -s 表示每个TaskManager的插槽数量
  • -jm 表示JobManager使用的内存
  • -tm 表示每个TaskManager使用的内存

此命令会在YARN上启动Flink会话模式,Flink作业可以提交到YARN集群中。

【Standalone模式】

vi /etc/profile

配置环境变量

# ---Flink

export FLINK_HOME=/usr/flink/flink-1.14.0/

export PATH=$PATH:$FLINK_HOME/bin

source /etc/profile

$FLINK_HOME

2.配置conf文件

cd conf

配置flink-conf.yaml

#1. 配置jobmanager rpc 地址

jobmanager.rpc.address: master

#2. 修改taskmanager内存大小,可改可不改

taskmanager.memory.process.size: 2048m

#3. 修改一个taskmanager中对于的taskslot个数,可改可不改

taskmanager.numberOfTaskSlots: 4

#修改并行度,可改可不改

parallelism.default: 4

3.vi masters

master:8081

vi workers

master

4.配置zoo

# 新建snapshot存放的目录,在flink目录下建

mkdir tmp

cd tmp

mkdir zookeeper

#修改conf下zoo.cfg配置

vi zoo.cfg

#snapshot存放的目录

dataDir=/usr/flink/flink-1.14.0/tmp/zookeeper

#配置zookeeper 地址

server.1=192.168.222.171:2888:3888

5.启动集群

bin/start-cluster.sh

192.168.222.171:8081进入浏览器界面

2. Flink 运行架构

Flink的运行架构基于JobManagerTaskManager,并结合分布式集群来处理作业。

  • JobManager:管理作业的执行,是Flink的调度器,负责协调各个任务,管理故障恢复。
  • TaskManager:执行实际的数据处理任务。它们接受来自JobManager的任务指令,负责执行并报告任务状态。
  • Dispatcher:负责提交作业并与外部接口进行通信。
  • ResourceManager:负责管理集群资源,并向JobManager分配资源。

Flink可以在不同模式下运行:

  • Standalone模式:Flink可以单独运行在一台或多台机器上。
  • YARN模式:Flink作业运行在YARN集群上,动态分配资源。
  • Kubernetes模式:Flink作业运行在Kubernetes集群上。

3. DataStream API

DataStream API 是Flink的核心API之一,用于处理无界数据流。以下是使用DataStream API进行简单数据流处理的代码示例。

import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {// 设置执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从socket读取数据,假设监听在localhost:9999env.socketTextStream("localhost", 9999).flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1).print();// 执行流处理作业env.execute("Flink Streaming Word Count");}// FlatMap function,用于将输入数据分割成单词public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 分割单词并计数for (String word : value.split("\\s")) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}}

步骤:

  1. 设置执行环境:使用 StreamExecutionEnvironment 创建Flink执行环境。
  2. 读取数据流:从Socket(localhost:9999)读取流式数据。
  3. 定义FlatMapFunction:用于将接收到的行分割成单词,并创建 Tuple2 形式的(word, count) 对象。
  4. 按key进行分组:通过 keyBy 操作,根据单词(Tuple2.f0)进行分组。
  5. 聚合:使用 sum(1) 对每个单词出现次数进行累加。
  6. 输出结果:使用 print() 输出到控制台。
  7. 启动执行:通过 env.execute 启动作业。

4. Flink中的时间和窗口

Flink有两种时间语义:事件时间处理时间。窗口是Flink处理无界数据流的核心机制,允许将数据划分为时间段来进行聚合处理。

时间类型:

1.事件时间:数据发生的实际时间,通常从数据源携带的时间戳中提取。

2.处理时间:数据到达Flink处理系统的时间。

窗口类型:

  1. 滚动窗口(Tumbling Window):将数据分割成不重叠的固定大小的窗口。
  2. 滑动窗口(Sliding Window):窗口有一个固定的大小和滑动步长,允许窗口重叠。
  3. 会话窗口(Session Window):根据用户的行为进行窗口划分,窗口之间没有固定的大小。

窗口示例:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;public class WindowExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 读取数据流DataStream<String> textStream = env.socketTextStream("localhost", 9999);// 进行窗口操作,按单词分组,并在时间窗口内进行计数textStream.flatMap(new Tokenizer()).keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 使用滚动窗口,每10秒聚合一次.sum(1).print();env.execute("Window Example");}}

在此示例中,使用 TumblingEventTimeWindows 创建了一个10秒的滚动窗口。

5. 处理函数(Process Function)

ProcessFunction 是Flink提供的低级别的处理API,允许开发者对数据流进行灵活的操作。

示例代码:Process Function的使用

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.util.Collector;public class ProcessFunctionExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.socketTextStream("localhost", 9999).process(new MyProcessFunction()).print();env.execute("ProcessFunction Example");}public static class MyProcessFunction extends ProcessFunction<String, String> {@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {// 输出原始数据并添加前缀out.collect("Processed: " + value);}}}

说明:

  • processElement 方法允许对流中的每个元素进行处理。
  • Collector 用于收集处理后的数据并输出。

大数据组件-Flume、Kafka

  1. 安装Flume

Step 1: 解压Flume安装包

1. 将Master节点Flume安装包解压到/opt/soft目录下,创建好/usr/flume

tar -zxvf /opt/soft/apache-flume-1.9.0-bin.tar.gz -C /usr/flume

Step 2: 配置环境变量

1. 配置环境变量

vi /etc/profile

添加以下内容:

#FLUME_HOME

export FLUME_HOME=/usr/flume/flume-1.9.0

export PATH=$PATH:$FLUME_HOME/bin

2. 配置文件(位于:/usr/flume/flume-1.9.1/conf)

将 flume-env.sh.template 复制更名为 flume-env.sh

cp flume-env.sh.template flume-env.sh

并添加以下内容:

vi flume-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_212

使配置文件生效

source /etc/profile

前提:

关闭hbase配置文件

将 hbase 的 hbase.env.sh 的一行配置注释掉

#Extra Java CLASSPATH elements. Optional.

#export HBASE_CLASSPATH=/home/hadoop/hbase/conf

验证

输入命令:

flume-ng version

三、任务

启动Flume传输Hadoop日志(namenode或datanode日志),查看HDFS中/tmp/flume目录下生成的内容

1,将hadoop与flume中 guava-27.0-jre.jar 包版本保持一致( 因为hadoop中此包版本是27而flume中版本是11 )

首先删除Flume自带的旧版本的Guava库

rm -rf /usr/flume/flume-1.9.1/lib/guava-11.0.2.jar

然后将hadoop里面的此包复制给flume

cp /usr/hadoop/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /usr/flume/flume-1.9.0/lib/

2.在 /usr/flume/flume-1.9.0/conf下新建文件 conf-file ,并写配置文件

创建Flume配置文件(conf-file)

Flume的工作方式是通过定义Source、Channel和Sink来收集、传输和存储数据。这些术语的含义如下:

Source(源):从某个位置读取数据(在这里是Hadoop的日志文件)。

Channel(通道):数据在从Source到Sink传输的过程中,需要通过Channel进行中转。

Sink(接收器):将数据存储到目标位置(在这里是HDFS)。

在/usr/flume/flume-1.9.0/conf目录下创建一个名为conf-file的Flume配置文件,内容如下:

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# Source配置: 使用TAILDIR类型读取Hadoop的日志文件

a1.sources.r1.type=TAILDIR

a1.sources.r1.filegroups=as

a1.sources.r1.filegroups.as=/usr/hadoop/hadoop-3.1.3/logs/hadoop-root-namenode-master.log

# Channel配置: 使用内存通道存储数据

a1.channels.c1.type=memory

# Sink配置: 将数据写入HDFS

a1.sinks.k1.type=hdfs

a1.sinks.k1.hdfs.path=hdfs://master:9820/tmp/flume

# 绑定SourceChannel

a1.sources.r1.channels=c1

# 绑定SinkChannel

a1.sinks.k1.channel=c1

3.启动Flume传输Hadoop日志,查看HDFS中/tmp/flume目录下生成的文件

前提:启动hadoop集群

启动命令:

flume-ng agent -c conf -f conf-file -n a1 -Dflume.root.logger=INFO,console &

查看HDFS中/tmp/flume目录下生成的文件命令:

hdfs dfs -ls /tmp/flume

停止它:

通过ps命令查找并停止Flume进程

Step 1: 查找Flume进程ID (PID)

使用 ps 命令查找运行中的Flume进程:

ps -ef | grep flume

你将看到类似如下输出:

user   12345  6789  0  10:23 ?  00:00:15 java -cp /opt/flume/lib/... flume-ng agent -c conf ...

user   12350  6789  0  10:23 pts/1  00:00:00 grep --color=auto flume

这里,12345 是Flume agent进程的PID(进程ID)。你需要记住这个PID。

Step 2: 停止Flume进程

使用 kill -9命令来强制停止这个进程:

kill -9 12345

bin/flume-manage.sh stop force

  1. 安装Kafka

1、使用服务器名称进行通信

编辑 /etc/hosts 文件,在最后添加如下内容

192.168.222.171 node01

192.168.222.172 node02

192.168.222.173 node03

2、关闭防火墙

systemctl stop firewalld

3、安装 Zookeeper 集群

4.上传并解压文件到指定位置  tar -zxvf

5.配置

vi config/server.properties

添加内容:

# zookeeper myid 一样,每个实例拥有唯一的 ID

broker.id=1

# 日志文件,数据文件目录

log.dirs=/usr/kafka/kafka_2.12-3.0.0/logs

# zookeeper 集群,使用逗号分隔

zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka

6.配置环境变量

编辑 /etc/profile 文件,添加如下内容至文件末尾

# kafka

export KAFKA_HOME=/usr/kafka/kafka_2.12-3.0.0

export PATH=$PATH:$KAFKA_HOME/bin

使配置生效

source /etc/profile

7.配置其它两台服务器

使用 scp 命令将 kafka 文件夹发送至其它两台服务器

scp /usr/kafka/kafka_2.12-3.0.0 @node02:/usr/kafka

scp /usr/kafka/kafka_2.12-3.0.0 @node03:/usr/kafka

修改 server.properties 配置文件中的 broker.id

部署完成,启动服务。

8.Kafka基本操作

先启动zookeeper服务

启动服务

kafka-server-start.sh /usr/kafka/kafka_2.12-3.0.0/config/server.properties &

3. Flume端口监听

设置Flume监听端口

在Flume的配置文件中设置source为netcat类型,并绑定到一个特定的IP地址和端口,这样Flume会在这个端口监听传入的数据。例如:

agent.sources.source1.type = netcat

agent.sources.source1.bind = 0.0.0.0

agent.sources.source1.port = 44444

启动Flume后,它会开始在44444端口监听。你可以使用telnet或netcat工具发送数据到这个端口:

telnet localhost 44444

  1. Kafka启停脚本编写

注意:

1.需先在/etc/hosts 文件中配置 IP 映射。

2.启动kafka集群前,要先启动zoopkeeper集群。

一、kafka集群启动脚本

#!/bin/bashBROKERS="node01 node02 node03" KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0" KAFKA_NAME=”kafka_2.12-3.0.0”for i in ${BROKERS}doecho "Starting ${KAFKA_NAME} on ${i} "ssh ${i} "source /etc/profile; nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"if [[ $? -ne 0 ]]; thenecho "Starting ${KAFKA_NAME} on ${i} is  ok"fidoneecho All ${KAFKA_NAME} are startedexit 0

作用:通过ssh远程连接到当前服务器${i}(即hadoop01、hadoop02或hadoop03),并执行Kafka的启动命令。

详细解释

  • ssh ${i}:通过SSH连接到当前的服务器${i}。
  • "source /etc/profile":远程服务器执行source /etc/profile,加载环境变量,确保Kafka所需的环境变量已加载。
  • nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &:启动Kafka服务器,具体操作为:
    • nohup:使命令在后台运行,即使SSH连接断开,Kafka仍会继续运行。
    • sh ${KAFKA_HOME}/bin/kafka-server-start.sh:调用Kafka的启动脚本。
    • ${KAFKA_HOME}/config/server.properties:指定Kafka的配置文件位置。
    • > /dev/null 2>&1 &:将所有输出(包括标准输出和标准错误输出)重定向到/dev/null,即忽略所有输出,并在后台运行进程。

二、kafka集群停止脚本

#!/bin/bashBROKERS="node01 node02 node03" KAFKA_HOME="/usr/kafka/kafka_2.12-3.0.0" KAFKA_NAME="kafka_2.12-3.0.0"for i in $BROKERSdoecho "Stopping ${KAFKA_NAME} on ${i}..."# 使用 ssh 执行 Kafka 停止命令,并捕获其返回状态ssh ${i} "source /etc/profile; bash ${KAFKA_HOME}/bin/kafka-server-stop.sh"# 检查 ssh 命令是否成功if [[ $? -eq 0 ]]; thenecho "Kafka stopped successfully on ${i}"elseecho "Failed to stop Kafka on ${i}"fidoneecho "All ${KAFKA_NAME} are stopped"exit 0

为保证这两个脚本能够执行,需要设置可执行权限:

chmod +x start-kafka.sh

chmod +x stop-kafka.sh

三、启停合并脚本

#! /bin/bashcase $2 in"start")for i in $(cat $1)doecho " --------启动 $i Kafka-------"# 用于KafkaManager监控,JMX_PORT 设置ssh $i "export JMX_PORT=9988; /usr/local/kafka/bin/kafka-server-start.sh -daemon /opt/local/kafka/config/server.properties"done;;"stop")for i in $(cat $1)doecho " --------停止 $i Kafka-------"ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"done;;*)echo "Usage: $0 <hosts_file> <start|stop>"exit 1;;esac

使用方法:

启动Kafka

./script.sh hosts.txt start

其中 hosts.txt 包含你要启动Kafka的所有服务器名称或IP地址,每行一个。例如:

node01

node02

node03

停止Kafka

./script.sh hosts.txt stop

  1. Kafka命令行操作

5.1 创建主题

创建一个名为test的主题,分区数为1,副本数为1:

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 查看主题列表

查看当前集群中的主题列表:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

5.3 发送消息

使用命令行工具向主题test发送消息:

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092

然后你可以在控制台输入消息,按Enter发送。

5.4 消费消息

在另一个终端窗口中,使用命令行工具消费test主题的消息:bash

复制代码

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

此命令将从主题的开始读取所有消息,并显示在控制台上。

5.5 查看主题详情

查看主题test的详细信息,包括分区数、副本数等:

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

大数据组件-ClickHouse【考纲新增】

1. ClickHouse单机安装

安装步骤

Step 1: 安装依赖

更新系统并安装必要的依赖:

sudo yum update -ysudo yum install -y epel-releasesudo yum install -y libicu libtool-ltdl openssl

Step 2: 安装ClickHouse

1. 安装 clickhouse-common-static

该包包含ClickHouse的核心文件和二进制:

sudo rpm -ivh clickhouse-common-static-21.7.3.14-2.x86_64.rpm

安装 clickhouse-common-static-dbg

这是用于调试的包,可以在排查问题时使用:

sudo rpm -ivh clickhouse-common-static-dbg-21.7.3.14-2.x86_64.rpm

安装 clickhouse-client

安装ClickHouse客户端,用于连接并操作ClickHouse数据库:

sudo rpm -ivh clickhouse-client-21.7.3.14-2.noarch.rpm

安装 clickhouse-server

安装ClickHouse服务器,这个包是ClickHouse的核心服务:

sudo rpm -ivh clickhouse-server-21.7.3.14-2.noarch.rpm

2. 启动ClickHouse服务:

   sudo systemctl start clickhouse-server

3. 设置ClickHouse服务在系统启动时自动启动:

   sudo systemctl enable clickhouse-server

Step 3: 验证安装

安装完成后,使用`clickhouse-client`命令连接到本地的ClickHouse实例,并执行简单的查询验证安装:

clickhouse-client

执行查询命令查看版本:

SELECT version();

返回ClickHouse的版本信息,表示安装成功。

2. 创建表、插入数据

2.1 创建数据库和表

在ClickHouse中,可以根据需要创建数据库和表。首先,创建一个数据库:

CREATE DATABASE test_db;

使用刚创建的数据库:

USE test_db;

创建一个简单的表,包含用户ID、姓名和年龄三个字段:

CREATE TABLE users (id UInt32,name String,age UInt8) ENGINE = MergeTree()ORDER BY id;

- `MergeTree` 是ClickHouse中常用的表引擎,支持高效的读写和索引。

- `ORDER BY` 指定了表的排序键,在插入数据时自动进行排序。

2.2 插入数据

向表中插入一些数据,可以使用SQL的`INSERT`语句:

INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 35);

查看表中的数据:

SELECT * FROM users;

输出结果类似于:

id

name

age

1

Alice

25

2

Bob

30

3

Charlie

35

3. 查询、修改、删除

3.1 查询数据

ClickHouse支持标准的SQL查询。常见查询操作包括:

查询所有数据:

SELECT * FROM users;

根据条件查询:

查询年龄大于30的用户:

SELECT * FROM users WHERE age > 30;

查询特定字段:

查询用户的姓名和年龄:

SELECT name, age FROM users;

排序查询:

按年龄降序排列:

SELECT * FROM users ORDER BY age DESC;

3.2 修改数据

在ClickHouse中直接修改已有的数据比较特殊,ClickHouse本身不支持传统的`UPDATE`操作。

可以通过删除并重新插入的方式进行修改,或使用`ALTER`语句来调整表的结构。

例如,要修改ID为2的用户年龄为32,可以先删除该条数据,再插入新值:

删除数据:

ALTER TABLE users DELETE WHERE id = 2;

插入新数据:

INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);

查询更新后的数据:

SELECT * FROM users WHERE id = 2;

输出结果:

id

name

age

2

Bob

32

3.3 删除数据

删除单条数据:

删除ID为3的用户:

ALTER TABLE users DELETE WHERE id = 3;

删除表中的所有数据:

TRUNCATE TABLE users;

删除表:

如果你不再需要某个表,可以将其删除:

DROP TABLE users;

4. 常见操作总结

操作

示例命令

创建数据库

CREATE DATABASE test_db;

创建表

CREATE TABLE users (id UInt32, name String, age UInt8) ENGINE = MergeTree() ORDER BY id;

插入数据

INSERT INTO users (id, name, age) VALUES (1, 'Alice', 25);

查询所有数据

SELECT * FROM users;

条件查询

SELECT * FROM users WHERE age > 30;

修改数据

ALTER TABLE users DELETE WHERE id = 2;

然后 INSERT INTO users (id, name, age) VALUES (2, 'Bob', 32);

删除单条数据

ALTER TABLE users DELETE WHERE id = 3;

删除表

DROP TABLE users;

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com