Since we moved to Flink, I haven't written a Spark Streaming job in several years. One machine learning job, however, is still built on Spark ML + Spark Streaming; it is not easy to migrate to Flink, so it has been kept around.
In addition, all of our Spark jobs run on Kubernetes, and there is no tool or platform in place to monitor and maintain Spark Streaming jobs. Since there are only a few such jobs and no new ones will be added, I put together a few shell scripts, a WeChat Work (企业微信) robot, and Linux crontab to implement status monitoring and alerting, log collection, and automatic restart for the Spark Streaming jobs.
1. Startup script
#!/bin/sh
#source ~/.bashrc
# cd to the directory this script lives in
cd $(cd "`dirname "$0"`";pwd)
# Name (without the .yaml suffix) of the spark-on-k8s submission YAML file
yamlname1=ParkPrediction
# The k8s name must be 1-63 characters: digits, lowercase letters and hyphens (-), and must not start with a hyphen
appname=${yamlname1,,}
date=$(date +"%Y-%m-%d" -d "-0 day")
yamlname=${yamlname1}-${date}
appname=${appname}-${date}
# Log directory
logdir=/home/hadoop/pdEnv/spark_k8s/ml/log/${appname}
[ ! -d $logdir ] && mkdir -p $logdir
# Function: tail driver/executor pod logs into files under ${logdir}
log2file() {
    running_pod=$(kubectl get pod -n bigdata | grep ${appname} | grep "Running\|Error" 2>/dev/null | awk '{print $1}')
    for podName in ${running_pod[@]}
    do
        logcommand=$(ps -aux | grep kubectl | grep logs | grep $podName)
        logpath=${logdir}/${podName}.log
        # Only start a new "kubectl logs -f" if one is not already running for this pod
        if [[ -z "$logcommand" ]]; then
            echo "Writing pod log to: ${logpath}"
            nohup kubectl logs -f --tail=100 ${podName} -n bigdata >> ${logpath} 2>&1 &
        fi
    done
}
# Generate the execution file
[ ! -d executor ] && mkdir executor
cp ${yamlname1}.yaml executor/${yamlname}-exec.yaml
sed -i "s/\${appname}/${appname}/g" executor/${yamlname}-exec.yaml
# Delete the existing application (if any)
kubectl delete -f executor/${yamlname}-exec.yaml --wait
# Submit the execution file
sleep 20s
echo "Applying executor/${yamlname}-exec.yaml"
kubectl apply -f executor/${yamlname}-exec.yaml
sleep 5s
# Watch the driver pod status and collect logs
while [ 1 ]
do
    sleep 5s
    #echo "Getting the driver pod status"
    status=`kubectl get pod -n bigdata | grep ${appname}-driver | awk '{print $3}'`
    if [[ ${status} == "Error" ]]; then
        log2file
        exit 255
    elif [[ ${status} == "" ]]; then
        # The driver pod is not up yet; give it two more minutes, then check again
        sleep 120s
        status=`kubectl get pod -n bigdata | grep ${appname}-driver | awk '{print $3}'`
        if [[ ${status} == "" ]]; then
            echo "Driver pod not found, please check the yaml file"
            exit 255
        elif [[ ${status} == "Error" ]]; then
            kubectl logs --tail=100 ${appname}-driver -n bigdata
            exit 255
        fi
    elif [[ ${status} == "Running" ]]; then
        log2file
        continue
    elif [[ ${status} == "Completed" ]]; then
        exit 0
    else
        continue
    fi
    sleep 5s
done
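The script expects a ParkPrediction.yaml template next to it in which the application name is written as the literal placeholder ${appname}, which the sed command above fills in. The original template isn't shown here; since the monitoring script in the next section queries kubectl get sparkapplication, the jobs appear to be submitted as Spark Operator SparkApplication resources, so a minimal sketch of such a template might look like the following (the image, main class, jar path and resource sizes are illustrative placeholders, not the real job's values):
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: ${appname}        # replaced by the startup script, e.g. parkprediction-2024-05-01
  namespace: bigdata
spec:
  type: Scala
  mode: cluster
  image: "your-registry/spark:3.1.1"                                   # placeholder
  mainClass: com.example.ml.ParkPrediction                             # placeholder
  mainApplicationFile: "local:///opt/spark/jars/park-prediction.jar"   # placeholder
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never            # restarts are handled by the cron-based monitor below
  driver:
    cores: 1
    memory: "2g"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2
    memory: "2g"
After substitution, the SparkApplication (and therefore its driver pod) carries a date-stamped name like parkprediction-YYYY-MM-DD, which is what the grep on ${appname}-driver above and the date-stripping sed in the monitoring script rely on.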
Run it with:
nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &
After it runs, ParkPrediction.log records the paths of the driver log and the executor logs, which makes it easy to find the right log files when troubleshooting.
2. Status monitoring script + automatic restart
#!/bin/bash
# Spark-on-k8s application names (without the date suffix) that need status monitoring
apps=("parkareaprediction" "parkprediction")
# List the SparkApplications currently in RUNNING state and strip the -YYYY-MM-DD suffix
running_app=$(kubectl get sparkapplication -n bigdata 2>/dev/null \
    | grep -E "parkareaprediction|parkprediction" \
    | grep "RUNNING" \
    | awk '{print $1}' \
    | sed -E 's/-[0-9]{4}-[0-9]{2}-[0-9]{2}//')
echo running_app=$running_app
for app in ${apps[@]}
do
    if [[ "${running_app[@]}" =~ "${app}" ]]; then
        echo $app " is running"
    else
        echo $app " is not running"
        # Alert via the WeChat Work robot webhook
        curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=this-is-your-robot-key' \
            -H 'Content-Type: application/json' \
            -d '{
                    "msgtype": "text",
                    "text": {
                        "mentioned_mobile_list": ["your-phone-number"],
                        "content": "Huawei Cloud production sparkStreaming job '$app' is not running and is about to be restarted"
                    }
                }'
        echo
        cd /home/hadoop/pdEnv/spark_k8s/ml
        if [ "$app" == "parkprediction" ]; then
            echo "ParkPrediction job log paths:"
            tail -n 10 ParkPrediction.log
            echo "-----------Restarting ParkPrediction---------------"
            nohup sh ParkPrediction.sh > ParkPrediction.log 2>&1 &
        elif [ "$app" == "parkareaprediction" ]; then
            echo "ParkAreaPrediction job log paths:"
            tail -n 10 ParkAreaPrediction.log
            echo "-----------Restarting ParkAreaPrediction---------------"
            nohup sh ParkAreaPrediction.sh > ParkAreaPrediction.log 2>&1 &
        else
            echo "Unknown job name: $app"
        fi
    fi
done
Then configure crontab to run the status monitoring script on a schedule:
# Check the running status of the Spark Streaming jobs every 20 minutes
*/20 * * * * sh /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.sh >> /home/hadoop/pdEnv/spark_k8s/ml/StreamMonitor.log
The result looks like this: