您现在的位置是:亿华云 > 热点
Flink SQL 知其所以然:Group 聚合操作
亿华云2025-10-05 14:43:30【热点】5人已围观
简介Group 聚合Group 聚合定义支持 Batch\Streaming 任务):Flink 也支持 Group 聚合。Group 聚合和上面介绍到的窗口聚合的不同之处,就在于 Group 聚合是按照
tumble window + key
应用场景:一般用于对数据进行分组,合操然后后续使用聚合函数进行 count、知其作sum 等聚合操作。合操那么这时候,知其作小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是源码库按照 1 分钟的粒度进行聚合,如下 SQL:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (
connector = datagen,
rows-per-second = 10,
fields.dim.length = 1,
fields.user_id.min = 1,
fields.user_id.max = 100000,
fields.price.min = 1,
fields.price.max = 100000
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
connector = print
)
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval 1 minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time, interval 1 minute)转换为 Group 聚合的写法如下:
Group 聚合
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (
connector = datagen,
rows-per-second = 10,
fields.dim.length = 1,
fields.user_id.min = 1,
fields.user_id.max = 100000,
fields.price.min = 1,
fields.price.max = 100000
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
connector = print
);
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出。
运行层面:窗口聚合是和 时间 绑定的,云服务器窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。
SQL 语义也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
数据源算子(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中。
Group 聚合算子(group by key + sum\count\max\min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum\count\max\min 结果。如果有结果oldResult,拿出来和当前的香港云服务器数据进行sum\count\max\min 计算出这个 key 的新结果newResult,并将新结果[key, newResult] 更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息-[key, oldResult],然后再将新结果发往下游+[key, newResult];如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum\max\min 结果newResult,并将新结果[key, newResult] 更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送+[key, newResult]。
数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。
这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
特别注意:
Group by 聚合涉及到了回撤流(也叫 retract 流),会产生回撤流是因为从整个 SQL 的语义来看,上游的 Kafk数据是源源不断的,无穷无尽的,那么每次这个 SQL 任务产出的结果都是一个中间结果,所以每次结果发生更新时,都需要将上一次发出的中间结果给撤回,然后将最新的结果发下去。Group by 聚合涉及到了状态:状态大小也取决于不同 key 的数量。为了防止状态无限变大,我们可以设置状态的 TTL。以上面的 SQL 为例,上面 SQL 是按照分钟进行聚合的,理论上到了今天,通常我们就可以不用关心昨天的数据了,那么我们可以设置状态过期时间为一天。关于状态过期时间的设置参数可以参考下文运行时参数 小节。如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:
数据源算子(From Order):数据源算子从 Order Hive 中读取到所有的数据,然后所有数据发送给下游的Group 聚合算子,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个算子中,然后这个算子就运行结束了,释放资源了。Group 聚合算子(group by + sum\count\max\min):接收到上游算子发的所有数据,然后遍历计算 sum\count\max\min 结果,批量发给下游数据汇算子,这个算子也就运行结束了,释放资源了。数据汇算子(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Hive 中,整个任务也就运行结束了,整个任务的资源也就都释放了。Group 聚合支持 Grouping sets、Rollup、CubeGroup 聚合也支持 Grouping sets、Rollup、Cube。
举一个 Grouping sets 的案例:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
(supplier1, product1, 4),
(supplier1, product2, 3),
(supplier2, product3, 3),
(supplier2, product4, 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)很赞哦!(21)
上一篇: 换新域名(重新来过)
下一篇: 小白注册网站域名该怎么办?有什么步骤?
相关文章
- 5、使用企业名称的英文名称作为域名也是国内许多企业选择域名的一种方式,特别适合一些与计算机、网络和通信相关的行业。
- 投资各类域名就像到处打游击战,结果处处失败。因为这样,对任何一个中国域名市场的走势和价格都没有准确的把握,所以最好缩小范围,准确把握战场态势,埋伏。
- 其次,一般域名注册有一个获取密码的按钮,域名注册商点击后会向您发送密码。在得到域名注册商发送的密码后,将其传输到域名服务提供商网站,然后输入密码,此时域名呈现申请状态。提交申请后,原注册人通常会向您发送一封电子邮件,询问您是否同意转让。此时,您只需点击同意转移按钮,域名注册商就可以成功转移。
- 比较短的域名方便用户记忆和传播,它带来的好处往往会超过其他类型的域名,如果你非要域名短而且还要包含关键词,那么往往会事与愿违,现在这种域名基本上是可遇而不可求的。
- 便宜域名使用如何?小白可以买到便宜域名吗?
- 5. 四种状态过后,域名管理机构释放域名给公众注册。
- 为什么说注册域名注意细节?哪些我们不能忽视?
- 用户邮箱的静态密码可能已被钓鱼和同一密码泄露。在没有收到安全警报的情况下,用户在适当的时间内不能更改密码。在此期间,攻击者可以随意输入帐户。启用辅助身份验证后,如果攻击者无法获取移动电话动态密码,他将无法进行身份验证。这样,除非用户的电子邮件密码和手机同时被盗,否则攻击者很难破解用户的邮箱。
- 四、长串数字域名
- 只要我们做的是从目前的市场情况选择域名,从简单易记,从个性特征上,我们就可以找到一个好域名进行注册。域名注册进行域名记录和解析以及绑定网站后,客户可以通过URL登录您的网站。
热门文章
站长推荐
3、查看排名
要如何了解反向解析和域名解析?新手该怎么去操作?
4.域名的整体品牌营销力
3、考虑出售域名
互联网中的地址是数字的IP地址,域名解析的作用主要就是为了便于记忆。
打开https://www.aizhan.com/输入自己想要查询的域名然后按回车键,如果做过网站都会有数据显示出来
为了避免将来给我们的个人站长带来的麻烦,在选择域名后缀时,我们的站长最好省略不稳定的后缀域名,比如n,因为我们不知道策略什么时候会改变,更不用说我们将来是否还能控制这个域名了。因此,如果站长不是企业,或者有选择的话,如果不能选择域名的cn类,最好不要选择它。
这个不用多说,不同平台的注册价格不同,且不同平台对域名释放交易的把控与曝光不同,当然价格相对便宜且平台渠道广操作便利的平台最好。