当前会有已抽取好的数据存放在 ODS 层
通过数据清洗, 把数据存放在 DWD 层
数据清洗的规范
crm_user_base_info_his_full
erp_u_memcard_reg_full
erp_u_sale_m_inc
erp_u_sale_pay_inc
erp_c_memcard_class_group_full
his_chronic_patient_info_new_full
从简单到复杂
his_chronic_patient_info_new_full
检测表
erp_code 门店 ID - 替换成具体的门店名称 关联门店表
spark 导入文件:
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
with t as (
select * from
ods_shihaihong.his_chronic_patient_info_new_full
)
select
t.id,
t.member_id,
b.orgname as erp_code, -- 门店名称
t.extend,
t.detect_time,
t.bec_chr_mbr_date
from t
left join ods_shihaihong.erp_c_org_busi_full b on b.id =
t.erp_code
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/his_chronic_patient_info_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/his_chronic_p
atient_info_new"). \
saveAsTable("change_shihaihong.his_chronic_patient_info_new
")
print("清洗完成")
加入任务中:
执行一次:
成品表:
erp_u_sale_pay_inc 支付表
paytype 支付类型 码值 - 关联码值表做码值替换 替换成具体的含义
先查询一下表中的所有支付方式:
select distinct paytype from ods_shihaihong.erp_u_sale_pay_inc ;
可以对 paytype 中的值进行翻译:
在 spark 中实现:
from pyspark.sql import SparkSession
import os
# spark 的实例化# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
select
id,
saleno,
cardno,
netsum,
case paytype when 'CARD' then '刷卡支付'
when 'CASH' then '现金支付'
when 'PHONE' then '手机支付'
end as paytype,
bak1
from ods_shihaihong.erp_u_sale_pay_inc
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_sale_pay_inc_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_sale_pa
y_inc_new"). \
saveAsTable("change_shihaihong.erp_u_sale_pay_inc_new")
print("清洗完成")
任务调度:
将代码写入 GLUE IDE ,执行一次:
成品表:
erp_c_memcard_class_group_full 分组表
notes - 规则 - 拆分成两个字段, 大于某个值的字段, 小于某个值的字段
在 spark 中实现:
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
select
createtime,
createuser,
groupid,
groupname,
left(notes,case instr(notes,'小') when 0 then length(notes)
else instr(notes,'小') end -1) as LessThen,
right(notes,instr(reverse(notes),'小')) as MoreThen,
stamp
from ods_shihaihong.erp_c_memcard_class_group_full;
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_c_memcard_class_group_full_new")# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_c_memcard
_class_group_full_new"). \
saveAsTable("change_shihaihong.erp_c_memcard_class_group_fu
ll_new")
print("清洗完成")
任务调度:
GLUE IDE 中插入代码后执行一次:
结果表:
erp_u_sale_m_inc 订单表
busno 门店 ID - 替换成具体的门店名称 关联门店表
extno 编码号 - 空值处理 组成 o2o_trade_from + 20 + 订单号数字部分
不
为空则不处理
o2o_trade_from O2O 商城 - 码值处理 替换成码值表 销售 O2O 分类下的具体含
义
channel 渠道 - 码值处理 替换成码值表 销售渠道 分类下的具体含义
discount 折扣 - 空值处理 规则 precash / stdsum * 100 保留两位小数 不带
单位 Python:
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
with c_code_value_full_GB00001 as (
select * from ods_shihaihong.c_code_value_full
where cat_name='销售渠道'
)
select
ods_shihaihong.erp_u_sale_m_inc.id,
saleno,
orgname,
posno,
case extno when "" then
o2o_trade_from||'20'||right(saleno,instr(reverse(saleno),'P')-
1)
else extno end as extno,
extsource,
o2o_trade_from,
cat_name,
starttime,
finaltime,
payee,nvl(cast(discounter as decimal(10,2)),round(cast(precash as
decimal(10,2))/cast(stdsum as decimal(10,2)) * 100,2)) as
discounter,
crediter,
returner,
warranter1,
warranter2,
stdsum,
netsum,
loss,
discount,
member,
precash,
ods_shihaihong.erp_u_sale_m_inc.stamp,
shiftid,
shiftdate,
yb_saleno
from ods_shihaihong.erp_u_sale_m_inc
left join ods_shihaihong.erp_c_org_busi_full
on ods_shihaihong.erp_u_sale_m_inc.busno =
ods_shihaihong.erp_c_org_busi_full.busno
left join c_code_value_full_GB00001
on
ods_shihaihong.erp_u_sale_m_inc.channel=c_code_value_full_GB00
001.val_name;
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_sale_m_inc_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_sale_m_
inc_new"). \
saveAsTable("change_shihaihong.erp_u_sale_m_inc_new")
print("清洗完成")
任务调度:
GLUE IDE 加入代码后执行一次:
erp_u_memcard_reg_full 会员表
busno 门店 ID - 替换成具体的门店名称 关联门店表
cardstatus 卡状态 - 码值处理 替换成码值表 会员卡状态 分类下的具体含
义
cardholder 持卡人 - 存在其他字符 去除所有非中文符号
cardaddress 地址 - XXX 省 XXX 市 XXX 特别行政区 XXX 市 ... 添加 3 个字
段 province( 省份 ) city( 城市 ) postcode( 邮编 )
提取该字段的信息 , 填充到这 3 个字段里
湖南省桂芝市清浦任街 C 座 346094 => 湖南省 桂芝市
346094
香港特别行政区南昌市朝阳阜新路 S 座 569483 => 香港特别行
政区 南昌市 569483
西藏自治区惠州市江北张街 J 座 146096 => 西藏自治区 惠州
市 146096
sex 性别 - 码值处理 替换成码值表 性别 分类下的具体含义
idcard 身份证 - 15 位 18 位 转换所有 15 位成 18 位 去学习和理解身份证
的转化规则 尤其是最后一位的算法
birthday 出 生 日 期 - 提 取 身 份 证 字 段 的 年 月 日 覆 盖 填 充 到 该 字 段
YYYY-MM-DD
scrm_userid 客户关联 ID - CRM 表的 ID 号 空值处理 生成规则 SCRM +
0019( 门店 ID 4 位左补 0) + 1692920001( 会员卡号 )
sickness_motion - 增加疾病关注字段 关联 erp_u_memcard_reg_c_full,
使用逗号拼接 , status=1 倒序 , 如 全心炎 , 脑梗塞 , 最多显示 5 个
python 实现:
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import re
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 处理姓名
@F.udf()
def handle_cardholder(val):
if val:
chinese_chars = re.findall(r'[\u4e00-\u9fa5]+', val)
name = ''.join(chinese_chars)
return name
else:
return val
# 处理省份
@F.udf()
def handle_province(val):
pattern = re.compile(r'^(.+?(?:省|自治区|特别行政区|市))')
match = pattern.search(val)
if match:
return match.group(0)
else:
return ''
# 处理城市
@F.udf()
def handle_city(val):# 在这里提取城市信息
pattern = r'(?:省|特别行政区|自治区|市)(.*?)(市|县)'
match = re.search(pattern, val)
if match:
# 拼接城市名和“市”
city = match.group(1) + match.group(2)
return city
else:
return ''
# 处理邮编
@F.udf()
def handle_postcode(val):
# 在这里提取邮编信息
match=re.search(r'\d+$',val)
if match:
return match.group()
else:
return ''
# 处理身份证
@F.udf()
def handle_idcard(val):
if len(val) == 15:
head = val[:6]
bir = val[6:6+6]
tail = val[6+6:]
ss=f"{head}19{bir}{tail}"
#这里传入的字符串长度要等于 17 位 !!不然算不出
s=idcard_check_digit(ss)
return f"{ss}{s}"
else:
return val
# get 身份证的校验码
def idcard_check_digit(val):
if len(val) != 17 or not val.isdigit():
return ''
coefficients = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8,
4, 2]check_digit_map = {'0': 1, '1': 0, '2': 'X', '3': 9, '4': 8,
'5': 7, '6': 6, '7': 5, '8': 4, '9': 3, '10': 'X'}
sum_ = 0
for i, digit in enumerate(val):
sum_ += int(digit) * coefficients[i]
remainder = sum_ % 11
return check_digit_map[str(remainder)]
# 处理生日
@F.udf()
def handle_birthday(val):
if len(val) == 18:
y = val[6:10]
m = val[10:12]
d = val[12:14]
return f"{y}-{m}-{d}"
elif len(val) == 15:
y = val[6:8]
m = val[8:10]
d = val[10:12]
return f"19{y}-{m}-{d}"
else:
return ''
sql = '''
with t as (
select * from ods_shihaihong.erp_u_memcard_reg_full
),md as (
select memcardno,concat_ws(',', collect_list(sickness)) as ness
from (select * from ods_shihaihong.erp_u_memcard_reg_c_full order
by id desc) e
where status = '1' group by memcardno)
select
t.id,
t.memcardno,
bm.orgname as busno,
t.introducer,
t.cardtype,t.cardlevel,
t.cardpass,
b.var_desc as cardstatus,
t.saleamount,
t.realamount,
t.puramount,
t.integral,
t.integrala,
t.integralflag,
t.cardholder,
t.cardaddress,
"" as province, -- 省份
"" as city, -- 城市
"" as postcode, -- 邮编
b1.var_desc as sex,
t.tel,
t.handset,
t.fax,
t.createuser,
t.createtime,
t.tstatus,
t.notes,
t.stamp,
t.idcard,
t.birthday,
t.allowintegral,
t.apptype,
t.applytime,
t.invalidate,
t.lastdate,
t.bak1,
crm.user_id as scrm_userid,
md.ness
from t
left join ods_shihaihong.erp_c_org_busi_full bm on t.busno=bm.id
left join ods_shihaihong.c_code_value_full b on b.val_name =
t.cardstatus and b.cat_name='会员卡状态'
left join ods_shihaihong.c_code_value_full b1 on b1.val_name =
t.sex and b1.cat_name='性别'
left join ods_shihaihong.crm_user_base_info_his_full crm on
crm.id = t.id
left join md on md.memcardno = t.memcardno
'''df = spark.sql(sql)
#处理持卡人
df = df.withColumn("cardholder",
handle_cardholder("cardholder"))
#处理地址
df = df.withColumn("province", handle_province("cardaddress"))
df = df.withColumn("city", handle_city("cardaddress"))
df = df.withColumn("postcode", handle_postcode("cardaddress"))
#处理身份证
df = df.withColumn("idcard", handle_idcard("idcard"))
#处理生日
df = df.withColumn("birthday", handle_birthday("idcard"))
df.show()
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_memcard_reg_full_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_memcard
_reg_full_new"). \
saveAsTable("change_shihaihong.erp_u_memcard_reg_full_new")
print("清洗完成")
# 验证数据
# 计算原表的总记录数original_count_sql = "select count(1) from
ods_shihaihong.erp_u_memcard_reg_full"
original_count = spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
change_shihaihong.erp_u_memcard_reg_full_new"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
任务调度:
执行一次:
crm_user_base_info_his_full 客户表
user_type 用户类型 - 码值处理 替换成码值表 会员类型 分类下的具体含义
source 用户来源 - 码值处理 替换成码值表 会员来源 分类下的具体含义
erp_code 门店 ID - 替换成具体的门店名称 关联门店表
sex 性别 - 码值处理 替换成码值表 性别 分类下的具体含义
education 学历 - 码值处理 替换成码值表 学历 分类下的具体含义
email 邮箱 - 格式验证 如果是非法格式 替换成 s60@zhiyun.pub
birthday 出生日期 - 关联会员卡表 覆盖到该字段
age 年龄 - 通过出生日期计算该客户的年龄
id_card_no 身份证 - 关联会员卡表 18 位身份证覆盖到该字段
address - 空值处理 关联会员卡表 写入格式为 省份 + 地区
last_subscribe_tim e 关注日期 - 时间戳格式 日期转换 转换格式为
YYYY-MM-DD HH:mm:ss
Python
#!/bin/python3from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '12g') \
.config('spark.executor.memory', '12g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 处理邮箱
@F.udf()
def handle_email(val):
pattern = r'^[A-Za-z0-9]+@'
email = re.match(pattern, val)
if email:
return val
else:
return 's60@zhiyun.pub'
# 处理年龄
@F.udf()
def handle_age(val):
birth_date = datetime.strptime(val, '%Y-%m-%d')
# 获取今天的日期
today = datetime.now()
# 计算年龄
age = today.year - birth_date.year
# 如果今天的日期小于生日日期,则年龄减一
if (today.month, today.day) < (birth_date.month,
birth_date.day):
age -= 1
return int(age)# 处理时间戳
@F.udf()
def handle_last_subscribe_time(val):
# 将时间戳转换为 datetime 对象
dt_object = datetime.fromtimestamp(int(val))
# 将 datetime 对象格式化为指定的字符串格式
formatted_date = dt_object.strftime('%Y-%m-%d %H:%M:%S')
return formatted_date
sql = '''
with t as (
select * from ods_shihaihong.crm_user_base_info_his_full
)
select
t.id,
t.user_id,
b.var_desc as user_type,
b1.var_desc as source,
bm.orgname as erp_code,
t.active_time,
t.name,
b2.var_desc as sex,
b3.var_desc as education,
t.job,
t.email,
t.wechat,
t.webo,
vip.birthday as birthday,
t.age,
vip.idcard as id_card_no,
t.social_insurance_no,
concat(vip.province,vip.city) as address,
t.last_subscribe_time as te,
"" as last_subscribe_time
from t
left join ods_shihaihong.c_code_value_full b on b.val_name =
t.user_type and b.cat_name='会员类型'
left join ods_shihaihong.c_code_value_full b1 on b1.val_name =
t.source and b1.cat_name='会员来源'
left join ods_shihaihong.erp_c_org_busi_full bm on
t.erp_code=bm.idleft join ods_shihaihong.c_code_value_full b2 on b2.val_name =
t.sex and b2.cat_name='性别'
left join ods_shihaihong.c_code_value_full b3 on b3.val_name =
t.education and b3.cat_name='学历代码'
left join change_shihaihong.erp_u_memcard_reg_full_new vip on
vip.id = t.id
'''
df = spark.sql(sql)
#处理持卡人
df = df.withColumn("email", handle_email("email"))
#处理年龄
df = df.withColumn("age", handle_age("birthday"))
#处理时间戳
df = df.withColumn("last_subscribe_time",
handle_last_subscribe_time("te"))
df=df.drop("te")
df.show()
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/crm_user_base_info_his_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/crm_user_base
_info_his_new"). \
saveAsTable("change_shihaihong.crm_user_base_info_his_new")print("清洗完成")
# 验证数据
# 计算原表的总记录数
original_count_sql = "select count(1) from
ods_shihaihong.crm_user_base_info_his_full"
original_count = spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}")
任务调度: