草庐IT

第六章 Flink中的时间和窗口

井底蛙蛙呱呱呱 2023-09-28 原文

时间语义


上图是数据流式处理过程,涉及到两个重要的时间点:事件时间(Event Time)和处理时间(Processing Time)。

  • 事件时间(Event Time):即数据产生的时间;
  • 处理时间(Processing Time):即数据真正被处理的时刻;

我们在处理数据时,以哪种时间作为衡量标准,就是所谓的时间语义问题(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有滞后。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

水位线(Watermark)

我们把时钟也数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接到广播下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似流水中用来当做标志的几号,在Flink中,这种用来衡量事件时间(Event Time)进展的标记,就被称作水位线(Watermark)。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而他插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。


理想的水位线是有序的,但是现实中由于不可控因素常常会有少量乱序的数据。




周期性生成时间戳,保存区间最大值

水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据,这一点对于乱序流的正确处理非常重要。水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据;
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展;
  • 水位显示基于数据的时间戳生成的;
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进;
  • 水位线可以通过设置延迟,以保证正确处理乱序数据;
  • 一个水位线(Watermark)t表示在当前流中事件时间已经到达了时间戳t,这导表t之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据;

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

如何生成水位线

在生成水位线的时候,如果希望计算结果更准确,可以将水位线延迟设置得更高一些,等待时间越长,越不容易漏掉数据,但是这样时效性降低了。而如果将等待时间设置过短则会遗漏掉部分数据,虽然Flink提供了处理迟到数据的方法,但是需要分开处理。因此如何设置延迟是一个需要根据实际情况权衡的问题。

在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampAndWatermarks(),他主要用来为流中的数据分配时间戳,并生成水位线来显示时间。该方法需要传入一个WatermarkStrategy作为参数,WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator:

  • TimestampAssigner, 主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础;
  • WatermarkGenerator, 主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator中主要有两个方法:onEvent和onPeriodicEmit;
    • onEvent, 每个事件(数据)到来都会调用的方法,他的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作;
    • onPeriodiEmit, 周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的setAutoWatermarkInterval()方法来设置,默认为200ms;
代码

Flink提供了内置的水位线生成器:

  • 有序流,对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
  • 乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的时间延迟(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码
自定义水位线

在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了。不同策略的关键在于WatermarkGenerator的实现。整体来说,Flink有两种不同的生产水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated):

  • 周期性水位线生成器(Periodic Generator),周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线;
  • 断点式水位线生成器(Punctuated Generator),断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

此外,也可以在自定义数据源中发送水位线,但是这样就不能使用assignTimestampsAndWatermarks 方法来生成水位线了,两者只能二选一。

水位线的传递

在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这说明上游各个分区处理得有快有慢,进度各不相同,这时我们应该以最慢的那个时钟,也就是最小的那个水位线为准。

窗口(Window)

Flink是一种流式计算引擎,主要是用来处理无界数据流的。想要更加方便的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(window)。在Flink中,窗口就是用来处理无界流的核心。

由于存在迟到数据的问题,将窗口视为一个框可能并不是最合适的。我们可以把它理解成一个“桶”(bucket):每个数据都会分发到对应的桶中,当到达窗口的结束时间时,就对每个桶中收集的数据进行计算处理。


窗口的分类

  • 按照驱动类型分类:
    • 1)时间窗口,时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据;
    • 2)计数窗口,计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。
  • 按照窗口分配数据的规则分类:1)滚动窗口,2)滑动窗口,3)会话窗口,4)全局窗口;





窗口API概览

在定义窗口操作之前,需要先确定到底是基于按键分区的数据流KeyedStream还是在没有按键分区的DataStream上面开窗。也即调用窗口算子之前是否有keyBy操作。

而在API上面的区别也是非常小:

// 按键分区
stream.keyBy(...).window(...)

// 非按键分区
stream.windowAll(...)

窗口分配器(Window Assigner)

定义窗口分配器(Window Assigner)是构建窗口算子的第一步,他的作用就是定义数据应该被分配到哪个窗口。通过向上一节中的window/windowAll函数中传入WindowAssigner参数,返回WindowStream。

不同窗口类型有不同的窗口分配器。

1、时间窗口
// 滚动处理时间窗口
stream.keyBy(...)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5))
  .aggregate(...)

// 滑动处理时间窗口
stream.keyBy(...)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
  .aggregate(...)

// 处理时间会话窗口
stream.keyBy(...)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  .aggregate(...)

// 滚动事件时间窗口
stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(...)

// 滑动事件时间窗口
stream.keyBy(...)
  .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .aggregate(...)

// 事件时间会话窗口
stream.keyBy(...)
  .window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
  .aggregate(...)

// 2、计数窗口
// 滚动计数窗口, 定义一个长度为10的滚动计数窗口
stream.keyBy(...)
  .countWindow(10)

// 滑动计数窗口,长度为10,步长为3
stream.keyBy(...)
  .countWindow(10, 3)

// 3、全局窗口, 全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
stream.keyBy(...)
  .window(GlobalWindows.create())

窗口函数(Window Functions)

在上面定义了窗口分配器,我们只是知道了数据属于哪个窗口,而本节介绍的窗口函数则是如何将这些窗口中的数据收集起来,即如何处理。

窗口函数是作用在windowStream上面的,返回的是DataStream。各种stream间的转换如下:


1、增量聚合函数
为了提高实时性,我们可以像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

ReduceFunction:

package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

object WindowFunctionDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      // 数据源中的时间戳是单调递增的,所以使用下面的方法,只需要抽取时间戳就好了
      // 等同于最大延迟时间是0毫秒
      .assignAscendingTimestamps(_.timeStamp)
      .map(r => (r.user, 1L))
      // 使用用户名对数据流进行分组
      .keyBy(_._1)
      // 设置5秒钟的滚动事件时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 保留第一个字段,针对第二个字段进行聚合
      .reduce((r1, r2) => (r1._1, r1._2+r2._2))
      .print()
    
    env.execute()
  }
}

AggregateFunction
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
  // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
  ACC createAccumulator();

  // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程
  ACC add(IN value, ACC accumulator);

  // getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出
  OUT getResult(ACC accumulator);

  // 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用
  ACC merge(ACC a, ACC b);
}

AggregateFunction接受3个数据类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 通过为每条数据分配相同的key,来将数据发送到同一个分区
      .keyBy(_ => "key")
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      .aggregate(new AvgPv)

    env.execute()
  
  class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
    // 创建空累加器,类型是元组,元组的第一个元素类型为Set数据结构,用来对用户名去重
    // 第二个元素用来累加pv操作,也就是没来一条数据就加一
    override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)

    // 累加规则
    override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
      (acc._1+in.user, acc._2+1)
    }

    // 获取窗口关闭时向下游发送的结果
    override def getResult(acc: (Set[String], Double)): Double = {
      acc._2/(acc._1.size.toDouble)
    }

    // merge方法只有在事件时间的会话窗口时,才需要实现,这里无需实现
    override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
  }

全窗口函数(Full Window Functions)
窗口操作中的另一大类就是全窗口函数,与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。

// 窗口函数
stream.keyBy(<key selector>)
  .window(<window assigner>)
  .apply(new MyWindowFunction())

处理窗口函数ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象(context)”。这个上下文对象不仅有窗口信息,还可以访问当前的时间和状态信息。这里的时间包括了处理时间(process time)和事件时间水位线(event time watermark)。这使得ProcessWindowFunction更加灵活、功能更加丰富,可以认为是一个增强版的WindowFunction。

// Full WindowFunction
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 为所有数据都指定同一个key,可以将所有数据发送到同一个分区
      .keyBy(_ => "key")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new UvCountByWindow)
      .print()
    
    env.execute()
  
  // 自定义窗口处理函数
  class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
    // 
    override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
      // 初始化一个Set数据结构,用来对用户名进行去重
      var userSet = Set[String]()
      // 将所有用户进行去重
      elements.foreach(userSet += _.user)
      // 结合窗口信息,包装输出内容
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 独立访客数为:" + userSet.size)
    }
增量和聚合函数结合使用

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作,窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数
据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。

    // 全窗口函数和聚合函数结合使用
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 使用url作为key对数据进行分区
      .keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      // 注意这里调用的是aggregate方法
      // 增量聚合函数和全窗口聚合函数结合使用
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
      .print()

  class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
    override def createAccumulator(): Long = 0L

    // 每来一个事件就加1
    override def add(in: Event, acc: Long): Long = acc + 1L

    // 窗口闭合时发送的计算结果
    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = ???
  }

  case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)

  class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    // 迭代器中只有一个元素,是增量聚合函数在窗口闭合时发送过来的计算结果
    override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
    }
  }

其它API

  • 触发器(Trigger):触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程;Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现;
  • 移除器(Evictor):移除器主要用来定义移除某些数据的逻辑;
  • 允许延迟(Allowed Lateness):为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算;
  • 将迟到的数据放入侧输出流:Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些本该被丢弃的数据。

窗口的生命周期

熟悉了窗口 API 的使用,这里梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结:

  • 窗口的创建;
  • 窗口计算的触发;
  • 窗口的销毁;


迟到数据的处理

所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

  • 设置水位线延迟时间;
  • 允许窗口处理迟到数据;
  • 将迟到数据放入窗口侧输出流;
package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}
import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp

object ProcessLateDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    // 为方便测试,读取socket文本流进行处理
    val stream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val fields = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })
    
    // 方式1:设置Watermark延迟时间 2秒钟
    val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
      // 最大延迟时间设置为5秒钟
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
      .withTimestampAssigner( new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
      })
    )
    
    // 定义侧输出流标签
    val outputTag = OutputTag[Event]("late")
    val res2 = stream
      .keyBy(_.url)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
      .allowedLateness(Time.minutes(1))
      // 方式三:将最后的迟到数据输出到侧输出流
      .sideOutputLateData(outputTag)
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
    
    res2.print()
    
    res2.getSideOutput(outputTag).print("late")
    
    // 为方便观察,可以将原始数据也输出
    stream.print("input")
  }
}

有关第六章 Flink中的时间和窗口的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - Ruby net/ldap 模块中的内存泄漏 - 2

    作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  6. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  7. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  8. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  9. ruby - rspec 需要 .rspec 文件中的 spec_helper - 2

    我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只

  10. ruby-on-rails - active_admin 目录中的常量警告重新声明 - 2

    我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA

随机推荐