Event-driven: every piece of data consumed, processed, or sent is treated as an event.
Siddhi is a stream processing and complex event processing platform.
A Siddhi application is an SQL-like script; in a .siddhi script, a semicolon ends a statement.
It is composed of sources, sinks, streams, queries, tables, functions, and other necessary constructs.
It can receive events from, and send events to, many transport types, such as TCP, HTTP, Kafka, and files.
It can accept and map between different data formats: JSON, text, XML, and key-value.
It processes events, applying transformations and analytics.
Flow: consume incoming events, hand them to the matching queries, build new events according to the query logic, and emit the new events to output streams.
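The consume → query → emit flow above can be sketched in plain Python (an illustration of the semantics only, not Siddhi's API; the `Stream` class and names are made up):

```python
# Minimal sketch of the event-driven flow: a stream fans each incoming event
# out to subscribed queries; a query transforms the event and emits the
# result to another stream.
class Stream:
    def __init__(self):
        self.subscribers = []   # handlers called for every new event
        self.events = []        # events seen so far, kept for inspection

    def send(self, event):
        self.events.append(event)
        for handler in self.subscribers:
            handler(event)

input_stream = Stream()
temperature_only = Stream()

# "query": select only the temperature attribute, insert into temperature_only
input_stream.subscribers.append(
    lambda e: temperature_only.send({"temperature": e["temperature"]}))

input_stream.send({"sensorId": "aq-14", "temperature": 35.4})
print(temperature_only.events)   # [{'temperature': 35.4}]
```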
Stream and Query
define stream InputTemperatureStream (sensorId string, temperature double); -- define a stream with a string sensorId and a double temperature
@info(name = 'Pass-through') -- optional annotation naming the query
from InputTemperatureStream
select *
insert into TemperatureAndSensorStream; -- consume from InputTemperatureStream, select every attribute with select *, and emit the new events to TemperatureAndSensorStream
@info(name = 'Simple-selection')
from TemperatureAndSensorStream -- consume events from TemperatureAndSensorStream
select temperature
insert into TemperatureOnlyStream; -- emit only temperature to TemperatureOnlyStream
-- when an event with values ['aq-14', 35.4] is sent to InputTemperatureStream:
-- InputTemperatureStream : ['aq-14', 35.4]
-- TemperatureAndSensorStream : ['aq-14', 35.4]
-- TemperatureOnlyStream : [35.4]
This example uses one annotation, @info, and the keywords define stream, from, select, and insert into.
Source and Sink
@source(type='http', receiver.url='http://0.0.0.0:8006/temp',@map(type='json'))
-- source: http, listening on http://0.0.0.0:8006/temp, accepting JSON data
define stream TemperatureStream (sensorId string, temperature double);
-- define the stream TemperatureStream
@sink(type='log') -- sink: log output
@sink(type='kafka',topic='temperature',bootstrap.servers='localhost:9092',@map(type='json',@payload("""{"temp":"{{temperature}}"}""")))
-- sink: kafka, publishing JSON to topic temperature via brokers localhost:9092; the temperature value is mapped into a JSON field named temp
define stream TemperatureOnlyStream (temperature double); -- define the output stream
@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
-- query: select the temperature attribute from TemperatureStream and insert it into TemperatureOnlyStream
-- input: when a JSON message like the following is posted to http://0.0.0.0:8006/temp, an event is created
{
"event":{
"sensorId":"aq-14",
"temperature":35.4
}
}
-- after processing, the event reaches TemperatureOnlyStream and is published via both the log and kafka sinks
-- the log sink prints the event to the console using passThrough mapping
Event{timestamp=1574515771712, data=[35.4], isExpired=false}
-- the kafka sink maps the event to JSON and publishes it to the temperature topic
{"temp":"35.4"}
This example uses the annotations @source, @sink, @map, @info, and (for the kafka sink) @payload.
Table and Store
define stream TemperatureStream (sensorId string, temperature double); -- define a stream
define table TemperatureLogTable (sensorId string, roomNo string, temperature double); -- define an in-memory table (no @store annotation)
@store(type="rdbms",jdbc.url="jdbc:mysql://localhost:3306/sid",username="root", password="root",jdbc.driver.name="com.mysql.jdbc.Driver")
-- back the following table with a relational database, identified by the JDBC URL, username, password, and driver class
define table SensorIdInfoTable (sensorId string, roomNo string); -- define a table stored in the database configured above
@info(name = 'Join-query')
from TemperatureStream as t join SensorIdInfoTable as s
on t.sensorId == s.sensorId
-- query: join the stream with the table where the sensorIds are equal
select t.sensorId as sensorId, s.roomNo as roomNo, t.temperature as temperature
insert into TemperatureLogTable;
-- insert the joined result into the table
-- when SensorIdInfoTable contains a record ['aq-14', '789'] and an event ['aq-14', 35.4] arrives on TemperatureStream, a new event ['aq-14', '789', 35.4] is produced and added to TemperatureLogTable.
-- querying the data in a table
-- on-demand query
from TemperatureDetailsTable
select *
-- executed by calling the query() method of SiddhiAppRuntime
This example uses one annotation, @store, and the table keyword.
Siddhi Application
@app:name('Temperature-Processor') -- siddhi app name
@app:description('App for processing temperature data.') -- app description
@source(type='inMemory', topic='SensorDetail')
define stream TemperatureStream (sensorId string, temperature double);
-- source: consumes events published in-memory by other apps; defines the stream
@sink(type='inMemory', topic='Temperature')
define stream TemperatureOnlyStream (temperature double);
-- sink: publishes events in-memory to other apps; defines the stream
@info(name = 'Simple-selection')
from TemperatureStream
select temperature
insert into TemperatureOnlyStream;
-- query: selects temperature from the source stream and passes the events to the sink, which publishes them
-- when ['aq-14', 35.4] is received on topic SensorDetail, it is delivered to TemperatureStream
-- when [35.4] reaches TemperatureOnlyStream, it is published on topic Temperature to any subscribed in-memory apps
This example uses the two app-level annotations @app:name and @app:description.
Basic types: int, long, float, double, bool, string, object
Type handling functions: convert(), cast(), instanceOf<Type>()
object can hold structures such as list and map.
define stream PatientRegistrationInputStream (
seqNo long, name string, age int,
height float, weight double, photo object,
isEmployee bool, wardNo object);
define stream PatientRegistrationStream (
seqNo long, name string, age int,
height double, weight double, photo object,
isPhotoString bool, isEmployee bool,
wardNo int);
-- define two streams whose attribute types differ; the query converts between them
@info(name = 'Type-processor')
from PatientRegistrationInputStream
select seqNo, name, age,
convert(height, 'double') as height,
weight, photo,
instanceOfString(photo) as isPhotoString,
isEmployee,
cast(wardNo, 'int') as wardNo
-- convert(value, 'type') converts the value to the given type and returns it
-- instanceOfString(value) returns whether the value is of type string
-- cast(value, 'type') reinterprets the value as the given type and returns it
-- convert() performs an actual data conversion; cast() only asserts the type (useful for object attributes) and fails if the value is not already of that type
insert into PatientRegistrationStream;
-- PatientRegistrationInputStream event: [1200098, 'Peter Johnson', 34, 194.3f, 69.6, #Fjoiu59%3hkjnknk$#nFT, true, 34]
-- PatientRegistrationStream event: [1200098, 'Peter Johnson', 34, 194.3, 69.6, #Fjoiu59%3hkjnknk$#nFT, false, true, 34]
This example uses three functions: convert(), instanceOf<Type>(), and cast().
Map
define stream CoupleDealInfoStream (item1 string, price1 double,item2 string, price2 double);
@info(name = 'Create-map')
from CoupleDealInfoStream
select map:create(item1, price1, item2, price2)
as itemPriceMap
-- map:create(key1, value1, key2, value2, ...) creates a map, here with entries item1:price1 and item2:price2
insert into NewMapStream;
@info(name = 'Check-map')
from NewMapStream
select map:isMap(itemPriceMap) as isMap,
map:containsKey(itemPriceMap, 'Cookie')
as isCookiePresent,
map:containsValue(itemPriceMap, 24.0)
as isThereItemWithPrice24,
map:isEmpty(itemPriceMap) as isEmpty,
map:keys(itemPriceMap) as keys,
map:size(itemPriceMap) as size
insert into MapAnalysisStream;
-- map:isMap(map) returns whether the value is a map
-- map:containsKey(map, key) returns whether the map contains the key
-- map:containsValue(map, value) returns whether the map contains the value
-- map:isEmpty(map) returns whether the map is empty
-- map:keys(map) returns the key set
-- map:size(map) returns the number of entries
@info(name = 'Clone-and-update')
from NewMapStream
select map:replace(
map:put(map:clone(itemPriceMap),
"Gift",
1.0),
"Cake",
12.0) as itemPriceMap
insert into ItemInsertedMapStream;
-- map:replace(map, key, value) replaces the value for the key with the new value
-- map:put(map, key, value) adds the entry to the map and returns the map
-- map:clone(map) returns a copy of the map
-- when CoupleDealInfoStream receives the event ['Chocolate', 18.0, 'Ice Cream', 24.0]:
-- NewMapStream [{Ice Cream=24.0, Chocolate=18.0}]
-- MapAnalysisStream [true, false, true, false, [Ice Cream, Chocolate], 2]
-- ItemInsertedMapStream [{Ice Cream=12.0, Gift=1.0, Chocolate=18.0}]
List
define stream ProductComboStream (product1 string, product2 string, product3 string);
@info(name = 'Create-list')
from ProductComboStream
select list:create(product1, product2, product3)
as productList
insert into NewListStream;
-- list:create(item1, item2, ...) creates a list
@info(name = 'Check-list')
from NewListStream
select list:isList(productList) as isList,
list:contains(productList, 'Cake')
as isCakePresent,
list:isEmpty(productList) as isEmpty,
list:get(productList, 1) as valueAt1,
list:size(productList) as size
insert into ListAnalysisStream;
-- list:isList(list) returns whether the value is a list
-- list:contains(list, item) returns whether the list contains the item
-- list:isEmpty(list) returns whether the list is empty
-- list:get(list, index) returns the element at the index
-- list:size(list) returns the list's size
@info(name = 'Clone-and-update')
from NewListStream
select list:remove(
list:add(list:clone(productList), "Toffee"),
"Cake") as productList
insert into UpdatedListStream;
-- list:remove(list, item) removes the element
-- list:add(list, item) adds the element and returns the list
-- list:clone(list) returns a copy of the list
-- when ProductComboStream receives the event ['Ice Cream', 'Chocolate', 'Cake']:
-- NewListStream [[Ice Cream, Chocolate, Cake]]
-- ListAnalysisStream [true, true, false, Chocolate, 3]
-- UpdatedListStream [[Ice Cream, Chocolate, Toffee]]
Null
define stream ProductInputStream (item string, price double);
define table ProductInfoTable (item string, discount double);
@info(name = 'Check-for-null')
from ProductInputStream [not(item is null)]
-- not(expression) negates the boolean expression
-- [condition] acts as a filter: only events satisfying the condition pass through
-- <value> is null returns whether the value is null
select item,
price is null as isPriceNull
insert into ProductValidationStream;
@info(name = 'Outer-join-with-table')
from ProductInputStream as s
left outer join ProductInfoTable as t
on s.item == t.item
select s.item, s.price, t.discount,
math:power(t.discount, 2) is null
as isFunctionReturnsNull,
t is null as isTNull,
s is null as isSNull,
t.discount is null as isTDiscountNull,
s.item is null as isSItemNull
insert into DiscountValidationStream;
-- math:power(base, exponent) raises base to the given power
-- when ProductInputStream receives ['Cake', 12.0]:
-- ProductValidationStream [Cake, false]
-- DiscountValidationStream [Cake, 12.0, null, true, true, false, true, false]
value based filtering
define stream TemperatureStream (
sensorId string, temperature double);
@info(name = 'EqualsFilter')
from TemperatureStream[ sensorId == 'A1234']
select *
insert into SenorA1234TemperatureStream;
-- [condition] filters events, like an if condition
@info(name = 'RangeFilter')
from TemperatureStream[ temperature > -2 and temperature < 40]
select *
insert into NormalTemperatureStream;
-- comparison operators > and < combined with and
@info(name = 'NullFilter')
from TemperatureStream[ sensorId is null ]
select *
insert into InValidTemperatureStream;
-- is null filter
Filters are written as [condition], using operators such as >, <, and, and is null.
if-then-else
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'SimpleIfElseQuery')
from TemperatureStream
select sensorId,
ifThenElse(temperature > -2, 'Valid', 'InValid') as isValid
-- ifThenElse(condition, valueIfTrue, valueIfFalse) is a ternary expression
insert into ValidTemperatureStream;
@info(name = 'ComplexIfElseQuery')
from TemperatureStream
select sensorId,
ifThenElse(temperature > -2,
ifThenElse(temperature > 40, 'High', 'Normal'),
'InValid')
as tempStatus
-- nested ternary expressions
insert into ProcessedTemperatureStream;
ifThenElse(condition, valueIfTrue, valueIfFalse) works like a ternary operator.
regex matching
define stream SweetProductionStream (name string, amount int);
@info(name='ProcessSweetProductionStream')
from SweetProductionStream
select name,
regex:matches('chocolate(.*)', name) as isAChocolateProduct,
regex:group('.*\s(.*)', name, 1) as sweetType
-- regex:matches(regex, string) returns whether the string matches the pattern
-- regex:group(regex, string, groupIndex) matches the string against the regex and returns the capture group at groupIndex
-- '.*\s(.*)': any characters, a whitespace character, then a captured trailing part
insert into ChocolateProductStream;
-- SweetProductionStream receives ['chocolate cake', 34]
-- ChocolateProductStream emits ['chocolate cake', true, 'cake']
default
define stream PatientRegistrationInputStream (
seqNo long, name string, age int,
height float, weight double, photo object,
isEmployee bool, wardNo object);
@info(name = 'SimpleIfElseQuery')
from PatientRegistrationInputStream
select
default(name, 'invalid') as name,
default(seqNo, 0l) as seqNo,
default(weight, 0d) as weight,
default(age, 0) as age,
default(height, 0f) as height
-- default(attribute, defaultValue) returns defaultValue when the attribute is null
insert into PreprocessedPatientRegistrationInputStream;
-- default values per type: ['invalid', 0, 0.0, 0, 0.0]
type based filtering
define stream SweetProductionStream (name string, amount int);
@info(name='ProcessSweetProductionStream')
from SweetProductionStream
select
instanceOfInteger(amount) as isAIntInstance,
name,
amount
insert into ProcessedSweetProductionStream;
-- the type-check result is added to the event so downstream queries can filter on it
remove duplicate events
define stream TemperatureStream
(sensorId string, seqNo string, temperature double);
@info(name = 'Deduplicate-sensorId')
from TemperatureStream#unique:deduplicate(sensorId, 1 min)
-- stream#unique:deduplicate(key, timeGap) drops events whose key was already seen within timeGap; events arriving after the gap are not treated as duplicates
select *
insert into UniqueSensorStream;
@info(name = 'Deduplicate-sensorId-and-seqNo')
from TemperatureStream#unique:deduplicate(
str:concat(sensorId,'-',seqNo), 1 min)
-- str:concat(arg1, arg2, ...) concatenates its arguments, here building a composite key
select *
insert into UniqueSensorSeqNoStream;
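The deduplication behaviour can be sketched in plain Python (a simplification of the operator's semantics, not Siddhi code; `deduplicate` and the sample data are illustrative):

```python
# Sketch of unique:deduplicate(key, gap): an event is dropped when an event
# with the same key was already kept within the last `gap` seconds.
def deduplicate(events, gap):
    """events: list of (timestamp, key, payload); returns the events kept."""
    last_kept = {}   # key -> timestamp of the last event kept for that key
    kept = []
    for ts, key, payload in events:
        if key in last_kept and ts - last_kept[key] < gap:
            continue                     # duplicate within the gap: drop it
        last_kept[key] = ts
        kept.append((ts, key, payload))
    return kept

readings = [(0, 'aq-14', 35.4), (30, 'aq-14', 35.6), (90, 'aq-14', 36.0)]
print(deduplicate(readings, 60))   # [(0, 'aq-14', 35.4), (90, 'aq-14', 36.0)]
```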
math & logical operations
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'celciusTemperature')
from TemperatureStream
select sensorId,
(temperature * 9 / 5) + 32 as temperature
insert into FahrenheitTemperatureStream;
@info(name = 'Overall-analysis')
from FahrenheitTemperatureStream
select sensorId,
math:floor(temperature) as approximateTemp
-- math:floor(number) rounds down to the nearest integer
insert all events into OverallTemperatureStream;
@info(name = 'RangeFilter')
from OverallTemperatureStream
[ approximateTemp > -2 and approximateTemp < 40]
select *
insert into NormalTemperatureStream;
transform json
define stream InputStream(jsonString string);
from InputStream
select json:toObject(jsonString) as jsonObj
insert into PersonalDetails;
-- json:toObject(string) parses a JSON string into a JSON object
from PersonalDetails
select jsonObj,
json:getString(jsonObj,'$.name') as name,
json:isExists(jsonObj, '$.salary') as isSalaryAvailable,
json:toString(jsonObj) as jsonString
-- json:getString(json, '$.name') returns the value at the JSON path as a string
-- json:isExists(json, '$.salary') returns whether a value exists at the path
-- json:toString(json) serializes the JSON object back into a string
insert into OutputStream;
from OutputStream[isSalaryAvailable == false]
select
json:setElement(jsonObj, '$', 0f, 'salary') as jsonObj
-- json:setElement(json, '$', 0f, 'salary') adds salary with value 0f at the root of the object
insert into PreprocessedStream;
-- input event sent to InputStream:
{
  "name": "siddhi.user",
  "address": {
    "country": "Sri Lanka"
  },
  "contact": "+9xxxxxxxx"
}
OutputStream
[ {"address":{"country":"Sri Lanka"},"contact":"+9xxxxxxxx","name":"siddhi.user"}, siddhi.user, false,
"{\"name\":\"siddhi.user\",\"address\":{\"country\":\"Sri Lanka\"},\"contact\":\"+9xxxxxxxx\"}"]
PreprocessedStream
{
  "name": "siddhi.user",
  "salary": 0.0,
  "address": {
    "country": "Sri Lanka"
  },
  "contact": "+9xxxxxxxx"
}
sliding time
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.time(1 min)
-- stream#window.time(timeGap): a sliding window over events from the last 1 minute
select avg(temperature) as avgTemperature,
max(temperature) as maxTemperature,
count() as numberOfEvents
insert all events into OverallTemperatureStream;
-- avg(number), max(number), count() are aggregations over the window
-- insert all events into emits both current events and expired events (those leaving the window)
@info(name = 'SensorId-analysis')
from TemperatureStream#window.time(30 sec)
select sensorId,
avg(temperature) as avgTemperature,
min(temperature) as minTemperature
group by sensorId
having avgTemperature > 20.0
-- group by sensorId with a having condition on the aggregate
insert into SensorIdTemperatureStream;
-- events are aggregated per sensor within the sliding window, then filtered by the having clause
stream#<function> can be read as the function constraining the stream's behaviour, e.g. deduplication or sliding-window aggregation.
batch (tumbling) time
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.timeBatch(1 min)
select avg(temperature) as avgTemperature,
max(temperature) as maxTemperature,
count() as numberOfEvents
-- stream#window.timeBatch(timeGap) groups events into non-overlapping batches of timeGap, starting from the first event's arrival
insert into OverallTemperatureStream;
@info(name = 'SensorId-analysis')
from TemperatureStream#window.timeBatch(30 sec, 0)
-- stream#window.timeBatch(timeGap, startTime) groups events into non-overlapping batches aligned to the given start time
select sensorId,
avg(temperature) as avgTemperature,
min(temperature) as minTemperature
group by sensorId
having avgTemperature > 20.0
insert into SensorIdTemperatureStream;
The difference between window.time() and window.timeBatch(): the sliding window emits output continuously as events enter and leave the window, while the batch window emits only when each interval ends.
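The sliding-versus-tumbling distinction can be sketched in plain Python (an illustration of the semantics, not Siddhi internals; the functions and sample values are made up):

```python
# Sliding window: aggregate over all events within the last `width` seconds.
# Tumbling window: aggregate once per non-overlapping `width`-second batch.
events = [(0, 20.0), (30, 22.0), (70, 30.0)]  # (seconds, temperature)

def sliding_avg(events, width, now):
    """Average over events whose timestamp lies within the last `width` seconds."""
    window = [v for t, v in events if now - width < t <= now]
    return sum(window) / len(window) if window else None

def tumbling_avgs(events, width):
    """One average per non-overlapping `width`-second batch."""
    batches = {}
    for t, v in events:
        batches.setdefault(t // width, []).append(v)
    return [sum(vs) / len(vs) for _, vs in sorted(batches.items())]

print(sliding_avg(events, 60, 70))   # 26.0 (only the events at t=30 and t=70)
print(tumbling_avgs(events, 60))     # [21.0, 30.0]
```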
sliding event count
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.length(4)
-- stream#window.length(n): a sliding window aggregating the last n events
select avg(temperature) as avgTemperature,
max(temperature) as maxTemperature,
count() as numberOfEvents
insert into OverallTemperatureStream;
@info(name = 'SensorId-analysis')
from TemperatureStream#window.length(5)
select sensorId,
avg(temperature) as avgTemperature,
min(temperature) as minTemperature
group by sensorId
having avgTemperature >= 20.0
insert into SensorIdTemperatureStream;
batch (tumbling) event count
define stream TemperatureStream
(sensorId string, temperature double);
@info(name = 'Overall-analysis')
from TemperatureStream#window.lengthBatch(4)
-- stream#window.lengthBatch(n) groups every n events into a non-overlapping batch
select avg(temperature) as avgTemperature,
max(temperature) as maxTemperature,
count() as numberOfEvents
insert into OverallTemperatureStream;
@info(name = 'SensorId-analysis')
from TemperatureStream#window.lengthBatch(5)
select sensorId,
avg(temperature) as avgTemperature,
min(temperature) as minTemperature
group by sensorId
having avgTemperature >= 20.0
insert into SensorIdTemperatureStream;
session
define stream PurchaseStream
(userId string, item string, price double);
@info(name = 'Session-analysis')
from PurchaseStream#window.session(1 min, userId)
-- stream#window.session(sessionGap, sessionKey) groups events per key into sessions; a session closes after sessionGap of inactivity
select userId,
count() as totalItems,
sum(price) as totalPrice
group by userId
insert into UserIdPurchaseStream;
@info(name = 'Session-analysis-with-late-event-arrivals')
from PurchaseStream#window.session(1 min, userId, 20 sec)
-- aggregates events in sessions keyed by userId with a 1-minute session gap, allowing 20 seconds of latency to capture late arrivals
select userId,
count() as totalItems,
sum(price) as totalPrice
group by userId
insert into OutOfOrderUserIdPurchaseStream;
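The session-window grouping can be sketched in plain Python (a simplification of the semantics, not Siddhi code; `sessionize` and the sample purchases are illustrative):

```python
# Sketch of a session window: events with the same key belong to one session
# as long as the gap between consecutive events stays within `gap` seconds.
def sessionize(events, gap):
    """events: list of (timestamp, key, price); returns {key: [session totals]}."""
    sessions = {}          # key -> list of (last_timestamp, running total)
    for ts, key, price in sorted(events):
        bucket = sessions.setdefault(key, [])
        if bucket and ts - bucket[-1][0] <= gap:
            last_ts, total = bucket[-1]
            bucket[-1] = (ts, total + price)    # extend the open session
        else:
            bucket.append((ts, price))          # start a new session
    return {k: [total for _, total in v] for k, v in sessions.items()}

purchases = [(0, 'u1', 5.0), (30, 'u1', 7.0), (200, 'u1', 2.0)]
print(sessionize(purchases, 60))   # {'u1': [12.0, 2.0]}
```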
named window
define stream TemperatureStream
(sensorId string, temperature double);
define window OneMinTimeWindow
(sensorId string, temperature double) time(1 min) ;
-- define window <name> (<schema>) <windowType>(<parameters>)
@info(name = 'Insert-to-window')
from TemperatureStream
insert into OneMinTimeWindow;
@info(name = 'Min-max-analysis')
from OneMinTimeWindow
select min(temperature) as minTemperature,
max(temperature) as maxTemperature
insert into MinMaxTemperatureOver1MinStream;
@info(name = 'Per-sensor-analysis')
from OneMinTimeWindow
select sensorId,
avg(temperature) as avgTemperature
group by sensorId
insert into AvgTemperaturePerSensorStream;
stream join
define stream TemperatureStream
(roomNo string, temperature double);
define stream HumidityStream
(roomNo string, humidity double);
@info(name = 'Equi-join')
from TemperatureStream#window.unique:time(roomNo, 1 min) as t
join HumidityStream#window.unique:time(roomNo, 1 min) as h
on t.roomNo == h.roomNo
-- stream#window.unique:time(key, timeGap) keeps only the latest event per key within the time window
-- join of the two windowed streams
select t.roomNo, t.temperature, h.humidity
insert into TemperatureHumidityStream;
@info(name = 'Join-on-temperature')
from TemperatureStream as t
left outer join HumidityStream#window.time(1 min) as h
on t.roomNo == h.roomNo
-- left outer join of stream t against the last 1 minute of stream h
select t.roomNo, t.temperature, h.humidity
insert into EnrichedTemperatureStream;
partition events by value
define stream LoginStream
( userID string, loginSuccessful bool);
@purge(enable='true', interval='10 sec',
idle.period='1 hour')
partition with ( userID of LoginStream )
-- @purge: every 10 seconds, remove partition instances that have received no events for 1 hour
-- partition with: one partition instance per userID
begin
@info(name='Aggregation-query')
from LoginStream#window.length(3)
select userID, loginSuccessful, count() as attempts
group by loginSuccessful
insert into #LoginAttempts;
-- #LoginAttempts is an inner stream of the partition, accessible only inside it
@info(name='Alert-query')
from #LoginAttempts[loginSuccessful==false and attempts==3]
select userID, "3 consecutive login failures!" as message
insert into UserSuspensionStream;
end;
scatter and gather (string)
define stream PurchaseStream
(userId string, items string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#str:tokenize(items, ',', true)
select userId, token as item, store
insert into TokenizedItemStream;
-- str:tokenize(str, separator, distinct) splits the string at the separator and emits one event per token (as the attribute token)
@info(name = 'Transform-query')
from TokenizedItemStream
select userId, str:concat(store, "-", item) as itemKey
insert into TransformedItemStream;
-- str:concat(...) concatenates strings
@info(name = 'Gather-query')
from TransformedItemStream#window.batch()
select userId, str:groupConcat(itemKey, ",") as itemKeys
insert into GroupedPurchaseItemStream;
-- str:groupConcat(value, separator) joins the values from the events in the window with the separator
scatter and gather (json)
define stream PurchaseStream
(order string, store string);
@info(name = 'Scatter-query')
from PurchaseStream#json:tokenize(order, '$.order.items')
-- json:tokenize(json, '$.order.items') emits one event per element under the path, exposed as jsonElement
select json:getString(order, '$.order.id') as orderId, -- extract order.id
jsonElement as item,
store
insert into TokenizedItemStream;
@info(name = 'Transform-query')
from TokenizedItemStream
select orderId,
ifThenElse(json:getString(item, 'name') == "cake",
json:toString(
json:setElement(item, 'price',
json:getDouble(item, 'price') - 5
)
),
item) as item,
store
insert into DiscountedItemStream;
@info(name = 'Gather-query')
from DiscountedItemStream#window.batch()
select orderId, json:group(item) as items, store
insert into GroupedItemStream;
-- json:group(item) gathers the items back into a JSON array
@info(name = 'Format-query')
from GroupedItemStream
select str:fillTemplate("""
{"discountedOrder":
{"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
}""", orderId, items, store) as discountedOrder
insert into DiscountedOrderStream;
-- str:fillTemplate(template, args...) fills the {{n}} placeholders (1-based) and returns the composed string, similar to format()
simple pattern: matches one or more events arriving over time
define stream TemperatureStream(roomNo int, temp double);
@sink(type = 'log')
define stream HighTempAlertStream(roomNo int,
initialTemp double, finalTemp double);
@info(name='temperature-increase-identifier')
from every( e1 = TemperatureStream ) ->
e2 = TemperatureStream[ e1.roomNo == roomNo
and (e1.temp + 5) <= temp ]
within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into HighTempAlertStream;
count pattern: matches multiple events received under the same condition
define stream TemperatureStream (sensorID long, roomNo int,
temp double);
define stream RegulatorStream (deviceID long, roomNo int,
tempSet double, isOn bool);
@sink(type = 'log')
define stream TemperatureDiffStream(roomNo int,
tempDiff double);
from every( e1 = RegulatorStream)
-> e2 = TemperatureStream[e1.roomNo == roomNo] <1:>
-> e3 = RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TemperatureDiffStream;
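The counting behaviour above (e2 collecting the temperature events that arrive between two regulator events, with e2[0] and e2[last] as the first and last matches) can be sketched in plain Python; the function and sample stream are illustrative, not Siddhi code:

```python
# Sketch of the count pattern: collect zero or more temperature readings
# between two regulator events, then emit first-minus-last (e2[0] - e2[last]).
def temp_diffs(events):
    """events: list of ('regulator' | 'temp', value); returns the diffs emitted."""
    diffs, collecting, readings = [], False, []
    for kind, value in events:
        if kind == 'regulator':
            if collecting and readings:
                diffs.append(readings[0] - readings[-1])
            collecting, readings = True, []     # start a fresh match window
        elif collecting:
            readings.append(value)
    return diffs

stream = [('regulator', None), ('temp', 25.0), ('temp', 22.5), ('regulator', None)]
print(temp_diffs(stream))   # [2.5]
```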
logical pattern: matches events based on their arrival combined with logical operators (and, or)
define stream RegulatorStateChangeStream(deviceID long,
roomNo int, tempSet double, action string);
define stream RoomKeyStream(deviceID long, roomNo int,
action string);
@sink(type='log')
define stream RegulatorActionStream(roomNo int, action string);
from every e1=RegulatorStateChangeStream[ action == 'on' ]
-> e2=RoomKeyStream
[ e1.roomNo == roomNo and action == 'removed' ]
or e3=RegulatorStateChangeStream
[ e1.roomNo == roomNo and action == 'off']
select e1.roomNo,
ifThenElse( e2 is null, 'none', 'stop' ) as action
having action != 'none'
insert into RegulatorActionStream;
non-occurrence pattern: detects the absence of an expected event within a period
define stream RegulatorStateChangeStream(deviceID long,
roomNo int, tempSet double, action string);
define stream TemperatureStream (roomNo int, temp double);
@sink(type='log')
define stream RoomTemperatureAlertStream(roomNo int);
from e1=RegulatorStateChangeStream[action == 'on']
-> not TemperatureStream[e1.roomNo == roomNo and
temp <= e1.tempSet] for 30 sec
select e1.roomNo as roomNo
insert into RoomTemperatureAlertStream;
simple sequence: matches consecutive events arriving one after another
define stream StockRateStream (symbol string, price float,
volume int);
@sink(type='log')
define stream PeakStockRateStream (symbol string,
rateAtPeak float);
partition with (symbol of StockRateStream)
begin
from every e1=StockRateStream,
e2=StockRateStream[e1.price < price],
e3=StockRateStream[e2.price > price]
within 10 min
select e1.symbol, e2.price as rateAtPeak
insert into PeakStockRateStream ;
end;
sequence with count: a sequence in which a step can match a counted number of events (here one or more, via +)
define stream TemperatureStream(roomNo int, temp double);
@sink(type='log')
define stream PeekTemperatureStream(roomNo int,
initialTemp double, peekTemp double, firstDropTemp double);
partition with (roomNo of TemperatureStream)
begin
@info(name = 'temperature-trend-analyzer')
from every e1=TemperatureStream,
e2=TemperatureStream[ifThenElse(e2[last].temp is null,
e1.temp <= temp, e2[last].temp <= temp)]+,
e3=TemperatureStream[e2[last].temp > temp]
select e1.roomNo, e1.temp as initialTemp,
e2[last].temp as peekTemp, e3.temp as firstDropTemp
insert into PeekTemperatureStream ;
end;
logical sequence: matches consecutive events combined with logical operators
define stream TempSensorStream(deviceID long,
isActive bool);
define stream HumidSensorStream(deviceID long,
isActive bool);
define stream RegulatorStream(deviceID long, isOn bool);
@sink(type='log')
define stream StateNotificationStream (deviceID long,
tempSensorActive bool, humidSensorActive bool);
from every e1=RegulatorStream[isOn == true],
e2=TempSensorStream and e3=HumidSensorStream
select e1.deviceID, e2.isActive as tempSensorActive,
e3.isActive as humidSensorActive
insert into StateNotificationStream;
HTTP Service Integration
@sink(type='http-call',
publisher.url='http://localhost:8005/validate-loan',
method='POST', sink.id='loan-validation',
@map(type='json'))
define stream LoanValidationStream (clientId long,
name string, amount double);
-- sink: http-call POSTs JSON to publisher.url; sink.id correlates calls with their responses
@source(type='http-call-response', sink.id='loan-validation',
http.status.code='2\d+',
@map(type='json', @attributes(customerName='trp:name',
clientId='trp:clientId', loanAmount='trp:amount',
interestRate='validation-response.rate',
totalYears='validation-response.years-approved')))
define stream SuccessLoanRequestStream(clientId long,
customerName string, loanAmount double,
interestRate double, totalYears int);
-- source: http-call-response, matching only responses with a 2xx status code
@source(type='http-call-response', sink.id='loan-validation',
http.status.code='400',
@map(type='json', @attributes(customerName='trp:name',
clientId='trp:clientId',
failureReason='validation-response.reason')))
define stream FailureLoanRequestStream(clientId long,
customerName string, failureReason string);
-- source: http-call-response, matching only responses with status code 400
define stream LoanRequestStream (clientId long, name string,
amount double);
@sink(type='log')
define stream LoanResponseStream(clientId long,
customerName string, message string);
-- sink: log
@info(name = 'attribute-projection')
from LoanRequestStream
select clientId, name, amount
insert into LoanValidationStream;
@info(name = 'successful-message-generator')
from SuccessLoanRequestStream
select clientId, customerName,
"Loan Request is accepted for processing" as message
insert into LoanResponseStream;
@info(name = 'failure-message-generator')
from FailureLoanRequestStream
select clientId, customerName,
str:concat("Loan Request is rejected due to ",
failureReason) as message
insert into LoanResponseStream;
Two http-call-response sources handle the success (2xx) and failure (400) responses separately; the responses are mapped from JSON, and the results are published via the log sink.
gRPC service integration
define stream TicketBookingStream (name string, phoneNo string,
movie string, ticketClass string, qty int,
bookingTime long);
@sink(type='grpc-call',
publisher.url =
'grpc://localhost:5003/org.wso2.grpc.EventService/process',
sink.id= 'ticket-price', @map(type='json'))
define stream TicketPriceFinderStream (name string,
phoneNo string, movie string, ticketClass string,
qty int, bookingTime long);
-- sink
@source(type='grpc-call-response',
receiver.url =
'grpc://localhost:9763/org.wso2.grpc.EventService/process',
sink.id= 'ticket-price',
@map(type='json', @attributes(customerName='trp:name',
phoneNo='trp:phoneNo', movie='trp:movie',
qty='trp:qty', bookingTime='trp:bookingTime',
ticketPrice='price')))
-- source
define stream TicketPriceResponseStream (customerName string,
phoneNo string, movie string, qty int,
ticketPrice double, bookingTime long);
@sink(type='log')
define stream TotalTicketPaymentStream (customerName string,
phoneNo string, movie string, totalAmount double,
bookingTime long);
-- log sink
@info(name = 'filter-basic-ticket-bookings')
from TicketBookingStream[ticketClass == "BASIC"]
select name as customerName, phoneNo, movie,
qty * 20.0 as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
@info(name = 'filter-non-basic-tickets')
from TicketBookingStream[ticketClass != "BASIC"]
select *
insert into TicketPriceFinderStream;
@info(name = 'total-price-calculator')
from TicketPriceResponseStream
select customerName, phoneNo, movie,
(qty * ticketPrice) as totalAmount, bookingTime
insert into TotalTicketPaymentStream;
rate limit based on time
define stream APIRequestStream (apiName string, version string,
tier string, user string, userEmail string);
define stream UserNotificationStream (user string,
apiName string, version string, tier string,
userEmail string, throttledCount long);
@info(name='api-throttler')
from APIRequestStream#window.timeBatch(1 min, 0, true)
select apiName, version, user, tier, userEmail,
count() as totalRequestCount
group by apiName, version, user
having totalRequestCount == 3 or totalRequestCount == 0
insert all events into ThrottledStream;
@info(name='throttle-flag-generator')
from ThrottledStream
select apiName, version, user, tier, userEmail,
ifThenElse(totalRequestCount == 0, false, true)
as isThrottled
insert into ThrottleOutputStream;
@info(name='notification-generator')
from ThrottleOutputStream[isThrottled]#window.time(1 hour)
select user, apiName, version, tier, userEmail,
count() as throttledCount
group by user, apiName, version, tier
having throttledCount > 2
output first every 15 min
insert into UserNotificationStream;
logging
define stream GlucoseReadingStream (locationRoom string,
locationBed string, timeStamp string, sensorID long,
patientFirstName string, patientLastName string,
sensorValue double);
@sink(type = 'http', on.error='log',
publisher.url = "http://localhost:8080/logger",
method = "POST",
@map(type = 'json'))
-- on.error='log': log the error if publishing fails
define stream AbnormalGlucoseReadingStream
(timeStampInLong long, locationRoom string,
locationBed string, sensorID long,
patientFullName string, sensorReadingValue double);
@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
locationRoom, locationBed, sensorID,
str:concat(patientFirstName, " ", patientLastName)
as patientFullName,
sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;
wait & retry
define stream GlucoseReadingStream (locationRoom string,
locationBed string, timeStamp string, sensorID long,
patientFirstName string, patientLastName string,
sensorValue double);
@sink(type = 'http', on.error='wait',
publisher.url = "http://localhost:8080/logger",
method = "POST",
@map(type = 'json'))
-- on.error='wait': on failure, wait and retry until the endpoint recovers
-- on.error configures the error-handling strategy
define stream AbnormalGlucoseReadingStream
(timeStampInLong long, locationRoom string,
locationBed string, sensorID long,
patientFullName string, sensorReadingValue double);
@info(name='abnormal-reading-identifier')
from GlucoseReadingStream[sensorValue > 220]
select math:parseLong(timeStamp) as timeStampInLong,
locationRoom, locationBed, sensorID,
str:concat(patientFirstName, " ", patientLastName)
as patientFullName,
sensorValue as sensorReadingValue
insert into AbnormalGlucoseReadingStream;