我有通过 http 接收的数据,这些数据需要由两个不同的函数处理。重要的是它们由每个函数按顺序处理。在文件中,例如:1,2,3,4,5。而数据库也记录了1,2,3,4,5。作为 fifo 模型。 现在我有这样一个问题......我的数据一直在运行,有时数据库可以满足我更新数据的要求很长时间,因此我无法及时更新文件。 在可能的情况下将数据添加到文件或数据库对我来说很重要。我可以使用缓冲 channel ,但我不知道队列中有多少数据可以等待处理,我不想表明缓冲区的大小肯定很大。 我尝试向 NewData 函数添加更多 goroutine,但在那种情况下,我的数据不是按顺序写入的。
此代码显示了问题。
package main
import (
"fmt"
"time"
)
type procHandler interface {
Start()
NewData(newdata []byte)
}
type fileWriter struct {
Data chan []byte
}
func (proc *fileWriter) Start() {
proc.Data = make(chan []byte)
go func() {
for {
obj := <-proc.Data
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *fileWriter) NewData(newdata []byte) {
proc.Data <- newdata
}
type sqlWriter struct {
Data chan []byte
}
func (proc *sqlWriter) Start() {
proc.Data = make(chan []byte)
go func() {
for {
obj := <-proc.Data
time.Sleep(5 * time.Second)
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *sqlWriter) NewData(newdata []byte) {
proc.Data <- newdata
}
var processors = []procHandler{}
func receiver() {
newDataImitateByteRange := 30
for i := 0; i < newDataImitateByteRange; i++ {
pseudoData := []byte{byte(i)}
for _, handler := range processors {
handler.NewData(pseudoData)
}
}
}
func main() {
// file writer
fileUpdate := &fileWriter{}
processors = append(processors, fileUpdate)
// sql writer
sqlUpdate := &sqlWriter{}
processors = append(processors, sqlUpdate)
sqlUpdate.Start()
fileUpdate.Start()
go receiver()
fmt.Scanln()
}
代码有效:https://play.golang.org/p/rSshsJYZ4h
输出:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]
我要:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (after 5 seconds passed the handler started execution.)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)
希望得到帮助,谢谢!
最佳答案
听起来您正在寻找的东西就像一个 channel ,可以根据排队的数据调整大小(增长或缩小)。这可以通过在输入和输出 channel 之间设置一个队列来实现,并使用 goroutine 为这些 channel 提供服务。这是这样一个解决方案: https://github.com/gammazero/bigchan#bigchan
我在您的 fileWriter 和 sqlWriter 中使用了 BigChan 作为数据通道,它似乎有您正在寻找的结果。以下是您修改后的代码:
package main
import (
"fmt"
"time"
"github.com/gammazero/bigchan"
)
// Maximum number of items to buffer. set to -1 for unlimited.
const limit = 65536
type procHandler interface {
Start()
NewData(newdata []byte)
}
type fileWriter struct {
Data *bigchan.BigChan
}
func (proc *fileWriter) Start() {
proc.Data = bigchan.New(limit)
go func() {
for {
_obj := <-proc.Data.Out()
obj := _obj.([]byte)
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *fileWriter) NewData(newdata []byte) {
proc.Data.In() <- newdata
}
type sqlWriter struct {
Data *bigchan.BigChan
}
func (proc *sqlWriter) Start() {
proc.Data = bigchan.New(limit)
go func() {
for {
_obj := <-proc.Data.Out()
obj := _obj.([]byte)
time.Sleep(5 * time.Second)
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *sqlWriter) NewData(newdata []byte) {
proc.Data.In() <- newdata
}
var processors = []procHandler{}
func receiver() {
newDataImitateByteRange := 30
for i := 0; i < newDataImitateByteRange; i++ {
pseudoData := []byte{byte(i)}
for _, handler := range processors {
handler.NewData(pseudoData)
}
}
}
func main() {
// file writer
fileUpdate := &fileWriter{}
processors = append(processors, fileUpdate)
// sql writer
sqlUpdate := &sqlWriter{}
processors = append(processors, sqlUpdate)
sqlUpdate.Start()
fileUpdate.Start()
go receiver()
fmt.Scanln()
}
关于asynchronous - 多个函数的数据处理是异步的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46519440/
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
我想在一个没有Sass引擎的类中使用Sass颜色函数。我已经在项目中使用了sassgem,所以我认为搭载会像以下一样简单:classRectangleincludeSass::Script::FunctionsdefcolorSass::Script::Color.new([0x82,0x39,0x06])enddefrender#hamlengineexecutedwithcontextofself#sothatwithintemlateicouldcall#%stop{offset:'0%',stop:{color:lighten(color)}}endend更新:参见上面的#re
我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
我正在尝试用ruby中的gsub函数替换字符串中的某些单词,但有时效果很好,在某些情况下会出现此错误?这种格式有什么问题吗NoMethodError(undefinedmethod`gsub!'fornil:NilClass):模型.rbclassTest"replacethisID1",WAY=>"replacethisID2andID3",DELTA=>"replacethisID4"}end另一个模型.rbclassCheck 最佳答案 啊,我找到了!gsub!是一个非常奇怪的方法。首先,它替换了字符串,所以它实际上修改了