Flink是一种流式计算框架. 流计算与传统的离线计算相比, 其主要特征是: 需要对随时进入系统的数据进行实时计算. 流计算处理的数据集是无界的, 数据来源可能是非持久化的. 基于以上特征, 流计算需要满足处理结果立即可用, 并随着数据的到来, 持续的进行更新. 由于数据非持久化, 因此对于容错也有较高的要求.
Flink基本概念 Flink的执行过程可以抽象的分割为 接收数据 -> 处理数据 -> 输出结果. 具体来说,
接收数据: 从一个或多个数据源(例如kafka)接收数据
处理数据: 使用Flink提供的各类算子, 对收到的数据进行处理
输出结果: 将计算后的结果输出(例如输出到Redis)
从计算逻辑的角度来看, 输入和输出都是外部的固定操作, 只有核心的数据处理过程是需要根据需求不断变化的.
Flink安装和配置 Flink在生产环境一般以集群的方式运行, 将相关的任务打包后上传到集群执行. 对于本地学习, Flink也支持直接在IDEA中执行.
对于本地执行, 需要安装Java 11及以上的版本(推荐安装长期支持版, 例如Java 17), 并下载Flink的代码包, 具体可参考官方安装指引 .
在本地执行时, 可直接创建一个Maven项目 , 并在运行前在IDEA的配置中, 进行如下配置, 使得代码能够直接运行.
IntelliJ IDEA: Go to Run > Edit Configurations > Modify options > Select include dependencies with “Provided” scope.
代码基本结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package spendreport;import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.KeyedProcessFunction;import org.apache.flink.util.Collector;public class WordCountJob { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.readTextFile("input.json" ).map((MapFunction<String, LogRecord>) s -> { ObjectMapper objectMapper = new ObjectMapper (); return objectMapper.readValue(s, LogRecord.class); }).keyBy(LogRecord::getAction) .process(new KeyedProcessFunction <String, LogRecord, Tuple2<String, Integer>>() { @Override public void processElement (LogRecord value, KeyedProcessFunction<String, LogRecord, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception { out.collect(Tuple2.of(value.getAction(), 1 )); } }).keyBy(t -> t.f0).sum(1 ) .print(); env.execute(); } }
可以注意到, Flink操作过程与Java的Stream API具有高度的相似性, 许多概念都是对应存在的.
时间语义 流计算对于时效性非常敏感, 因此在处理时采取不同的时间计算方式, 对于算子的结算结果也有不同的影响. Flink支持三种不同的计算时间
处理时间: 即计算节点所在的机器的本地时间. 由于分布式系统无法保证时间完全一致,且出现故障进行恢复时可能有数据堆积, 因此处理时间通常无法准确反应实际的时间, 通常仅用于不关系发生事件的场景.
事件事件: 每个独立事件发生时所在设备上的时间. 事件时间通常在事件进入Flink之前就已经被内嵌在事件中. 事件时间能够保证正确性, 哪怕事件是无序的, 延迟的甚至是从持久层的日志或者备份中恢复的.
摄入时间: 事件进入Flink的时间. 作业在执行时, 每个事件以执行source运算符对应的任务的节点的当前时钟作为时间戳.
水位线 在Flink计算引擎中度量事件时间进度的机制被称为水位线. 水位线作为特殊的事件被注入到事件流中流向下游.
时间窗口 窗口将无界流切片成一系列有界的数据集.
固定窗口: 固定窗口按固定的时间段或长度(比如小时或元素个数)来分片数据集
滑动窗口: 由窗口大小以及滑动周期构成(比如以小时作为窗口大小,分钟作为滑动周期)
聚合查询 Java模式 例如想要按照事件查询某个滑动窗口内的次数, 使用Java的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Transaction> transactions = env .addSource(new TransactionSource (10 , 3 , 1 )) .name("transactions" ); DataStream<Alert> alerts = transactions .assignTimestampsAndWatermarks( WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(3 )) .withTimestampAssigner((event, timestamp) -> event.getTimestamp())) .keyBy(Transaction::getAccountId) .window(SlidingEventTimeWindows.of(Duration.ofSeconds(10 ), Duration.ofSeconds(2 ))) .aggregate( new AggregateFunction <Transaction, Alert, Alert>() { @Override public Alert createAccumulator () { return new Alert ("" , 0 ); } @Override public Alert add (Transaction transaction, Alert o) { return new Alert (transaction.getAccountId(), o.getCount() + 1 ); } @Override public Alert getResult (Alert o) { return o; } @Override public Alert merge (Alert o, Alert acc1) { return new Alert (o.getUid(), o.getCount() + acc1.getCount()); } } ); alerts .addSink(new SinkFunction <>() { public void invoke (Alert alert, Context ctx) { LoggerFactory.getLogger(Alert.class).info(alert.toString()); } }) .name("send-alerts" ); env.execute("Fraud Detection" );
逻辑并不复杂, 但由于Java的强类型要求, 这里面涉及大量的类型问题, 泛型代码写起来非常的难受.
SQL模式 相较于Java写的头疼的类型转换, 使用SQL可以极大的简化代码, 因此官方也更推荐使用SQL来进行处理.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<Transaction> transactions = env .addSource(new TransactionSource (10 , 3 , 1 )) .name("transactions" ); WatermarkStrategy<Transaction> watermarkStrategy = WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(3 )) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); DataStream<Transaction> watermarkedTransactions = transactions .assignTimestampsAndWatermarks(watermarkStrategy); tableEnv.createTemporaryView("transactions" , watermarkedTransactions, $("accountId" ), $("timestamp" ).rowtime().as("rowtime" ) ); Table resultTable = tableEnv.sqlQuery( "SELECT accountId, COUNT(*) AS cnt " + "FROM transactions " + "GROUP BY accountId, " + "HOP(rowtime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)" ); DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); resultStream.print(); env.execute("SQL Window Aggregation" );
SQL可以解决类型问题, 但水位线还是要手动设置一下
窗口函数速查表
窗口类型
SQL 语法
参数说明
示例
等价 DataStream API
滚动窗口
TUMBLE(rowtime, INTERVAL '时长' 单位)
• rowtime: 事件时间字段 • 时长: 窗口大小 • 单位: SECOND/MINUTE/HOUR等
TUMBLE(rowtime, INTERVAL '10' SECOND)
TumblingEventTimeWindows.of(Duration.ofSeconds(10))
滑动窗口
HOP(rowtime, INTERVAL '滑动步长' 单位, INTERVAL '窗口大小' 单位)
• rowtime: 事件时间字段 • 滑动步长: 窗口滑动间隔 • 窗口大小: 窗口长度
HOP(rowtime, INTERVAL '2' SECOND, INTERVAL '10' SECOND)
SlidingEventTimeWindows.of(Duration.ofSeconds(10), Duration.ofSeconds(2))
会话窗口
SESSION(rowtime, INTERVAL '超时时间' 单位)
• rowtime: 事件时间字段 • 超时时间: 会话不活动间隔
SESSION(rowtime, INTERVAL '5' MINUTE)
EventTimeSessionWindows.withGap(Duration.ofMinutes(5))
累积窗口
CUMULATE(rowtime, INTERVAL '步长' 单位, INTERVAL '最大时长' 单位)
• rowtime: 事件时间字段 • 步长: 每次累积间隔 • 最大时长: 累积上限
CUMULATE(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY)
需自定义 Trigger + AggregateFunction
时间单位速查
单位
SQL 语法
Java Duration 等价
秒
INTERVAL 'X' SECOND
Duration.ofSeconds(X)
分钟
INTERVAL 'X' MINUTE
Duration.ofMinutes(X)
小时
INTERVAL 'X' HOUR
Duration.ofHours(X)
天
INTERVAL 'X' DAY
Duration.ofDays(X)
完整 SQL 查询模板 1. 滚动窗口模板 1 2 3 4 5 6 7 8 9 SELECT user_id, COUNT (* ) as event_count, TUMBLE_START(rowtime, INTERVAL '10' MINUTE ) as window_start, TUMBLE_END(rowtime, INTERVAL '10' MINUTE ) as window_end FROM eventsGROUP BY user_id, TUMBLE(rowtime, INTERVAL '10' MINUTE )
2. 滑动窗口模板 1 2 3 4 5 6 7 8 9 SELECT user_id, COUNT (* ) as event_count, HOP_START(rowtime, INTERVAL '1' MINUTE , INTERVAL '5' MINUTE ) as window_start, HOP_END(rowtime, INTERVAL '1' MINUTE , INTERVAL '5' MINUTE ) as window_end FROM eventsGROUP BY user_id, HOP(rowtime, INTERVAL '1' MINUTE , INTERVAL '5' MINUTE )
3. 会话窗口模板 1 2 3 4 5 6 7 8 9 SELECT user_id, COUNT (* ) as event_count, SESSION_START(rowtime, INTERVAL '10' MINUTE ) as window_start, SESSION_END(rowtime, INTERVAL '10' MINUTE ) as window_end FROM eventsGROUP BY user_id, SESSION(rowtime, INTERVAL '10' MINUTE )
4. 累积窗口模板 1 2 3 4 5 6 7 8 9 SELECT user_id, SUM (amount) as total_amount, CUMULATE_START(rowtime, INTERVAL '1' HOUR , INTERVAL '24' HOUR ) as window_start, CUMULATE_END(rowtime, INTERVAL '1' HOUR , INTERVAL '24' HOUR ) as window_end FROM transactionsGROUP BY user_id, CUMULATE(rowtime, INTERVAL '1' HOUR , INTERVAL '24' HOUR )
⚠️ 重要前置步骤:定义事件时间 在使用任何窗口前,必须先定义事件时间属性:
方法1:DDL 方式
1 2 3 4 5 CREATE TABLE events ( user_id STRING, event_time TIMESTAMP (3 ), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) WITH (...);
方法2:DataStream 转换方式
1 2 3 4 5 Table eventsTable = tableEnv.fromDataStream( eventsStream, $("user_id" ), $("event_time" ).rowtime() );
💡 使用技巧
窗口函数必须在 GROUP BY 子句中使用
TUMBLE_START/HOP_START 等函数用于获取窗口边界时间
累积窗口特别适合实时更新累计指标的场景
会话窗口无需指定固定大小,根据数据活跃度自动划分
恰好一次处理
最多一次: 事件可能会丢失但不会被重复传递
至少一次: 事件不会丢失但可能会被重复传递
恰好一次: 事件既不会丢失也不会被重复传递
Flink的分布式异步快照机制支持”恰好一次”语义. 但同样提供了对”至少一次”语义的支持,这给予了用户根据不同场景(比如允许数据重复,但希望延迟尽可能低)进行合理选择的灵活性
参考资料