if '开始分级日志' in message: #self.process_id_to_bus_seq.clear() #self.gapslist.clear() # 记录加到缓存 #self.gapslist.add(message) date_str = datetime.now().strftime("%Y%m%d") index_name = 'flink-log-clpf-gaps-' + str(date_str) id='0' log_event = LogEvent(id, source, fileTag, fileName, serviceCode, appName, timestamp, offset, message, index_name) yield log_event.to_dict() print('aaaaaaaaaaaaaaaaaaaaaa') return print('bbbbbbbbbbbbbbbbbbbbbb') [root@kafka1 pyflink]# python test.py aaaaaaaaaaaaaaaaaaaaaa {'id': '0', 'source': '10.4.146.9', 'fileTag': 'gaps', 'fileName': '/opt/test/f3s_20230633_104.log', 'serviceCode': '00000', 'appName': 'clpf', 'timestamp': '1687419610160512025', 'offset': '0', 'message': '2023-06-20 20:04:03.472 --------------开始分级日志 PID:11493 PROC[MPP_SDS_F3S] DATE:20230620 TIME:200403---------------', 'index_name': 'flink-log-clpf-gaps-20230622'}
|