import datetime
import json
import random
import timefrom kafka import KafkaProducer'''生产者demo向branch-event主题中循环写入10条json数据注意事项:要写入json数据需加上value_serializer参数,如下代码
'''
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),security_protocol='SASL_PLAINTEXT',sasl_mechanism='PLAIN',sasl_plain_username='kafkadmin',sasl_plain_password='xxxxxxxx',bootstrap_servers=['10.10.xx.xx:9092'] )def gen(i):""" 生成当前日期和时间戳 """time_stamp = str(round(time.time() * 1000) - 0)time_stamp_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]current_date = datetime.datetime.now().strftime('%Y.%m.%d')dst_ip = str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254))src_ip = str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254)) + '.' + \str(random.randint(1, 254))model_name = f"zhang-{current_date}.cc2{i}"""" 原始日志 """s = {"dst_device_ip": "777.77.77.77","src_device_ip": "777.77.77.77", "src_device_dept": " ","eqpt_asset_type": "/IDS/Network/WAF","app_protocol": f"abc{i}","alarm_times": "9","start_time": f"{time_stamp_format}","src_account": model_name + "." + str(i),"answer_address": "光谷创新港","alarm_direction": "xxgcn","additional_name": "www.jw.com","http_url_externalurl": f"http://www.{model_name}.com","http_url_externalurl_domain": f"www.{model_name}.com","response_action": {"alertRestrainAccordingCols": "","sinkCols": "group_array(src_device_ip) as src_ip,group_array(src_device_uuid) as src_device_uuid,group_array(dst_device_ip) as dst_ip,group_array(dst_device_uuid) as dst_device_uuid,group_array(src_device_ip_country) as src_country,group_array(src_device_ip_province) as src_province,group_array(src_device_ip_city) as src_city,group_array(src_port) as src_port,group_array(dst_device_ip_country) as dst_country,group_array(dst_device_ip_province) as dst_province,group_array(dst_device_ip_city) as dst_city,group_array(dst_port) as dst_port,group_array(http_url_externalurl_domain) as dst_domain,group_array(http_url_externalurl) as dst_url,group_array(protocol) as agreement,uuid as uuids,first(A.start_time) as strategy_alert_first_time,first(A.start_time) as strategy_alert_last_time","sinkStaticInfo": "[{\"strategy_alert_name\":\"名单过滤-实时告警6.9\"},{\"strategy_att_ck\":\"侦察-搜集主机信息\"},{\"strategy_alert_desc\":\"\"},{\"strategy_risk_score\":5},{\"strategy_alert_category\":\"告警分类\"},{\"strategy_alert_summary\":\"\"}]","sinkType": "each"},"response_code": "standby2-jwwwwwwwwwwwww","response_data": f"TotoLink 多款路由器downloadFlilecgi命执行漏洞(CVE-2022-25075--CVE-2022-25083)_{i}","file_hash": "36426c221bfa23180805d78c8421b653","branch_code": "xxgcn","external_alarm_attack": "漏洞 恶意域名 XRed","external_alarm_attack_type": "bbaccb","attack_ip": "211.211.211.211","log_type": "uum","result_action": "用户静态密码错误","eqpt_vendor": f"idss_{i}","src_port": f"111{i}","dst_port": f"53","src_network_domain": None,"dst_network_domain": None,"object_type": "公共服务","src_device_vendor": "联通","dst_device_type": "服务器","src_person": "zhangxingheng","dst_person":"xuqq","dst_person_name":"zhangxingheng","dst_person_status":"在职","dst_person_ctpositionname":"测试","dst_person_types":"企业员工","dst_person_org_name":"技术部",}print('打印插入数据:',s)producer.send('zhang_orglog', s)if __name__ == '__main__':for i in range(2):gen(i)time.sleep(1) producer.close()