詳解Flink CEP的概念及功能
我們?cè)诳粗辈サ臅r(shí)候,不管對(duì)于主播還是用戶(hù)來(lái)說(shuō),非常重要的一項(xiàng)就是彈幕文化。為了增加直播趣味性和互動(dòng)性, 各大網(wǎng)絡(luò)直播平臺(tái)紛紛采用彈窗彈幕作為用戶(hù)實(shí)時(shí)交流的方式,內(nèi)容豐富且形式多樣的彈幕數(shù)據(jù)中隱含著復(fù)雜的用戶(hù)屬性與用戶(hù)行為, 研究并理解在線(xiàn)直播平臺(tái)用戶(hù)具有彈幕內(nèi)容審核與監(jiān)控、輿論熱點(diǎn)預(yù)測(cè)、個(gè)性化摘要標(biāo)注等多方面的應(yīng)用價(jià)值。
本文不分析彈幕數(shù)據(jù)的應(yīng)用價(jià)值,只通過(guò)彈幕內(nèi)容審核與監(jiān)控案例來(lái)了解下Flink CEP的概念及功能。
在用戶(hù)發(fā)彈幕時(shí),直播平臺(tái)主要實(shí)時(shí)監(jiān)控識(shí)別兩類(lèi)彈幕內(nèi)容:一類(lèi)是發(fā)布不友善彈幕的用戶(hù) ;一類(lèi)是刷屏的用戶(hù)。
我們先記住上述需要實(shí)時(shí)監(jiān)控識(shí)別的兩類(lèi)用戶(hù),接下來(lái)介紹Flink CEP的API,然后使用CEP解決上述問(wèn)題。
Flink CEPFlink CEP 是什么
Flink CEP是一個(gè)基于Flink的復(fù)雜事件處理庫(kù),可以從多個(gè)數(shù)據(jù)流中發(fā)現(xiàn)復(fù)雜事件,識(shí)別有意義的事件(例如機(jī)會(huì)或者威脅),并盡快的做出響應(yīng),而不是需要等待幾天或則幾個(gè)月相當(dāng)長(zhǎng)的時(shí)間,才發(fā)現(xiàn)問(wèn)題。
Flink CEP API
CEP API的核心是Pattern(模式) API,它允許你快速定義復(fù)雜的事件模式。每個(gè)模式包含多個(gè)階段(stage)或者我們也可稱(chēng)為狀態(tài)(state)。從一個(gè)狀態(tài)切換到另一個(gè)狀態(tài),用戶(hù)可以指定條件,這些條件可以作用在鄰近的事件或獨(dú)立事件上。
介紹API之前先來(lái)理解幾個(gè)概念:
1. 模式與模式序列
簡(jiǎn)單模式稱(chēng)為模式,將最終在數(shù)據(jù)流中進(jìn)行搜索匹配的復(fù)雜模式序列稱(chēng)為模式序列,每個(gè)復(fù)雜模式序列是由多個(gè)簡(jiǎn)單模式組成。
匹配是一系列輸入事件,這些事件通過(guò)一系列有效的模式轉(zhuǎn)換,能夠訪(fǎng)問(wèn)復(fù)雜模式圖的所有模式。
每個(gè)模式必須具有唯一的名稱(chēng),我們可以使用模式名稱(chēng)來(lái)標(biāo)識(shí)該模式匹配到的事件。
2. 單個(gè)模式
一個(gè)模式既可以是單例的,也可以是循環(huán)的。單例模式接受單個(gè)事件,循環(huán)模式可以接受多個(gè)事件。
3. 模式示例:
有如下模式:a b+ c?d
其中a,b,c,d這些字母代表的是模式,+代表循環(huán),b+就是循環(huán)模式;?代表可選,c?就是可選模式;
所以上述模式的意思就是:a后面可以跟一個(gè)或多個(gè)b,后面再可選的跟c,最后跟d。
其中a、c? 、d是單例模式,b+是循環(huán)模式。
一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。
每個(gè)模式可以帶有一個(gè)或多個(gè)條件,這些條件是基于事件接收進(jìn)行定義的;蛘哒f(shuō),每個(gè)模式通過(guò)一個(gè)或多個(gè)條件來(lái)匹配和接收事件。
了解完上述概念后,接下來(lái)介紹下案例中需要用到的幾個(gè)CEP API:
案例中用到的CEP API:
Begin:定義一個(gè)起始模式狀態(tài)
用法:start = Pattern.<Event>begin("start");
Next:附加一個(gè)新的模式狀態(tài)。匹配事件必須直接接續(xù)上一個(gè)匹配事件
用法:next = start.next("next");
Where:定義當(dāng)前模式狀態(tài)的過(guò)濾條件。僅當(dāng)事件通過(guò)過(guò)濾器時(shí),它才能與狀態(tài)匹配
用法:patternState.where(_.message == "TMD");
Within: 定義事件序列與模式匹配的最大時(shí)間間隔。如果未完成的事件序列超過(guò)此時(shí)間,則將其丟棄
用法:patternState.within(Time.seconds(10));
Times:一個(gè)給定類(lèi)型的事件出現(xiàn)了指定次數(shù)
用法:patternState.times(5);
API 先介紹以上這幾個(gè),接下來(lái)我們解決下文章開(kāi)頭提到的案例:
監(jiān)測(cè)用戶(hù)彈幕行為案例
案例一:監(jiān)測(cè)惡意用戶(hù)
規(guī)則:用戶(hù)如果在10s內(nèi),同時(shí)輸入 TMD 超過(guò)5次,就認(rèn)為用戶(hù)為惡意攻擊,識(shí)別出該用戶(hù)。
使用 Flink CEP 檢測(cè)惡意用戶(hù):
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.cep.PatternSelectFunction
import org.a(chǎn)pache.flink.cep.scala.{CEP, PatternStream}
import org.a(chǎn)pache.flink.cep.scala.pattern.Pattern
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
object BarrageBehavior01 {
case class LoginEvent(userId:String, message:String, timestamp:Long){
override def toString: String = userId
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用IngestionTime作為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 用于觀(guān)察測(cè)試數(shù)據(jù)處理順序
env.setParallelism(1)
// 模擬數(shù)據(jù)源
val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
List(
LoginEvent("1", "TMD", 1618498576),
LoginEvent("1", "TMD", 1618498577),
LoginEvent("1", "TMD", 1618498579),
LoginEvent("1", "TMD", 1618498582),
LoginEvent("2", "TMD", 1618498583),
LoginEvent("1", "TMD", 1618498585)
)
).a(chǎn)ssignAscendingTimestamps(_.timestamp * 1000)
//定義模式
val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
.where(_.message == "TMD")
.times(5)
.within(Time.seconds(10))
//匹配模式
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
import scala.collection.Map
val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
val first = pattern.getOrElse("begin", null).iterator.next()
(first.userId, first.timestamp)
})
//惡意用戶(hù),實(shí)際處理可將按用戶(hù)進(jìn)行禁言等處理,為簡(jiǎn)化此處僅打印出該用戶(hù)

發(fā)表評(píng)論
登錄
手機(jī)
驗(yàn)證碼
立即登錄即可訪(fǎng)問(wèn)所有OFweek服務(wù)
還不是會(huì)員?免費(fèi)注冊(cè)
忘記密碼請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
圖片新聞
-
機(jī)器人奧運(yùn)會(huì)戰(zhàn)報(bào):宇樹(shù)機(jī)器人摘下首金,天工Ultra搶走首位“百米飛人”
-
存儲(chǔ)圈掐架!江波龍起訴佰維,索賠121萬(wàn)
-
長(zhǎng)安汽車(chē)母公司突然更名:從“中國(guó)長(zhǎng)安”到“辰致科技”
-
豆包前負(fù)責(zé)人喬木出軌BP后續(xù):均被辭退
-
字節(jié)AI Lab負(fù)責(zé)人李航卸任后返聘,Seed進(jìn)入調(diào)整期
-
員工持股爆雷?廣汽埃安緊急回應(yīng)
-
中國(guó)“智造”背后的「關(guān)鍵力量」
-
小米汽車(chē)研發(fā)中心重磅落地,寶馬家門(mén)口“搶人”
最新活動(dòng)更多
-
即日-9.16點(diǎn)擊進(jìn)入 >> 【限時(shí)福利】TE 2025國(guó)際物聯(lián)網(wǎng)展·深圳站
-
10月23日立即報(bào)名>> Works With 開(kāi)發(fā)者大會(huì)深圳站
-
10月24日立即參評(píng)>> 【評(píng)選】維科杯·OFweek 2025(第十屆)物聯(lián)網(wǎng)行業(yè)年度評(píng)選
-
11月27日立即報(bào)名>> 【工程師系列】汽車(chē)電子技術(shù)在線(xiàn)大會(huì)
-
12月18日立即報(bào)名>> 【線(xiàn)下會(huì)議】OFweek 2025(第十屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
精彩回顧立即查看>> 【限時(shí)下載】ADI中國(guó)三十周年感恩回饋助力企業(yè)升級(jí)!
推薦專(zhuān)題
- 1 阿里首位程序員,“掃地僧”多隆已離職
- 2 先進(jìn)算力新選擇 | 2025華為算力場(chǎng)景發(fā)布會(huì)暨北京xPN伙伴大會(huì)成功舉辦
- 3 宇樹(shù)機(jī)器人撞人事件的深度剖析:六維力傳感器如何成為人機(jī)安全的關(guān)鍵屏障
- 4 清華跑出具身智能獨(dú)角獸:給機(jī)器人安上眼睛和大腦,融資近20億
- 5 踢館大廠(chǎng)和微軟,剖析WPS靈犀的AI實(shí)用主義
- 6 特朗普要求英特爾首位華人 CEO 辭職
- 7 AI版“四萬(wàn)億刺激”計(jì)劃來(lái)了
- 8 騰訊 Q2 財(cái)報(bào)亮眼:AI 已成第二增長(zhǎng)曲線(xiàn)
- 9 解碼特斯拉新AI芯片戰(zhàn)略 :從Dojo到AI5和AI6推理引擎
- 10 騰訊米哈游押寶的中國(guó)AI應(yīng)用,正在海外悶聲發(fā)財(cái)