
Flink sink to Doris: a worked example

2023-12-30 01:58 | Source: compiled from the web

Add flink-doris-connector and the required Flink Maven dependencies

The configuration below follows the official docs. For Flink 1.13.* and earlier:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
```

For Flink 1.14.*:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.14_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
```

This example uses 1.14. Flink 1.15 came out just today; they move fast, and I can barely keep up.

Create the table

```sql
CREATE TABLE dbname.`worker` (
    `startTime` datetime NOT NULL,
    `id` int NOT NULL,
    `name` varchar(255) DEFAULT NULL,
    `age` int DEFAULT NULL,
    `city` varchar(255) NOT NULL,
    `salary` int NOT NULL
) ENGINE = olap
DUPLICATE KEY(startTime, id, name)
PARTITION BY RANGE(startTime) ()
DISTRIBUTED BY HASH(name)
PROPERTIES (
    .......
);
```

Remember to set up the partitions. If no partition covers the incoming data, the load will fail with an error and no data can be imported.
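Since the table is RANGE-partitioned on `startTime`, a partition covering the load date has to exist before sinking. A minimal sketch of building the `ADD PARTITION` statement (the partition name `p20220506` and the bound are my own illustrative choices, not from the original post):

```java
public class AddPartitionSql {
    // Builds an ADD PARTITION statement for a table partitioned by RANGE(startTime).
    // Partition name and upper bound here are illustrative assumptions.
    static String addPartition(String table, String partition, String upperBound) {
        return String.format(
            "ALTER TABLE %s ADD PARTITION %s VALUES LESS THAN ('%s')",
            table, partition, upperBound);
    }

    public static void main(String[] args) {
        System.out.println(addPartition("dbname.worker", "p20220506",
                "2022-05-07 00:00:00"));
    }
}
```

The statement can then be run against Doris over its MySQL-compatible protocol before starting the Flink job.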

Mock data source

Flink jobs mostly consume streaming data, and I didn't want to stand up Kafka just for this, so I wrote my own data source.
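Each record the source emits is one JSON object whose fields match the `worker` table's columns. A minimal sketch of building such a record with `String.format` rather than raw concatenation (the field values are illustrative):

```java
public class WorkerJson {
    // Builds one JSON record matching the worker table's columns:
    // startTime, id, name, age, city, salary.
    static String toJson(String startTime, int id, String name,
                         int age, String city, int salary) {
        return String.format(
            "{\"startTime\":\"%s\",\"id\":%d,\"name\":\"%s\",\"age\":%d,\"city\":\"%s\",\"salary\":%d}",
            startTime, id, name, age, city, salary);
    }

    public static void main(String[] args) {
        System.out.println(toJson("2022-05-06", 1, "aa", 20, "苏州", 12000));
    }
}
```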

```scala
class MyDataSource extends SourceFunction[String] {
  var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    val nameList: util.List[String] = util.Arrays.asList("aa", "bb", "cc", "dd")
    val cityList: util.List[String] = util.Arrays.asList("苏州", "无锡", "常州", "南京")
    var id = 0
    while (running) {
      id += 1
      val r = random.nextInt(nameList.size())
      val age = 18 + random.nextInt(20)           // fresh random age per record
      val salary = 10000 + random.nextInt(5000)   // fresh random salary per record
      val name = nameList.get(r)
      val city = cityList.get(r)
      val str = "{\"startTime\":\"2022-05-06\",\"id\":" + id +
        ",\"name\":\"" + name + "\",\"age\":" + age +
        ",\"city\":\"" + city + "\",\"salary\":" + salary + "}"
      ctx.collect(str)
      Thread.sleep(1000L)
    }
  }

  override def cancel(): Unit = running = false
}
```

Sink to Doris

```java
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("strip_outer_array", "true");

    DataStreamSource<String> stream = env.addSource(new MyDataSource());
    stream.print();
    stream.addSink(
        DorisSink.sink(
            DorisReadOptions.builder().build(),
            DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(1L)
                .setMaxRetries(3)
                .setStreamLoadProp(pro)
                .build(),
            DorisOptions.builder()
                .setFenodes("xxx.xxx.xxx.xxx.xxx:8030")
                .setTableIdentifier("dbname.worker")
                .setUsername("username")
                .setPassword("password")
                .build()
        ));

    try {
        env.execute("Flink2Doris");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```

Wrap it in a helper

Given how it will be used, I wrapped the sink in a small helper class to make it more convenient:

```java
public class MySinkF {
    public SinkFunction<String> MySinkDoris(String tableName) {
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        return DorisSink.sink(
            DorisReadOptions.builder().build(),
            DorisExecutionOptions.builder()
                .setBatchSize(3)
                .setBatchIntervalMs(1L)
                .setMaxRetries(3)
                .setStreamLoadProp(pro)
                .build(),
            DorisOptions.builder()
                .setFenodes("xxx.xxx.xxx.xxx.xxx:8030")
                .setTableIdentifier("dbname." + tableName)
                .setUsername("username")
                .setPassword("password")
                .build()
        );
    }
}
```

Likewise, since fenodes, username, and password rarely change, they can be read from a config file, which makes the calling code simpler still.
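A minimal sketch of loading those connection settings from a properties file (the file name `doris.properties` and the key names are my own assumptions):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class DorisConfig {
    // Loads connection settings (fenodes, username, password) from a
    // properties file so they are not hard-coded in the sink helper.
    static Properties load(String path) throws IOException {
        Properties p = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            p.load(in);
        }
        return p;
    }
}
```

The helper would then call, for example, `cfg.getProperty("fenodes")` when building `DorisOptions` instead of using the hard-coded string.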

```java
stream.addSink(new MySinkF().MySinkDoris("worker"));
```

A small upgrade

My production tables use the AGGREGATE model, where some columns are declared REPLACE, so I also checked whether that works through this sink.

Recreate the table

```sql
CREATE TABLE test_db.`worker_replace` (
    `startTime` datetime NOT NULL,
    `id` int NOT NULL,
    `name` varchar(255) DEFAULT NULL,
    `age` int DEFAULT NULL,
    `city` varchar(255) NOT NULL,
    `salary` int REPLACE NOT NULL
) ENGINE = olap
AGGREGATE KEY(startTime, id, name, age, city)
PARTITION BY RANGE(startTime) ()
DISTRIBUTED BY HASH(name)
PROPERTIES (
    ......
);
```

Data source

This time the records are read straight from a file:

```json
{"startTime" : "2022-05-06 00:00:00","id" : 1,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 2,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 3,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 4,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 5,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 6,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 7,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 8,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 9,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 10,"name" : "dd","age" :14,"city" : "南京","salary" : 888}
{"startTime" : "2022-05-06 00:00:00","id" : 1,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 2,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 3,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
{"startTime" : "2022-05-06 00:00:00","id" : 4,"name" : "dd","age" :14,"city" : "南京","salary" : 999}
```

Final code

```java
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // parallelism 1 makes the effect easier to observe
    DataStreamSource<String> data = env.readTextFile("your_path").setParallelism(1);
    data.print();
    data.addSink(new MySinkF().MySinkDoris("worker_replace"));
    try {
        env.execute("doris replace");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```
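Under the AGGREGATE model, rows sharing the same aggregate key (startTime, id, name, age, city) are merged, and a REPLACE column keeps the value from the most recently loaded row. A small stdlib-only sketch of that merge rule applied to the data above (this only simulates the semantics; it is not how Doris implements aggregation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplaceMerge {
    // Simulates Doris REPLACE aggregation: for rows with the same key,
    // the last-loaded salary wins. Here only id varies, so id is the key.
    static Map<Integer, Integer> merge(int[][] rows) {
        Map<Integer, Integer> salaryById = new LinkedHashMap<>();
        for (int[] row : rows) {          // row = {id, salary}, in load order
            salaryById.put(row[0], row[1]);
        }
        return salaryById;
    }

    public static void main(String[] args) {
        int[][] rows = {
            {1, 888}, {2, 888}, {3, 888}, {4, 888}, {5, 888},
            {6, 888}, {7, 888}, {8, 888}, {9, 888}, {10, 888},
            {1, 999}, {2, 999}, {3, 999}, {4, 999}
        };
        // ids 1-4 end up with 999 (replaced), ids 5-10 keep 888
        System.out.println(merge(rows));
    }
}
```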

Inspecting the table afterwards confirms the result matches expectations: the rows with ids 1 through 4 carry the later salary (999), while ids 5 through 10 keep 888.


