书接上回 大数据量、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式,本文继续深入针对前两种进行讲解,并给出代码示例:

ArrayBlockingQueue使用普通方式能够直接基于JDK中现成的并发包 ArrayBlockingQueue 提供的 offer(E e, long timeout, TimeUnit unit)(添加元素到队列尾部,如果队列已满则等待参数指定时间后返回false)方法 和 poll(long timeout, TimeUnit unit)(从队列头部获取元素,如果队列为空则等待参数指定时间后返回null)方法,来达到异步批处理效果
生产者代码:由于采用内存队列,最好在创建 ArrayBlockingQueue 时指定队列大小,防止队列无界,导致内存溢出
/**
* 生产者
*/
@Component
@Slf4j
public class MonitorQueue {
private BlockingQueue<List<NodeCollectDTO>> queue = new ArrayBlockingQueue<>(10000000);
public void put(List<NodeCollectDTO> list) {
try {
queue.put(list);
} catch (InterruptedException e) {
log.error(String.format("队列put异常:%s", e.getMessage()), e);
}
}
public void offer(List<NodeCollectDTO> list, long timeout, TimeUnit unit) throws InterruptedException {
queue.offer(list, timeout, unit);
}
public List<NodeCollectDTO> poll(long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
}
消费者代码:在创建生产者时开启一个子线程在死循环中一直读取队列元素,直到队列元素超过我们的 maxNum 时,将临时列表元素插入数据库中
/**
* 消费者
*/
@Slf4j
@Component
public class MonitorConsumer implements Runnable {
@Autowired
private MonitorQueue queue;
@Autowired
private MonitorService monitorService;
@PostConstruct
public void init() {
new Thread(this, "monitor-collect").start();
}
// 临时列表大小限制
private int maxNum = 2000;
@SuppressWarnings("InfiniteLoopStatement")
@Override
public void run() {
while (true) {
handler();
}
}
private void handler() {
try {
List<NodeCollectDTO> temp = new ArrayList<>(maxNum);
while (temp.size() <= maxNum) {
List<NodeCollectDTO> list = queue.poll(20, TimeUnit.SECONDS);
if (CollectionUtil.isNotEmpty(list)) {
temp.addAll(list);
} else {
break;
}
}
if (CollectionUtil.isEmpty(temp)) {
return;
}
int i = monitorService.batchSave(temp);
log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size());
} catch (Exception e) {
log.error(String.format("消费者异常: %s", e.getMessage()), e);
}
}
}
可以看到采用该种方式实现的异步批量入库代码比较简单,便于理解,在性能上,基本都能够满足日常普通业务存在的批量入库场景
Disruptor 队列,本文基于 Disruptor 最新4.0版本先给出 Disruptor 官网简介
Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。
为了理解 Disruptor 的好处,我们可以将它与一些很好理解且目的非常相似的东西进行比较。在 Disruptor 的情况下,这将是 Java 的 BlockingQueue。与队列一样,Disruptor 的目的是在同一进程内的线程之间移动数据(例如消息或事件)。然而,Disruptor 提供的一些关键特性使其有别于队列。他们是:
向消费者多播事件,带有消费者依赖图。
为事件预分配内存。
可选无锁
Disruptor 给我们在项目中实现异步批处理提供了另一种方式,一种无锁、延迟更低、吞吐量更高、提供消费者多播等等的内存队列
下面介绍如何使用
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>4.0.0.RC1</version>
</dependency>
Disruptor 使用代码如下:public class LongEvent{
private long value;
public void set(long value){
this.value = value;
}
@Override
public String toString(){
return "LongEvent{" + "value=" + value + '}';
}
}
@Slf4j
public class LongEventMain {
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {
log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception {
int bufferSize = 128;
// 1. 创建Disruptor对象
Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
// 2. 添加事件处理类(消费者)
disruptor.handleEventsWith(LongEventMain::handleEvent);
// 3. 开启事件处理线程
disruptor.start();
// 4. 获取ringBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
// 5. 发布事件(生产者)
ringBuffer.publishEvent(LongEventMain::translate, bb);
Thread.sleep(1);
}
}
}
@Slf4j
public class LongEventBatch implements EventHandler<LongEvent> {
private static final int MAX_BATCH_SIZE = 20;
private final List<LongEvent> batch = new ArrayList<>();
public LongEventBatch() {
// 虚拟机关闭处理
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("------------------ShutdownHook-DataEventHandler,上报tempList");
if (batch.size() > 0) {
// 批量入库伪代码
int i = xxxService.batchSave(temp);
}
}));
}
@Override
public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) {
log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);
batch.add(event);
if (batch.size() >= MAX_BATCH_SIZE) {
processBatch(batch);
}
}
private void processBatch(final List<LongEvent> batch) {
// 批量入库伪代码
int i = xxxService.batchSave(temp);
// 记得清空batch列表
batch.clear();
}
}
由此,我们就实现了基于 Disruptor 的异步批处理逻辑,该方式会比普通版本性能高出一个数量级,大家在工作中可以尝试使用一番
附博主 github 地址 https://github.com/wayn111
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,
在VMware16.2.4安装Ubuntu一、安装VMware1.打开VMwareWorkstationPro官网,点击即可进入。2.进入后向下滑动找到Workstation16ProforWindows,点击立即下载。3.下载完成,文件大小615MB,如下图:4.鼠标右击,以管理员身份运行。5.点击下一步6.勾选条款,点击下一步7.先勾选,再点击下一步8.去掉勾选,点击下一步9.点击下一步10.点击安装11.点击许可证12.在百度上搜索VM16许可证,复制填入,然后点击输入即可,亲测有效。13.点击完成14.重启系统,点击是15.双击VMwareWorkstationPro图标,进入虚拟机主
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
我对图像处理完全陌生。我对JPEG内部是什么以及它是如何工作一无所知。我想知道,是否可以在某处找到执行以下简单操作的ruby代码:打开jpeg文件。遍历每个像素并将其颜色设置为fx绿色。将结果写入另一个文件。我对如何使用ruby-vips库实现这一点特别感兴趣https://github.com/ender672/ruby-vips我的目标-学习如何使用ruby-vips执行基本的图像处理操作(Gamma校正、亮度、色调……)任何指向比“helloworld”更复杂的工作示例的链接——比如ruby-vips的github页面上的链接,我们将不胜感激!如果有ruby-
我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d
我正在尝试解析网页,但有时会收到404错误。这是我用来获取网页的代码:result=Net::HTTP::getURI.parse(URI.escape(url))如何测试result是否为404错误代码? 最佳答案 像这样重写你的代码:uri=URI.parse(url)result=Net::HTTP.start(uri.host,uri.port){|http|http.get(uri.path)}putsresult.codeputsresult.body这将打印状态码和正文。
我想在Windows7上安装带有ruby1.9.3的rspec-railsgem。我收到一些错误消息,提示无法安装某些json库。所以,我使用下面的说明来解决它。来源=The'json'nativegemrequiresinstalledbuildtools从[rubyinstaller.org][3]下载[Ruby1.9.3][2]从[rubyinstaller.org][3]下载DevKit文件对于Ruby1.9.3,使用[DevKit-tdm-32-4.5.2-20110712-1620-sfx.exe][4]将DevKit解压到路径C:\Ruby193\DevKit运行cd