您现在的位置是:亿华云 > 知识
关于 Flink Regular Join 与 TTL 的理解
亿华云2025-10-06 06:44:44【知识】1人已围观
简介对于流查询,Regular Join的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。Regular Join 包含以下几种以 L 作为左流中的数据标识,R 作为右流中的数据标识):I
对于流查询,关于Regular Join 的理解语法是最灵活的,它允许任何类型的关于更新(插入、更新、理解删除)输入表。关于
Regular Join 包含以下几种(以 L 作为左流中的理解数据标识,R 作为右流中的关于数据标识):
Inner Join(Inner Equal Join):当两条流 Join 到才会输出 +[L, R]Left Join(Outer Equal Join):左流数据到达之后 Join 到 R 流数据则输出 +[L, R],没 Join 到输出 +[L,理解 null])。如果右流之后数据到达之后,关于发现左流之前输出过没有 Join 到的理解数据,则会发起回撤流,关于先输出 -[L,理解 null],然后输出 +[L,关于 R]。Right Join(Outer Equal Join):与 Left Join 逻辑相反。理解Full Join(Outer Equal Join):流任务中,关于左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。如果一条流的高防服务器数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。Regular Inner JoinFlink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
connector = kafka,
topic = match_result_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
connector = kafka,
topic = read_record_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
connector = print
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
INNER JOIN readRecord ON matchResult.guid = readRecord.guid;输出结果解析:
-- L 流数据达到,由于没有 Join 到 R 流数据而且是 inner join 便不输出结果
+I[111, book1] -- R 流数据达到, Join 到 L 流数据,便输出 +I[111, book1]
-- R 流数据达到,由于没有 Join 到 L 流数据而且是 inner join 便不输出结果
+I[222, book2] -- L 流数据达到, Join 到 R 流数据便输出结果Regular Left Join(Right join 则相反)Flink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
connector = kafka,
topic = match_result_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
connector = kafka,
topic = read_record_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
connector = print
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
LEFT JOIN readRecord ON matchResult.guid = readRecord.guid;输出结果解析:
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +[L, null]
-D[111, null] -- R 流的数据到达,发现 L 流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]
+I[111, book1] -- 再输出 +[L, R]
-- 这里模拟一条 R 流 guid = 222 的数据到达,由于是 left join 且没有 join 到 L 流,因此不做输出
+I[222, book2] -- 当 L 流 guid = 222 的数据达到 join R 流 后输出结果 +[L, R]Regular Full JoinFlink SQL:
CREATE TABLE matchResult (
guid STRING
) WITH (
connector = kafka,
topic = match_result_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE readRecord (
guid STRING,
book_name STRING
) WITH (
connector = kafka,
topic = read_record_log_test,
properties.bootstrap.servers = xxxxxxxxxxxxxxxxxxx,
properties.group.id = flinkTestGroup,
scan.startup.mode = latest-offset,
format = json
);
CREATE TABLE sink_table (
guid STRING,
book_name STRING
) WITH (
connector = print
);
INSERT INTO sink_table
SELECT
matchResult.guid,
readRecord.book_name
FROM matchResult
FULL JOIN readRecord ON matchResult.guid = readRecord.guid;输出结果解析:
+I[111, null] -- L 流数据达到,没有 Join 到 R 流数据,便输出 +I[L, null]
+I[null, book2] -- R 流数据达到,站群服务器没有 Join 到 R 流数据,便输出 +I[null, R]
-D[null, book2] -- L 流新数据到达,发现之前 R 流之前输出过没有 Join 到的数据,则发起回撤流,先输出 -D[null, R]
+I[222, book2] -- 再输出 +I[L, R]
-D[111, null] -- 反之同理
+I[111, book1]TTL 概念在 Regular Join 时 Flink 会将两条没有时间窗口限制的流的所有数据存储在 State 中,由于流是无穷无尽持续流入的,随着时间的不断推进,内存中积累的状态会越来越多。
针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理,可以通过 table.exec.state.ttl 参数进行控制(注意:这同时也会对结果的准确性有所影响,因此需要合理的权衡)。香港云服务器
很赞哦!(7)
相关文章
- 用户邮箱的静态密码可能已被钓鱼和同一密码泄露。在没有收到安全警报的情况下,用户在适当的时间内不能更改密码。在此期间,攻击者可以随意输入帐户。启用辅助身份验证后,如果攻击者无法获取移动电话动态密码,他将无法进行身份验证。这样,除非用户的电子邮件密码和手机同时被盗,否则攻击者很难破解用户的邮箱。
- 详解Spring 如何创建 Bean 对象?
- JavaScript之深入理解this
- 推荐6个Github上超有意思的前端项目
- 小白注册网站域名该怎么办?有什么步骤?
- Windows 10中使用Python碰到的奇怪现象
- 关于软件架构的一切
- 你做过代码 Review 吗?
- 在数以亿计的网站中,我们应该抓住每一个可能带来宣传的机会,域名可以带有企业的名字,一般可以使用汉语拼音或者英语单词或者是相关缩写的形式,只要用户记住了你企业的名字,就能很容易的打出你的网站域名,同样的,记住了网站域名也能很快的记住你公司的名字。
- 被抛弃的WebDAV,从未有过青春!
热门文章
站长推荐
为什么喜欢国外注册域名?国外注册域名注意什么?
手把手教你用Python爬取百度搜索结果并保存
从零搭建开发脚手架之 HttpServletRequest多次读取异常问题的因和果
8 张图带你了解大型应用架构演进历程
用户邮箱的静态密码可能已被钓鱼和同一密码泄露。在没有收到安全警报的情况下,用户在适当的时间内不能更改密码。在此期间,攻击者可以随意输入帐户。启用辅助身份验证后,如果攻击者无法获取移动电话动态密码,他将无法进行身份验证。这样,除非用户的电子邮件密码和手机同时被盗,否则攻击者很难破解用户的邮箱。
针对JavaScript开发人员的Rust简介
Java高并发编程基础三大利器之Semaphore
Java工作中的并发问题处理方法总结