How to implement Group Window Function to a “Over Partition By” on Flink SQL?


I'm trying to use time windows over Flink SQL, it has been hard for me to get familiar with the framework, but I have already defined my

<ul><li>StreamExecutionEnvironment</li> <li>StreamTableEnvironment</li> <li>FlinkKafkaConsumer</li> </ul>

Then apply query SQL and group by time windows as follows.

<pre class="lang-scala prettyprint-override"> val stream = env.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties) ) val parsed: DataStream[Order] = stream.map(x=> .... //then I register a DataStream as a table, (Flink Version: 9.3) tEnv.registerDataStream("OrdersB", parsed, 'user, 'product, 'amount, 'proctime.proctime) //grouping by 5-second window val result2 = tEnv.sqlQuery( """ |SELECT user, SUM(amount) |FROM OrdersB |GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), user |""".stripMargin)

this is going well, but my goal of usql Flink SQL is because I have the need to use windows functions like in oracle or mysql.

The query I'm interested in is this.

<pre class="lang-scala prettyprint-override">val result = tEnv.sqlQuery( """ |SELECT user, product, c_num |FROM ( | SELECT *, | COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC) as c_num | FROM Orders) |""".stripMargin)

I tried the query with data inside the code and it works

my challenge is to perform this same query for a 5 second time window as in the first example, without having to group. i just want a new column enriched with the value calculated by this function

COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC) as c_num

my data source is a kafka topic, the following is an example of the individual events user, product, amount

<pre class="lang-scala prettyprint-override">1,"beer",3 1,"beer",1 2,"beer",3 3,"diaper",4 4,"diaper",1 5,"diaper",5 6,"rubber",2

the result should be as follows, considering that the previous events were inserted in a 5-second window, (user, product, c_num)

<pre class="lang-java prettyprint-override">1,"beer",3 1,"beer",3 2,"beer",3 3,"diaper",3 4,"diaper",3 5,"diaper",3 6,"rubber",1

Thank you!



