您的位置:首页 > 文旅 > 美景 > 网络架构师主要做什么_展架立式落地式_北京seo网络优化师_网店运营流程步骤

网络架构师主要做什么_展架立式落地式_北京seo网络优化师_网店运营流程步骤

2025/2/26 7:18:02 来源:https://blog.csdn.net/weixin_42960808/article/details/145856903  浏览:    关键词:网络架构师主要做什么_展架立式落地式_北京seo网络优化师_网店运营流程步骤
网络架构师主要做什么_展架立式落地式_北京seo网络优化师_网店运营流程步骤

自从上了flink后,其实已经好几年没有编写SparkStreaming作业了。但是还有一个机器学习的作业是通过sparkml+sparkstreaming的。这个不方便迁移到flink上,所以一直保留着。
再有就是,我们的spark作业都是运行在k8s上的。并没有工具或者平台进行sparkstreaming作业的监控和维护。因为作业较少,以后也不会再新增类似作业了,所以我就编写几个shell脚本+企业微信机器人+Linux的crontab来时间sparkstreaming作业的状态监控告警,日志收集和自动重启。

1.启动脚本

#!/bin/sh
#source ~/.bashrccd $(cd "`dirname "$0"`";pwd)
#这是作业提交spark-on-k8s的yaml文件无后缀名字
yamlname1=ParkPrediction#名称为1-63个字符,可包含数字、小写英文字以及短划线(-)、不能以短线(-)开头
appname=${yamlname1,,}date=$(date  +"%Y-%m-%d" -d "-0 day")yamlname=${yamlname1}-${date}
appname=${appname}-${date}
#指定日志路径
logdir=/home/hadoop/pdEnv/spark_k8s/ml/log/${appname}
[ ! -d $logdir  ] && mkdir -p $logdir
#定义日志输出方法
log2file() {running_pod=$(kubectl get pod -n bigdata | grep ${appname} | grep "Running\|Error"  2>/dev/null| awk '{print $1}')for podName in ${running_pod[@]}dologcommand=$(ps -aux | grep kubectl | grep logs |grep $podName)logpath=${logdir}/${podName}.logif [[ -z "$logcommand" ]]; thenecho '输出日志文件:'${logpath}nohup kubectl logs -f  --tail=100 ${podName} -n bigdata >> ${logpath} 2>&1 &fidone
}#定义执行文件
[ ! -d executor  ] && mkdir executor
cp ${yamlname1}.yaml executor/${yamlname}-exec.yaml
sed -i "s/\${appname}/${appname}/g" executor/${yamlname}-exec.yaml#删除已有作业
kubectl delete -f executor/${yamlname}-exec.yaml --wait#提交执行文件
sleep 20s
echo "执行 executor/${yamlname}-exec.yaml"
kubectl apply -f executor/${yamlname}-exec.yaml
sleep 5s#监听状态获取日志
while [ 1 ]
dosleep 5s#echo "获取 driver pod状态"status=`kubectl get pod  -n bigdata|grep ${appname}-driver|awk '{print $3}'`if [[ ${status} == "Error" ]]; thenlog2fileexit 255elif [[ ${status} == "" ]]; thensleep 120sstatus=`kubectl get pod  -n bigdata|grep ${appname}-driver|awk '{print $3}'`if [[ ${status} == "" ]]; thenecho "pod 异常,请检查yaml文件"exit 255elif [[ ${status} == "Error" ]]; thenkubectl logs --tail=100 ${appname}-driver -n bigdataexit 255fielif [[ ${status} == "Running" ]];thenlog2filecontinueelif [[ ${status} == "Completed" ]]; thenexit 0elsecontinuefisleep 5s
done

执行命令为:

nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &

执行效果为:ParkPrediction.log里会记录driver的日志路径和executor日志的路径。方便排查问题查看日志
在这里插入图片描述
在这里插入图片描述

2.状态监控脚本+自动重启

#!/bin/bash
#预定义需要状态监听的spark-on-k8s作业名称
apps=("parkareaprediction" "parkprediction")running_app=$(kubectl get sparkapplication -n bigdata 2>/dev/null \| grep -E "parkareaprediction|parkprediction" \| grep "RUNNING" \| awk '{print $1}' \| sed -E 's/-[0-9]{4}-[0-9]{2}-[0-9]{2}//')echo running_app=$running_appfor app in ${apps[@]}doif [[ "${running_app[@]}" =~ "${app}" ]]; thenecho $app " is running"elseecho $app " is not running"curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=this-is-your-robot-key' \-H 'Content-Type: application/json' \-d '{"msgtype": "text","text": {"mentioned_mobile_list":["your-phone-number"],"content": "华为云生产环境sparkStreaming实时任务:'$app' is not running,即将重启"}}'echocd /home/hadoop/pdEnv/spark_k8s/mlif [ "$app" == "parkprediction" ]; thenecho "ParkPrediction 作业日志路径如下:"tail -n 10 ParkPrediction.logecho "-----------开始重启 ParkPrediction---------------"nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &elif [ "$app" == "parkareaprediction" ]; thenecho "ParkAreaPrediction 作业日志路径如下:"tail -n 10 ParkAreaPrediction.logecho "-----------开始重启 ParkAreaPrediction---------------"nohup sh ParkAreaPrediction.sh > ParkAreaPrediction.log 2>&1 &elseecho "未知作业名:$app"fifidone

然后配置crontab定时执行状态监控脚本

#每20分钟执行检查一次spark streaming作业运行状态
*/20 * * * * sh /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.sh >>  /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.log

效果如下:
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com