草庐IT

Flink实战,实时流量统计 TOPN访问URL

马鹏飞 2023-03-28 原文
跟 https://blog.51cto.com/mapengfei/2580330 类似场景,来从Nginx、Apache等web服务器的日志中读取数据,实时统计出来访问热度最高的TOPN访问URL,并且要确保数据乱序的处理,lag等情况下,还要确认数据的准确性

目标:

从log文件中读取数据(也可以参考上一篇从kakfa中),取http 的method为get的请求,并且把静态文件访问过滤掉,进行实时统计 实现: 1、读取文件 2、做过滤,method=get url不为静态信息 3、生成一个滑动窗口,大小10分钟,每次滑动5s,watermask 5s(为了保险允许数据延迟,allowedLateness 1分钟) 4、进行聚合统计分析排序输出

准备日志文件:

在resource目录下生成一个nginx.log里面内容: 1.1.1.1 - - 23/03/2020:05:06:03 GET /mapengfei/2580330 1.1.1.1 - - 23/03/2020:05:06:05 GET /mapengfei/2572888 1.1.1.3 - - 24/03/2020:05:06:05 GET /mapengfei/2572888

代码:

新建一个HotUrlAnalysis.scal的object文件

/* * * @author mafei * @date 2021/1/3 */ package com.mafei.hotUrlAnalysis import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp import java.text.SimpleDateFormat import scala.collection.mutable.ListBuffer // 定义要提取的数据格式 case class NginxLog(clientIp: String, userId: String,ts:Long,method:String,url:String) // 定义窗口聚合结果样例类 case class UrlViewCount(url: String, windowEnd: Long, count: Long) object HotUrlAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定义取事件时间 env.setParallelism(1) //防止乱序 //1、从文件中读取数据 val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log") val dataStream = inputStream .map(data=>{ val splitResult = data.split(" ") //因为日志格式是以空格分隔的 //因为日志中格式是字符串的,我们需要的是13位毫秒的时间戳,所以需要转换下 val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // 格式: 天/月/年:时:分:秒 val ts = dateFormatConvert.parse(splitResult(3)).getTime NginxLog(splitResult(0), splitResult(1),ts,splitResult(4), splitResult(5)) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog](Time.seconds(5)) { //这里设置watermark延迟5秒 override def extractTimestamp(t: NginxLog): Long = t.ts //指定时间事件的时间戳 }) //开窗聚合,排序输出 var aggStream = dataStream .filter(_.method == "GET")//过滤下,只要method是get请求的数据 .filter(data=>{ val pattern= "^((?!\\.(css|js)$).)*$".r (pattern findFirstIn data.url).nonEmpty }) //再过滤下,像css/js之类的url不算 // .keyBy("url") //这样子写返回的是个元组类型 .keyBy(_.url) .timeWindow(Time.minutes(10), Time.seconds(5)) .allowedLateness(Time.minutes(1)) //可以watermark时间设置小一点,到时间先输出,但是窗口先不关,等到allowedLateness的时间了再关 .sideOutputLateData(new OutputTag[NginxLog]("late")) //加一个侧输出流,为了防止数据的乱序超过了1分钟 .aggregate(new PageCountAgg(),new PageViewCountResult()) val resultStream = aggStream .keyBy(_.windowEnd) //按照结束时间进行分组,收集当前窗口内的,取一定时间内的数据 .process(new TopUrl(10)) resultStream.print() aggStream.getSideOutput(new OutputTag[NginxLog]("late")).print("这是1分钟之后的延迟数据。。。。") env.execute("执行热门url访问统计") } } class PageCountAgg() extends AggregateFunction[NginxLog,Long,Long]{ override def createAccumulator(): Long = 0L override def add(in: NginxLog, acc: Long): Long = acc +1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc+acc1 } class PageViewCountResult() extends WindowFunction[Long,UrlViewCount,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = { out.collect(UrlViewCount(key,window.getEnd,input.iterator.next())) } } /** * 输入参数 * K: 排序的字段类型,这里是WindowEnd时间戳,所以是Long类型 * I: 输入的数据,是上一步PageViewCountResult输出的类型,所以是UrlViewCount * O: 输出的类型,这个自己定义,看实际情况,这里直接打印了,所以String */ class TopUrl(topN:Int) extends KeyedProcessFunction[Long,UrlViewCount,String]{ lazy val urlViewCountListState: ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("urlViewCountList", classOf[UrlViewCount])) override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = { urlViewCountListState.add(i) //把每次的结果都加到自定义的list里头,方便后边做排序 context.timerService().registerEventTimeTimer(i.windowEnd) //注册一个定时器,在窗口关闭的时候触发 } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { //为了方便排序,定义另一个ListBuffer,保存ListState的所有数据 val allPageListBuffer: ListBuffer[UrlViewCount] = ListBuffer() val iter = urlViewCountListState.get().iterator() while (iter.hasNext){ allPageListBuffer += iter.next() } //清空ListState的数据,已经放到urlViewCountListState 准备计算了,等下次触发就应该是新的了 urlViewCountListState.clear() // 先按照count,从大到小排序,然后再取前N个 val sortItemViewCounts = allPageListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN) //格式化输出数据: val result : StringBuilder = new StringBuilder result.append("当前窗口的结束时间:\t").append(new Timestamp(timestamp)).append("\n") //遍历结果列表中的每个ItemViewCount , 输出到一行 for(i <- sortItemViewCounts.indices){ val currentItemViewCount = sortItemViewCounts(i) result.append("Top").append(i+1).append("\t") .append("URL = ").append(currentItemViewCount.url).append("\t") .append("访问量: ").append(currentItemViewCount.count).append("\n") } result.append("---------------------------------------\n\n\n") Thread.sleep(1000) out.collect(result.toString()) } }

代码结构及输出效果:

问题点

在数据乱序的情况下,虽然能全部输出,但有2个问题点上面的代码, 一个是在TopUrl 中保存数据用的是list,在滑动窗口先到达,延迟数据过会儿到达的时候,数据会重复输出,也就是url会出现2次 第二个问题是在第二次延迟输出的时候,本来应该加上之前的数据,但是没有,而是重新从0开始计算 最终效果:

URL 出现次数 出现原因
/a 3 在5秒内统计数据输出的
/a 1 allowedLateness延迟数据达到产生的

解决办法:

从list改为map,并且因为之前每次都会清空list,可以改为等真正的窗口结束后再清空就可以了

主要改动的地方: processElement 和onTimer这2个方法

/* * * @author mafei * @date 2021/1/3 */ package com.mafei.hotUrlAnalysis import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp import java.text.SimpleDateFormat import scala.collection.mutable.ListBuffer /** * import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.java.tuple.{Tuple, Tuple1} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector import java.sql.Timestamp import java.util.Properties import scala.collection.mutable.ListBuffer * @param clientIp * @param userId * @param ts * @param method * @param url */ // 定义要提取的数据格式 case class NginxLog2(clientIp: String, userId: String,ts:Long,method:String,url:String) // 定义窗口聚合结果样例类 case class UrlViewCount2(url: String, windowEnd: Long, count: Long) object HotUrlAnalysis2 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定义取事件时间 env.setParallelism(1) //防止乱序 //1、从文件中读取数据 val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotUrlAnalysis/src/main/resources/nginx.log") val dataStream = inputStream .map(data=>{ val splitResult = data.split(" ") //因为日志格式是以空格分隔的 //因为日志中格式是字符串的,我们需要的是13位毫秒的时间戳,所以需要转换下 val dateFormatConvert = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss") // 格式: 天/月/年:时:分:秒 val ts = dateFormatConvert.parse(splitResult(3)).getTime NginxLog2(splitResult(0), splitResult(1),ts,splitResult(4), splitResult(5)) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[NginxLog2](Time.seconds(5)) { //这里设置watermark延迟5秒 override def extractTimestamp(t: NginxLog2): Long = t.ts //指定时间事件的时间戳 }) //开窗聚合,排序输出 var aggStream = dataStream .filter(_.method == "GET")//过滤下,只要method是get请求的数据 .filter(data=>{ val pattern= "^((?!\\.(css|js)$).)*$".r (pattern findFirstIn data.url).nonEmpty }) //再过滤下,像css/js之类的url不算 // .keyBy("url") //这样子写返回的是个元组类型 .keyBy(_.url) .timeWindow(Time.minutes(10), Time.seconds(5)) .allowedLateness(Time.minutes(1)) //可以watermark时间设置小一点,到时间先输出,但是窗口先不关,等到allowedLateness的时间了再关 .sideOutputLateData(new OutputTag[NginxLog2]("late")) //加一个侧输出流,为了防止数据的乱序超过了1分钟 .aggregate(new PageCountAgg2(),new PageViewCountResult2()) val resultStream = aggStream .keyBy(_.windowEnd) //按照结束时间进行分组,收集当前窗口内的,取一定时间内的数据 .process(new TopUrl2(10)) resultStream.print() aggStream.getSideOutput(new OutputTag[NginxLog2]("late")).print("这是1分钟之后的延迟数据。。。。") env.execute("执行热门url访问统计") } } class PageCountAgg2() extends AggregateFunction[NginxLog2,Long,Long]{ override def createAccumulator(): Long = 0L override def add(in: NginxLog2, acc: Long): Long = acc +1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc+acc1 } class PageViewCountResult2() extends WindowFunction[Long,UrlViewCount2,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount2]): Unit = { out.collect(UrlViewCount2(key,window.getEnd,input.iterator.next())) } } /** * 输入参数 * K: 排序的字段类型,这里是WindowEnd时间戳,所以是Long类型 * I: 输入的数据,是上一步PageViewCountResult2输出的类型,所以是UrlViewCount2 * O: 输出的类型,这个自己定义,看实际情况,这里直接打印了,所以String */ class TopUrl2(topN:Int) extends KeyedProcessFunction[Long,UrlViewCount2,String]{ lazy val UrlViewCount2MapState: MapState[String,Long] = getRuntimeContext.getMapState(new MapStateDescriptor[String,Long]("UrlViewCount2Map",classOf[String],classOf[Long])) override def processElement(i: UrlViewCount2, context: KeyedProcessFunction[Long, UrlViewCount2, String]#Context, collector: Collector[String]): Unit = { UrlViewCount2MapState.put(i.url,i.count) context.timerService().registerEventTimeTimer(i.windowEnd) //注册一个定时器,在窗口关闭的时候触发 //再另外注册一个定时器,1分钟之后触发,这时窗口已经彻底关闭,不再有聚合结果输出,可以清空状态 context.timerService().registerEventTimeTimer(i.windowEnd + 60000L) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount2, String]#OnTimerContext, out: Collector[String]): Unit = { /** * 使用mapState的方式 */ //判断定时器触发时间,如果已经是窗口结束时间1分钟之后,那么直接清空状态 if(timestamp == ctx.getCurrentKey+60000L){ UrlViewCount2MapState.clear() return } val allUrlViewCount2sBuffer: ListBuffer[(String,Long)] = ListBuffer() val iter = UrlViewCount2MapState.entries().iterator() while (iter.hasNext){ val entry = iter.next() allUrlViewCount2sBuffer += ((entry.getKey, entry.getValue)) } // 先按照count,从大到小排序,然后再取前N个 val sortItemViewCounts = allUrlViewCount2sBuffer.sortBy(_._2)(Ordering.Long.reverse).take(topN) //格式化输出数据: val result : StringBuilder = new StringBuilder result.append("当前窗口的结束时间:\t").append(new Timestamp(timestamp)).append("\n") //遍历结果列表中的每个ItemViewCount , 输出到一行 for(i <- sortItemViewCounts.indices){ val currentItemViewCount = sortItemViewCounts(i) result.append("Top").append(i+1).append("\t") .append("URL = ").append(currentItemViewCount._1).append("\t") .append("访问量: ").append(currentItemViewCount._2).append("\n") } result.append("---------------------------------------\n\n\n") Thread.sleep(1000) out.collect(result.toString()) } }

有关Flink实战,实时流量统计 TOPN访问URL的更多相关文章

  1. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  2. ruby-on-rails - 在混合/模块中覆盖模型的属性访问器 - 2

    我有一个包含模块的模型。我想在模块中覆盖模型的访问器方法。例如:classBlah这显然行不通。有什么想法可以实现吗? 最佳答案 您的代码看起来是正确的。我们正在毫无困难地使用这个确切的模式。如果我没记错的话,Rails使用#method_missing作为属性setter,因此您的模块将优先,阻止ActiveRecord的setter。如果您正在使用ActiveSupport::Concern(参见thisblogpost),那么您的实例方法需要进入一个特殊的模块:classBlah

  3. ruby-on-rails - rails : save file from URL and save it to Amazon S3 - 2

    从给定URL下载文件并立即将其上传到AmazonS3的更直接的方法是什么(+将有关文件的一些信息保存到数据库中,例如名称、大小等)?现在,我既不使用Paperclip,也不使用Carrierwave。谢谢 最佳答案 简单明了:require'open-uri'require's3'amazon=S3::Service.new(access_key_id:'KEY',secret_access_key:'KEY')bucket=amazon.buckets.find('image_storage')url='http://www.ex

  4. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  5. ruby - 如何使用 Ruby aws/s3 Gem 生成安全 URL 以从 s3 下载文件 - 2

    我正在编写一个小脚本来定位aws存储桶中的特定文件,并创建一个临时验证的url以发送给同事。(理想情况下,这将创建类似于在控制台上右键单击存储桶中的文件并复制链接地址的结果)。我研究过回形针,它似乎不符合这个标准,但我可能只是不知道它的全部功能。我尝试了以下方法:defauthenticated_url(file_name,bucket)AWS::S3::S3Object.url_for(file_name,bucket,:secure=>true,:expires=>20*60)end产生这种类型的结果:...-1.amazonaws.com/file_path/file.zip.A

  6. ruby-on-rails - Ruby url 到 html 链接转换 - 2

    我正在使用Rails构建一个简单的聊天应用程序。当用户输入url时,我希望将其输出为html链接(即“url”)。我想知道在Ruby中是否有任何库或众所周知的方法可以做到这一点。如果没有,我有一些不错的正则表达式示例代码可以使用... 最佳答案 查看auto_linkRails提供的辅助方法。这会将所有URL和电子邮件地址变成可点击的链接(htmlanchor标记)。这是文档中的代码示例。auto_link("Gotohttp://www.rubyonrails.organdsayhellotodavid@loudthinking.

  7. ruby-on-rails - 如何生成传递一些自定义参数的 `link_to` URL? - 2

    我正在使用RubyonRails3.0.9,我想生成一个传递一些自定义参数的link_toURL。也就是说,有一个articles_path(www.my_web_site_name.com/articles)我想生成如下内容:link_to'Samplelinktitle',...#HereIshouldimplementthecode#=>'http://www.my_web_site_name.com/articles?param1=value1¶m2=value2&...我如何编写link_to语句“alàRubyonRailsWay”以实现该目的?如果我想通过传递一些

  8. ruby - Rack:如何将 URL 存储为变量? - 2

    我正在编写一个简单的静态Rack应用程序。查看下面的config.ru代码:useRack::Static,:urls=>["/elements","/img","/pages","/users","/css","/js"],:root=>"archive"map'/'dorunProc.new{|env|[200,{'Content-Type'=>'text/html','Cache-Control'=>'public,max-age=6400'},File.open('archive/splash.html',File::RDONLY)]}endmap'/pages/search.

  9. ruby-on-rails - Rails - 使用/自定义 URL : '/dashboard' 指定根路径 - 2

    如何使此根路径转到:“/dashboard”而不仅仅是http://example.com?root:to=>'dashboard#index',:constraints=>lambda{|req|!req.session[:user_id].blank?} 最佳答案 您可以通过以下方式实现:root:to=>redirect('/dashboard')match'/dashboard',:to=>"dashboard#index",:constraints=>lambda{|req|!req.session[:user_id].b

  10. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

随机推荐