IoTDB Website 您所在的位置:网站首页 flinksink IoTDB Website

IoTDB Website

2023-03-15 21:22| 来源: 网络整理| 查看: 265

# Flink IoTDB 连接器

IoTDB 与 Apache Flinkopen in new window 的集成。此模块包含了 iotdb sink,允许 flink job 将时序数据写入 IoTDB。

# IoTDBSink

使用 IoTDBSink ,您需要定义一个 IoTDBOptions 和一个 IoTSerializationSchema 实例。 IoTDBSink 默认每次发送一个数据,可以通过调用 withBatchSize(int) 进行调整。

# 示例

该示例演示了如下从一个 Flink job 中发送数据到 IoTDB server 的场景:

一个模拟的 Source SensorSource 每秒钟产生一个数据点。

Flink 使用 IoTDBSink 消费产生的数据并写入 IoTDB 。

import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import com.google.common.collect.Lists; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.Random; public class FlinkIoTDBSink { public static void main(String[] args) throws Exception { // run the flink job on local mini cluster StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); IoTDBOptions options = new IoTDBOptions(); options.setHost("127.0.0.1"); options.setPort(6667); options.setUser("root"); options.setPassword("root"); options.setStorageGroup("root.sg"); // If the server enables auto_create_schema, then we do not need to register all timeseries // here. options.setTimeseriesOptionList( Lists.newArrayList( new IoTDBOptions.TimeseriesOption( "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY))); IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema(); IoTDBSink ioTDBSink = new IoTDBSink(options, serializationSchema) // enable batching .withBatchSize(10) // how many connectons to the server will be created for each parallelism .withSessionPoolSize(3); env.addSource(new SensorSource()) .name("sensor-source") .setParallelism(1) .addSink(ioTDBSink) .name("iotdb-sink"); env.execute("iotdb-flink-example"); } private static class SensorSource implements SourceFunction { boolean running = true; Random random = new SecureRandom(); @Override public void run(SourceContext context) throws Exception { while (running) { Map tuple = new HashMap(); tuple.put("device", "root.sg.d1"); tuple.put("timestamp", String.valueOf(System.currentTimeMillis())); tuple.put("measurements", "s1"); tuple.put("types", "DOUBLE"); tuple.put("values", String.valueOf(random.nextDouble())); context.collect(tuple); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } } # 运行方法启动 IoTDB server运行 org.apache.iotdb.flink.FlinkIoTDBSink.java 将 Flink job 运行在本地的集群上。


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有