当前会有已抽取好的数据存放在 ODS 层  
 
 通过数据清洗, 把数据存放在 DWD 层  
 
 数据清洗的规范  
 
 crm_user_base_info_his_full  
 
 erp_u_memcard_reg_full  
 
 erp_u_sale_m_inc  
 
 erp_u_sale_pay_inc  
 
 erp_c_memcard_class_group_full  
 
 his_chronic_patient_info_new_full  
 
 从简单到复杂  
 
 his_chronic_patient_info_new_full  
 
 检测表  
 
 
 
 erp_code 门店 ID - 替换成具体的门店名称 关联门店表  
 
 spark  导入文件:  
 
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
with t as (
select * from
ods_shihaihong.his_chronic_patient_info_new_full
)
select
t.id,
t.member_id,
b.orgname as erp_code, -- 门店名称
t.extend,
t.detect_time,
t.bec_chr_mbr_date
from t
left join ods_shihaihong.erp_c_org_busi_full b on b.id =
t.erp_code
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/his_chronic_patient_info_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/his_chronic_p
atient_info_new"). \
saveAsTable("change_shihaihong.his_chronic_patient_info_new
")
print("清洗完成") 加入任务中:  
 
 
 
 执行一次: 
 
 成品表:
 成品表:  
 
 
 erp_u_sale_pay_inc  支付表 
 
 
 
 paytype 支付类型 码值 - 关联码值表做码值替换 替换成具体的含义  
 
 先查询一下表中的所有支付方式:  
 
 select distinct  paytype  from  ods_shihaihong.erp_u_sale_pay_inc ;  
 
 
 
 可以对  paytype  中的值进行翻译:  
 
 
 
 在  spark  中实现:  
 
from pyspark.sql import SparkSession
import os
# spark 的实例化# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
select
id,
saleno,
cardno,
netsum,
case paytype when 'CARD' then '刷卡支付'
when 'CASH' then '现金支付'
when 'PHONE' then '手机支付'
end as paytype,
bak1
from ods_shihaihong.erp_u_sale_pay_inc
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_sale_pay_inc_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_sale_pa
y_inc_new"). \
saveAsTable("change_shihaihong.erp_u_sale_pay_inc_new")
print("清洗完成") 任务调度:  
 
 
 
 将代码写入  GLUE IDE ,执行一次: 
 
 
 
 成品表:  
 
 
 
 erp_c_memcard_class_group_full  分组表 
 
 
 
 notes - 规则 - 拆分成两个字段, 大于某个值的字段, 小于某个值的字段  
 
 在 spark 中实现:  
 
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
select
createtime,
createuser,
groupid,
groupname,
left(notes,case instr(notes,'小') when 0 then length(notes)
else instr(notes,'小') end -1) as LessThen,
right(notes,instr(reverse(notes),'小')) as MoreThen,
stamp
from ods_shihaihong.erp_c_memcard_class_group_full;
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_c_memcard_class_group_full_new")# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_c_memcard
_class_group_full_new"). \
saveAsTable("change_shihaihong.erp_c_memcard_class_group_fu
ll_new")
print("清洗完成") 任务调度:  
 
 
 
 GLUE IDE  中插入代码后执行一次: 
 
 
 
 结果表:  
 
 
 
 erp_u_sale_m_inc  订单表  
 
 busno 门店 ID - 替换成具体的门店名称 关联门店表  
 
 extno 编码号 - 空值处理 组成 o2o_trade_from + 20 + 订单号数字部分  
 
 不  
 
 为空则不处理  
 
 o2o_trade_from O2O 商城 - 码值处理 替换成码值表 销售 O2O 分类下的具体含  
 
 义  
 
 channel 渠道 - 码值处理 替换成码值表 销售渠道 分类下的具体含义  
 
 discount 折扣 - 空值处理 规则 precash / stdsum * 100 保留两位小数 不带  
 
 单位 Python:  
 
from pyspark.sql import SparkSession
import os
# spark 的实例化
# master 运行模式 - local 本地模式 只消耗本机资源, yarn 集群模式 由
yarn 来做资源调度, standard 模式 内置的集群模式
# appName 名称
# config 运行配置 一般是设置内存的使用 执行器的内存
# enableHiveSupport 共享 Hive 的元数据库 启动后可以读写 hive 表
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
#spark 实现 sql 语句
sql = '''
with c_code_value_full_GB00001 as (
select * from ods_shihaihong.c_code_value_full
where cat_name='销售渠道'
)
select
ods_shihaihong.erp_u_sale_m_inc.id,
saleno,
orgname,
posno,
case extno when "" then
o2o_trade_from||'20'||right(saleno,instr(reverse(saleno),'P')-
1)
else extno end as extno,
extsource,
o2o_trade_from,
cat_name,
starttime,
finaltime,
payee,nvl(cast(discounter as decimal(10,2)),round(cast(precash as
decimal(10,2))/cast(stdsum as decimal(10,2)) * 100,2)) as
discounter,
crediter,
returner,
warranter1,
warranter2,
stdsum,
netsum,
loss,
discount,
member,
precash,
ods_shihaihong.erp_u_sale_m_inc.stamp,
shiftid,
shiftdate,
yb_saleno
from ods_shihaihong.erp_u_sale_m_inc
left join ods_shihaihong.erp_c_org_busi_full
on ods_shihaihong.erp_u_sale_m_inc.busno =
ods_shihaihong.erp_c_org_busi_full.busno
left join c_code_value_full_GB00001
on
ods_shihaihong.erp_u_sale_m_inc.channel=c_code_value_full_GB00
001.val_name;
'''
df = spark.sql(sql)
df.show()
# 存入到 change 的 hive 表
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_sale_m_inc_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_sale_m_
inc_new"). \
saveAsTable("change_shihaihong.erp_u_sale_m_inc_new")
print("清洗完成") 任务调度:  
 
 
 
 GLUE IDE  加入代码后执行一次: 
 
 
 
 erp_u_memcard_reg_full  会员表  
 
 busno  门店  ID -  替换成具体的门店名称 关联门店表  
 
 cardstatus  卡状态  -  码值处理 替换成码值表 会员卡状态 分类下的具体含  
 
 义  
 
 cardholder  持卡人  -  存在其他字符 去除所有非中文符号  
 
 cardaddress  地址  - XXX  省  XXX  市  XXX  特别行政区  XXX  市  ...  添加  3  个字  
 
 段  province( 省份 ) city( 城市 ) postcode( 邮编 )  
 
 提取该字段的信息 ,  填充到这  3  个字段里  
 
 湖南省桂芝市清浦任街  C  座  346094 =>  湖南省 桂芝市  
 
 346094  
 
 香港特别行政区南昌市朝阳阜新路  S  座  569483 =>  香港特别行  
 
 政区 南昌市  569483  
 
 西藏自治区惠州市江北张街  J  座  146096 =>  西藏自治区 惠州  
 
 市  146096  
 
 sex  性别  -  码值处理 替换成码值表 性别 分类下的具体含义  
 
 idcard  身份证  - 15  位  18  位 转换所有  15  位成  18  位 去学习和理解身份证  
 
 的转化规则 尤其是最后一位的算法  
 
 birthday  出 生 日 期  -  提 取 身 份 证 字 段 的 年 月 日 覆 盖 填 充 到 该 字 段  
 
 YYYY-MM-DD  
 
 scrm_userid  客户关联  ID - CRM  表的  ID  号 空值处理 生成规则  SCRM +  
 
 0019( 门店  ID 4  位左补  0) + 1692920001( 会员卡号 )  
 
 sickness_motion -  增加疾病关注字段 关联  erp_u_memcard_reg_c_full,  
 
 使用逗号拼接 , status=1  倒序 ,  如 全心炎 , 脑梗塞 ,  最多显示  5  个 
 
 python  实现:  
 
#!/bin/python3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import re
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '6g') \
.config('spark.executor.memory', '6g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 处理姓名
@F.udf()
def handle_cardholder(val):
if val:
chinese_chars = re.findall(r'[\u4e00-\u9fa5]+', val)
name = ''.join(chinese_chars)
return name
else:
return val
# 处理省份
@F.udf()
def handle_province(val):
pattern = re.compile(r'^(.+?(?:省|自治区|特别行政区|市))')
match = pattern.search(val)
if match:
return match.group(0)
else:
return ''
# 处理城市
@F.udf()
def handle_city(val):# 在这里提取城市信息
pattern = r'(?:省|特别行政区|自治区|市)(.*?)(市|县)'
match = re.search(pattern, val)
if match:
# 拼接城市名和“市”
city = match.group(1) + match.group(2)
return city
else:
return ''
# 处理邮编
@F.udf()
def handle_postcode(val):
# 在这里提取邮编信息
match=re.search(r'\d+$',val)
if match:
return match.group()
else:
return ''
# 处理身份证
@F.udf()
def handle_idcard(val):
if len(val) == 15:
head = val[:6]
bir = val[6:6+6]
tail = val[6+6:]
ss=f"{head}19{bir}{tail}"
#这里传入的字符串长度要等于 17 位 !!不然算不出
s=idcard_check_digit(ss)
return f"{ss}{s}"
else:
return val
# get 身份证的校验码
def idcard_check_digit(val):
if len(val) != 17 or not val.isdigit():
return ''
coefficients = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8,
4, 2]check_digit_map = {'0': 1, '1': 0, '2': 'X', '3': 9, '4': 8,
'5': 7, '6': 6, '7': 5, '8': 4, '9': 3, '10': 'X'}
sum_ = 0
for i, digit in enumerate(val):
sum_ += int(digit) * coefficients[i]
remainder = sum_ % 11
return check_digit_map[str(remainder)]
# 处理生日
@F.udf()
def handle_birthday(val):
if len(val) == 18:
y = val[6:10]
m = val[10:12]
d = val[12:14]
return f"{y}-{m}-{d}"
elif len(val) == 15:
y = val[6:8]
m = val[8:10]
d = val[10:12]
return f"19{y}-{m}-{d}"
else:
return ''
sql = '''
with t as (
select * from ods_shihaihong.erp_u_memcard_reg_full
),md as (
select memcardno,concat_ws(',', collect_list(sickness)) as ness
from (select * from ods_shihaihong.erp_u_memcard_reg_c_full order
by id desc) e
where status = '1' group by memcardno)
select
t.id,
t.memcardno,
bm.orgname as busno,
t.introducer,
t.cardtype,t.cardlevel,
t.cardpass,
b.var_desc as cardstatus,
t.saleamount,
t.realamount,
t.puramount,
t.integral,
t.integrala,
t.integralflag,
t.cardholder,
t.cardaddress,
"" as province, -- 省份
"" as city, -- 城市
"" as postcode, -- 邮编
b1.var_desc as sex,
t.tel,
t.handset,
t.fax,
t.createuser,
t.createtime,
t.tstatus,
t.notes,
t.stamp,
t.idcard,
t.birthday,
t.allowintegral,
t.apptype,
t.applytime,
t.invalidate,
t.lastdate,
t.bak1,
crm.user_id as scrm_userid,
md.ness
from t
left join ods_shihaihong.erp_c_org_busi_full bm on t.busno=bm.id
left join ods_shihaihong.c_code_value_full b on b.val_name =
t.cardstatus and b.cat_name='会员卡状态'
left join ods_shihaihong.c_code_value_full b1 on b1.val_name =
t.sex and b1.cat_name='性别'
left join ods_shihaihong.crm_user_base_info_his_full crm on
crm.id = t.id
left join md on md.memcardno = t.memcardno
'''df = spark.sql(sql)
#处理持卡人
df = df.withColumn("cardholder",
handle_cardholder("cardholder"))
#处理地址
df = df.withColumn("province", handle_province("cardaddress"))
df = df.withColumn("city", handle_city("cardaddress"))
df = df.withColumn("postcode", handle_postcode("cardaddress"))
#处理身份证
df = df.withColumn("idcard", handle_idcard("idcard"))
#处理生日
df = df.withColumn("birthday", handle_birthday("idcard"))
df.show()
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/erp_u_memcard_reg_full_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/erp_u_memcard
_reg_full_new"). \
saveAsTable("change_shihaihong.erp_u_memcard_reg_full_new")
print("清洗完成")
# 验证数据
# 计算原表的总记录数original_count_sql = "select count(1) from
ods_shihaihong.erp_u_memcard_reg_full"
original_count = spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
change_shihaihong.erp_u_memcard_reg_full_new"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}") 任务调度:  
 
 
 
 执行一次: 
 
 
 
 crm_user_base_info_his_full  客户表  
 
 user_type  用户类型  -  码值处理 替换成码值表 会员类型 分类下的具体含义  
 
 source  用户来源  -  码值处理 替换成码值表 会员来源 分类下的具体含义  
 
 erp_code  门店  ID -  替换成具体的门店名称 关联门店表  
 
 sex  性别  -  码值处理 替换成码值表 性别 分类下的具体含义  
 
 education  学历  -  码值处理 替换成码值表 学历 分类下的具体含义  
 
 email  邮箱  -  格式验证 如果是非法格式 替换成  s60@zhiyun.pub  
 
 birthday  出生日期  -  关联会员卡表 覆盖到该字段  
 
 age  年龄  -  通过出生日期计算该客户的年龄  
 
 id_card_no  身份证  -  关联会员卡表  18  位身份证覆盖到该字段  
 
 address -  空值处理 关联会员卡表 写入格式为 省份 + 地区  
 
 last_subscribe_tim e  关注日期  -  时间戳格式 日期转换 转换格式为  
 
 YYYY-MM-DD HH:mm:ss  
 
 Python  
 
#!/bin/python3from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime
import os
import re
spark = SparkSession.builder \
.master("local[*]") \
.appName("app") \
.config('spark.driver.memory', '12g') \
.config('spark.executor.memory', '12g') \
.config('spark.sql.parquet.writeLegacyFormat', 'true') \
.config("hive.metastore.uris", "thrift://cdh01:9083") \
.enableHiveSupport() \
.getOrCreate()
# 处理邮箱
@F.udf()
def handle_email(val):
pattern = r'^[A-Za-z0-9]+@'
email = re.match(pattern, val)
if email:
return val
else:
return 's60@zhiyun.pub'
# 处理年龄
@F.udf()
def handle_age(val):
birth_date = datetime.strptime(val, '%Y-%m-%d')
# 获取今天的日期
today = datetime.now()
# 计算年龄
age = today.year - birth_date.year
# 如果今天的日期小于生日日期,则年龄减一
if (today.month, today.day) < (birth_date.month,
birth_date.day):
age -= 1
return int(age)# 处理时间戳
@F.udf()
def handle_last_subscribe_time(val):
# 将时间戳转换为 datetime 对象
dt_object = datetime.fromtimestamp(int(val))
# 将 datetime 对象格式化为指定的字符串格式
formatted_date = dt_object.strftime('%Y-%m-%d %H:%M:%S')
return formatted_date
sql = '''
with t as (
select * from ods_shihaihong.crm_user_base_info_his_full
)
select
t.id,
t.user_id,
b.var_desc as user_type,
b1.var_desc as source,
bm.orgname as erp_code,
t.active_time,
t.name,
b2.var_desc as sex,
b3.var_desc as education,
t.job,
t.email,
t.wechat,
t.webo,
vip.birthday as birthday,
t.age,
vip.idcard as id_card_no,
t.social_insurance_no,
concat(vip.province,vip.city) as address,
t.last_subscribe_time as te,
"" as last_subscribe_time
from t
left join ods_shihaihong.c_code_value_full b on b.val_name =
t.user_type and b.cat_name='会员类型'
left join ods_shihaihong.c_code_value_full b1 on b1.val_name =
t.source and b1.cat_name='会员来源'
left join ods_shihaihong.erp_c_org_busi_full bm on
t.erp_code=bm.idleft join ods_shihaihong.c_code_value_full b2 on b2.val_name =
t.sex and b2.cat_name='性别'
left join ods_shihaihong.c_code_value_full b3 on b3.val_name =
t.education and b3.cat_name='学历代码'
left join change_shihaihong.erp_u_memcard_reg_full_new vip on
vip.id = t.id
'''
df = spark.sql(sql)
#处理持卡人
df = df.withColumn("email", handle_email("email"))
#处理年龄
df = df.withColumn("age", handle_age("birthday"))
#处理时间戳
df = df.withColumn("last_subscribe_time",
handle_last_subscribe_time("te"))
df=df.drop("te")
df.show()
# 创建 HDFS 路径
os.system("hadoop fs -mkdir -p
/zhiyun/shihaihong/change/crm_user_base_info_his_new")
# 创建数据
os.system('''
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists change_shihaihong location
"/zhiyun/shihaihong/change";
'
''')
# 保存到 hive 表里
# parquet 文件格式 类似 orc 因为 spark 会把 orc 表的版本存成高版本
df.write.format("hive") \
.mode("overwrite") \
.option("fileFormat","parquet") \
.option("location","/zhiyun/shihaihong/change/crm_user_base
_info_his_new"). \
saveAsTable("change_shihaihong.crm_user_base_info_his_new")print("清洗完成")
# 验证数据
# 计算原表的总记录数
original_count_sql = "select count(1) from
ods_shihaihong.crm_user_base_info_his_full"
original_count = spark.sql(original_count_sql).collect()[0][0]
print(f"原表总记录数: {original_count}")
# 计算新表的总记录数
new_count_sql = "select count(1) from
change_shihaihong.crm_user_base_info_his_new"
new_count = spark.sql(new_count_sql).collect()[0][0]
print(f"新表总记录数: {new_count}") 任务调度: 
 
 
 
 
 
