学习笔记Flink(八) 您所在的位置:网站首页 flink测试报告 学习笔记Flink(八)

学习笔记Flink(八)

#学习笔记Flink(八)| 来源: 网络整理| 查看: 265

一、背景介绍

信用卡欺诈 信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。 罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费。

信用卡欺诈行为

交易3和交易4应该被标记为欺诈行为,因为交易3是一个100¥的小额交易,而紧随着的交易4是一个10000¥的大额交易。

另外,交易5、6和交易7就不属于欺诈交易了,因为在交易5这个500¥的小额交易之后,并没有跟随一个大额交易,而是一个金额适中的交易,这使得交易5到交易7不属于欺诈行为。

二、架构设计

架构设计

数据流设计

数据流落地实现

三、Kafka信用卡消费数据 3.1、Kafka Producer

模拟Kafka Producer定时生成消费数据

TransactionData.java:

package fraud_detection; public class TransactionData { private String user; private double money; public TransactionData(){} public TransactionData(String user,double money){ this.user=user; this.money=money; } @Override public String toString(){ return this.user + "," + this.money; } }

TransactionDataGenerator.java:

package fraud_detection; import java.util.Random; public class TransactionDataGenerator { public static final int USER_SIZE = 10; public static final float BIG_MONEY_PERCENT = 0.02f; static Random random = new Random(); public static TransactionData getData(){ return new TransactionData(generateUser() , generateMoney()) ; } private static String generateUser(){ return "user_"+random.nextInt(USER_SIZE); } private static float generateMoney(){ float i = random.nextFloat(); if( i > BIG_MONEY_PERCENT){ return random.nextFloat() * 1000; }else{ return i * 10000000; } } public static void main(String[] args){ TransactionData data = null; for(int i = 10000 ;i >0 ; i--){ data = getData(); System.out.println(data); } } }

TransactionDataProducer.java:

package fraud_detection; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; public class TransactionDataProducer { public static void main(String[] args) throws InterruptedException { String topic = "fraud00"; Map kafkaProperties = new HashMap(); kafkaProperties.put("bootstrap.servers","node100:9092,node101:9092,node102:9092"); kafkaProperties.put("acks", "all"); kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(kafkaProperties); int size = 30*1000/10; long interval = 10L; String data = ""; for (int i = 0; i def main(args: Array[String]): Unit = { val topic = "fraud00" val properties = new Properties() properties.setProperty("bootstrap.servers", "node100:9092") properties.setProperty("group.id", "test") val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)) stream.map(x => {val data = x.split(","); (data(0).trim, data(1).trim.toDouble)}) .keyBy(0) .mapWithState[(String, Double), Double]((in: (String, Double), state: Option[Double]) => state match { case None => (("", 0.0), Some(in._2)) case Some(previous) => if (in._2 > 10000.0 && previous x._2 > 0.0) .print() env.execute("Fraud Detection") } } 测试:

① 先创建fraud00 话题 在这里插入图片描述 将产生的数据存到/tmp目录下(了解) 在这里插入图片描述

② 运行FraudDetection: 在这里插入图片描述 ③ 运行TransactionDataProducer 在这里插入图片描述 结果: 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有