I have a Java application in which I receive very small files (1 KB each), but a huge number of them per minute — about 20,000 files every minute. I pick up the files and upload them to S3.
I run this in 10 parallel threads, and the application has to run continuously.
After the application has been running for a few days, I get an out-of-memory error.
This is the exact error I get:
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 347376 bytes for Chunk::new
# Possible reasons:
# The system is out of physical RAM or swap space
# In 32 bit mode, the process size limit was hit
# Possible solutions:
# Reduce memory load on the system
# Increase physical memory or swap space
# Check if swap backing store is full
# Use 64 bit Java on a 64 bit OS
# Decrease Java heap size (-Xmx/-Xms)
# Decrease number of Java threads
# Decrease Java thread stack sizes (-Xss)
# Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
# Out of Memory Error (allocation.cpp:390), pid=6912, tid=0x000000000003ec8c
#
# JRE version: Java(TM) SE Runtime Environment (8.0_181-b13) (build 1.8.0_181-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.181-b13 mixed mode windows-amd64 compressed oops)
# Core dump written. Default location: d:\S3FileUploaderApp\hs_err_pid6912.mdmp
#
Here are my Java classes. I am copying all of them to make the investigation easier.
Here is my Java VisualVM report image.
Adding my sample output.
Updated Metaspace image.
Here is my main class:
public class UploadExecutor {

    private static Logger _logger = Logger.getLogger(UploadExecutor.class);

    public static void main(String[] args) {
        _logger.info("----------STARTING JAVA MAIN METHOD----------------- ");
        /*
         * 3 C:\\Users\\u6034690\\Desktop\\TWOFILE\\xml
         * a205381-tr-fr-production-us-east-1-trf-auditabilty
         */
        final int batchSize = 100;
        while (true) {
            String strNoOfThreads = args[0];
            String strFileLocation = args[1];
            String strBucketName = args[2];
            int iNoOfThreads = Integer.parseInt(strNoOfThreads);
            S3ClientManager s3ClientObj = new S3ClientManager();
            AmazonS3Client s3Client = s3ClientObj.buildS3Client();
            try {
                FileProcessThreads fp = new FileProcessThreads();
                File[] files = fp.getFiles(strFileLocation);
                try {
                    _logger.info("No records found will wait for 10 Seconds");
                    TimeUnit.SECONDS.sleep(10);
                    files = fp.getFiles(strFileLocation);
                    ArrayList<File> batchFiles = new ArrayList<File>(batchSize);
                    if (null != files) {
                        for (File path : files) {
                            String fileType = FilenameUtils.getExtension(path.getName());
                            long fileSize = path.length();
                            if (fileType.equals("gz") && fileSize > 0) {
                                batchFiles.add(path);
                            }
                            if (batchFiles.size() == batchSize) {
                                BuildThread BuildThreadObj = new BuildThread();
                                BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                        strBucketName);
                                _logger.info("---Batch One got completed---");
                                batchFiles.clear();
                            }
                        }
                    }
                    // to consider remaining files, or files with count < batch size
                    if (!batchFiles.isEmpty()) {
                        BuildThread BuildThreadObj = new BuildThread();
                        BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                strBucketName);
                        batchFiles.clear();
                    }
                } catch (InterruptedException e) {
                    _logger.error("InterruptedException: " + e.toString());
                }
            } catch (Throwable t) {
                _logger.error("InterruptedException: " + t.toString());
            }
        }
    }
}
This is the class where I build the threads and shut down the executor. A new ExecutorService is created for every run:
public class BuildThread {

    private static Logger _logger = Logger.getLogger(BuildThread.class);

    public void buildThreadLogic(int iNoOfThreads, AmazonS3Client s3Client, List<File> records,
            String strFileLocation, String strBucketName) {
        _logger.info("Calling buildThreadLogic method of BuildThread class");
        final ExecutorService executor = Executors.newFixedThreadPool(iNoOfThreads);
        int recordsInEachThread = (int) (records.size() / iNoOfThreads);
        int threadIncr = 2;
        int recordsInEachThreadStart = 0;
        int recordsInEachThreadEnd = 0;
        for (int i = 0; i < iNoOfThreads; i++) {
            if (i == 0) {
                recordsInEachThreadEnd = recordsInEachThread;
            }
            if (i == iNoOfThreads - 1) {
                recordsInEachThreadEnd = records.size();
            }
            Runnable worker = new UploadObject(records.subList(recordsInEachThreadStart, recordsInEachThreadEnd),
                    s3Client, strFileLocation, strBucketName);
            executor.execute(worker);
            recordsInEachThreadStart = recordsInEachThreadEnd;
            recordsInEachThreadEnd = recordsInEachThread * (threadIncr);
            threadIncr++;
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        _logger.info("Exiting buildThreadLogic method");
    }
}
This is the class that uploads files to S3 and contains the run method:
public class UploadObject implements Runnable {

    private static Logger _logger;
    List<File> records;
    AmazonS3Client s3Client;
    String fileLocation;
    String strBucketName;

    UploadObject(List<File> list, AmazonS3Client s3Client, String fileLocation, String strBucketName) {
        this.records = list;
        this.s3Client = s3Client;
        this.fileLocation = fileLocation;
        this.strBucketName = strBucketName;
        _logger = Logger.getLogger(UploadObject.class);
    }

    public void run() {
        uploadToToS3();
    }

    public void uploadToToS3() {
        _logger.info("Number of record to be uploaded in current thread: : " + records.size());
        TransferManager tm = new TransferManager(s3Client);
        final MultipleFileUpload upload = tm.uploadFileList(strBucketName, "", new File(fileLocation), records);
        try {
            upload.waitForCompletion();
        } catch (AmazonServiceException e1) {
            _logger.error("AmazonServiceException " + e1.getErrorMessage());
            System.exit(1);
        } catch (AmazonClientException e1) {
            _logger.error("AmazonClientException " + e1.getMessage());
            System.exit(1);
        } catch (InterruptedException e1) {
            _logger.error("InterruptedException " + e1.getMessage());
            System.exit(1);
        } finally {
            _logger.info("--Calling TransferManager ShutDown--");
            tm.shutdownNow(false);
        }
        CleanUp CleanUpObj = new CleanUp();
        CleanUpObj.deleteUploadedFile(upload, records);
    }
}
This class is used to create the S3 client manager:
public class S3ClientManager {

    private static Logger _logger = Logger.getLogger(S3ClientManager.class);

    public AmazonS3Client buildS3Client() {
        _logger.info("Calling buildS3Client method of S3ClientManager class");
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        _logger.info("Exiting buildS3Client method of S3ClientManager class");
        return s3Client;
    }
}
This is where I fetch the files:
public class FileProcessThreads {

    public File[] getFiles(String fileLocation) {
        File dir = new File(fileLocation);
        File[] directoryListing = dir.listFiles();
        if (directoryListing.length > 0)
            return directoryListing;
        return null;
    }
}
Best answer
Sorry for not addressing your original question about the memory leak, but your whole approach looks flawed to me. The System.exit() calls in UploadObject may be the cause of the resource leak, but that is only the beginning. The Amazon S3 TransferManager already has an internal executor service, so you do not need your own multithreading controller on top of it. I also cannot see how you guarantee that each file is uploaded exactly once: you issue several upload calls and then delete all the files regardless of whether any upload failed, in which case a file is deleted without ever reaching S3. Your attempt to distribute the files across executors is unnecessary: adding more threads on top of the TransferManager's ExecutorService will not improve performance, it will only cause thrashing.
I would take a different approach.
First, a very simple main class that only starts one worker thread and waits for it to finish:
public class S3Uploader {

    public static void main(String[] args) throws Exception {
        final String strNoOfThreads = args[0];
        final String strFileLocation = args[1];
        final String strBucketName = args[2];
        // Maximum number of file names that are read into memory
        final int maxFileQueueSize = 5000;
        S3UploadWorkerThread worker = new S3UploadWorkerThread(strFileLocation, strBucketName,
                Integer.parseInt(strNoOfThreads), maxFileQueueSize);
        // start() runs the worker on its own thread (run() would block the main thread)
        worker.start();
        System.out.println("Uploading files, press any key to stop.");
        System.in.read();
        // Gracefully halt the worker thread, waiting for any ongoing uploads to finish
        worker.finish();
        // Exit the main thread only after the worker thread has terminated
        worker.join();
    }
}
The worker thread uses a Semaphore to limit the number of uploads handed to the TransferManager, a custom file-name queue, FileEnqueue, that continuously reads files from the source directory, and a ProgressListener to follow the progress of each upload. If the loop runs out of files to read from the source directory, it waits ten seconds and retries. Even the file queue may be unnecessary; simply listing files inside the worker's while loop might be enough.
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

public class S3UploadWorkerThread extends Thread {

    private final String sourceDir;
    private final String targetBucket;
    private final int maxQueueSize;
    private final AmazonS3Client s3Client;
    private final Semaphore uploadLimiter;
    // volatile so a finish() call from the main thread is seen by the worker loop
    private volatile boolean running;

    public final long SLEEP_WHEN_NO_FILES_AVAILABLE_MS = 10000L; // 10 seconds

    public S3UploadWorkerThread(final String sourceDir, final String targetBucket,
            final int maxConcurrentUploads, final int maxQueueSize) {
        this.running = false;
        this.sourceDir = sourceDir.endsWith(File.separator) ? sourceDir : sourceDir + File.separator;
        this.targetBucket = targetBucket;
        this.maxQueueSize = maxQueueSize;
        this.s3Client = S3ClientManager.buildS3Client();
        this.uploadLimiter = new Semaphore(maxConcurrentUploads);
    }

    public void finish() {
        running = false;
    }

    @Override
    public void run() {
        running = true;
        final Map<String, Upload> ongoingUploads = new ConcurrentHashMap<>();
        final FileEnqueue queue = new FileEnqueue(sourceDir, maxQueueSize);
        final TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
        while (running) {
            // Get a file name from the in-memory queue
            final String fileName = queue.poll();
            if (fileName != null) {
                try {
                    // Limit the number of concurrent uploads
                    uploadLimiter.acquire();
                    File fileObj = new File(sourceDir + fileName);
                    // Create an upload listener
                    UploadListener onComplete = new UploadListener(fileObj, queue, ongoingUploads, uploadLimiter);
                    try {
                        Upload up = tm.upload(targetBucket, fileName, fileObj);
                        up.addProgressListener(onComplete);
                        // ongoingUploads is used later to wait for ongoing uploads in case a finish() is requested
                        ongoingUploads.put(fileName, up);
                    } catch (AmazonClientException e) {
                        System.err.println("AmazonClientException " + e.getMessage());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // poll() returns null when the source directory is empty, so wait a number of seconds
                try {
                    Thread.sleep(SLEEP_WHEN_NO_FILES_AVAILABLE_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // Wait for ongoing uploads to finish before ending the worker thread
        for (Map.Entry<String, Upload> e : ongoingUploads.entrySet()) {
            try {
                e.getValue().waitForCompletion();
            } catch (AmazonClientException | InterruptedException x) {
                System.err.println(x.getClass().getName() + " at " + e.getKey());
            }
        }
        tm.shutdownNow();
    }
}
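The semaphore pattern used by the worker loop above — acquire a permit before submitting, release it when the transfer completes — can be reduced to a self-contained sketch using only JDK classes. The class name, the fake 20 ms "upload", and the peak-concurrency counter are all made up for the demo; they are not part of the answer's code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreThrottleDemo {

    // Submits `tasks` fake uploads but never lets more than `maxConcurrent`
    // run at once; returns the observed peak concurrency.
    static int runDemo(int tasks, int maxConcurrent) throws InterruptedException {
        final Semaphore limiter = new Semaphore(maxConcurrent);
        final AtomicInteger active = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            limiter.acquire();                    // block until a permit is free
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20);             // simulate an upload
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    limiter.release();            // the "listener" frees the permit
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency = " + runDemo(20, 3));
    }
}
```

Because the submitting thread blocks in acquire(), at most maxConcurrent tasks are ever in flight, which is exactly why the worker never floods the TransferManager no matter how fast the directory fills up.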
The UploadListener releases a permit back to the Semaphore, notifies the file-name queue when an upload finishes, and keeps the map of ongoing uploads up to date so they can be awaited if the user requests a stop. With a ProgressListener you can track every successful or failed upload individually.
import java.io.File;
import java.util.Map;
import java.util.concurrent.Semaphore;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.Upload;

public class UploadListener implements ProgressListener {

    private final File fileObj;
    private final FileEnqueue queue;
    private final Map<String, Upload> ongoingUploads;
    private final Semaphore uploadLimiter;

    public UploadListener(File fileObj, FileEnqueue queue, Map<String, Upload> ongoingUploads,
            Semaphore uploadLimiter) {
        this.fileObj = fileObj;
        this.queue = queue;
        this.ongoingUploads = ongoingUploads;
        this.uploadLimiter = uploadLimiter;
    }

    @Override
    public void progressChanged(ProgressEvent event) {
        switch (event.getEventType()) {
        case TRANSFER_STARTED_EVENT:
            System.out.println("Started upload of file " + fileObj.getName());
            break;
        case TRANSFER_COMPLETED_EVENT:
            /* Upon a successful upload:
             * 1. Delete the file from disk
             * 2. Notify the file name queue that the file is done
             * 3. Remove it from the map of ongoing uploads
             * 4. Release the semaphore permit
             */
            fileObj.delete();
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.out.println("Successfully finished upload of file " + fileObj.getName());
            break;
        case TRANSFER_FAILED_EVENT:
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.err.println("Failed upload of file " + fileObj.getName());
            break;
        default:
            // do nothing
        }
    }
}
Here is the file queue:
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentSkipListSet;

public class FileEnqueue {

    private final String sourceDir;
    private final ConcurrentSkipListSet<FileItem> seen;
    private final ConcurrentSkipListSet<String> processing;
    private final int maxSeenSize;

    public FileEnqueue(final String sourceDirectory, int maxQueueSize) {
        sourceDir = sourceDirectory;
        maxSeenSize = maxQueueSize;
        seen = new ConcurrentSkipListSet<FileItem>();
        processing = new ConcurrentSkipListSet<>();
    }

    public synchronized String poll() {
        if (seen.size() == 0)
            enqueueFiles();
        FileItem fi = seen.pollFirst();
        if (fi == null) {
            return null;
        } else {
            processing.add(fi.getName());
            return fi.getName();
        }
    }

    public void done(final String fileName) {
        processing.remove(fileName);
    }

    private void enqueueFiles() {
        final FileFilter gzFilter = new GZFileFilter();
        final File dir = new File(sourceDir);
        if (!dir.exists()) {
            System.err.println("Directory " + sourceDir + " not found");
        } else if (!dir.isDirectory()) {
            System.err.println(sourceDir + " is not a directory");
        } else {
            final File[] files = dir.listFiles(gzFilter);
            if (files != null) {
                // How many more file names can be read into memory
                final int spaceLeft = maxSeenSize - seen.size();
                // Never enqueue more files than there is space left in the queue
                final int maxNewFiles = Math.min(files.length, spaceLeft);
                for (int f = 0, enqueued = 0; f < files.length && enqueued < maxNewFiles; f++) {
                    File fl = files[f];
                    FileItem fi = new FileItem(fl);
                    // Do not put into the queue any file which has already been seen or is processing
                    if (!seen.contains(fi) && !processing.contains(fi.getName())) {
                        seen.add(fi);
                        enqueued++;
                    }
                }
            }
        }
    }

    private class GZFileFilter implements FileFilter {
        @Override
        public boolean accept(File f) {
            final String fname = f.getName().toLowerCase();
            return f.isFile() && fname.endsWith(".gz") && f.length() > 0L;
        }
    }
}
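The invariant FileEnqueue maintains — a file name is never handed out twice while its upload is still in flight, even if the directory is re-listed in the meantime — can be shown with a stripped-down, stdlib-only sketch. The class, the offer() method, and the "a.gz" file name are invented for the demo; the real class reads the directory itself instead of taking names through offer():

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class DedupQueueDemo {

    private final ConcurrentSkipListSet<String> seen = new ConcurrentSkipListSet<>();
    private final ConcurrentSkipListSet<String> processing = new ConcurrentSkipListSet<>();

    // Mirrors enqueueFiles(): skip anything already queued or currently uploading
    public boolean offer(String name) {
        if (processing.contains(name) || seen.contains(name)) {
            return false;
        }
        return seen.add(name);
    }

    // Mirrors FileEnqueue.poll(): move the name from "seen" to "processing"
    public String poll() {
        String name = seen.pollFirst();
        if (name != null) {
            processing.add(name);
        }
        return name;
    }

    // Mirrors FileEnqueue.done(): the upload finished (or failed)
    public void done(String name) {
        processing.remove(name);
    }

    public static void main(String[] args) {
        DedupQueueDemo q = new DedupQueueDemo();
        q.offer("a.gz");
        String f = q.poll();          // "a.gz" moves to the processing set
        q.offer("a.gz");              // rejected: still in flight, so a directory
                                      // re-listing cannot enqueue it twice
        System.out.println(q.poll()); // null: nothing left to hand out
        q.done(f);                    // upload finished; if the file still exists
                                      // (failed upload), it may be offered again
        System.out.println(q.offer("a.gz"));
    }
}
```

This is why deleting the file only in TRANSFER_COMPLETED_EVENT is safe: a failed upload calls done() without deleting, so the next directory listing re-enqueues the file for another attempt.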
Finally, your S3ClientManager:
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientManager {

    public static AmazonS3Client buildS3Client() {
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        return s3Client;
    }
}
Update 30/04/2019: adding the FileItem class.
import java.io.File;
import java.util.Comparator;
public class FileItem implements Comparable {

    private final String name;
    private final long dateSeen;

    public FileItem(final File file) {
        this.name = file.getName();
        this.dateSeen = System.currentTimeMillis();
    }

    public String getName() {
        return name;
    }

    public long getDateSeen() {
        return dateSeen;
    }

    @Override
    public int compareTo(Object otherObj) {
        FileItem otherFileItem = (FileItem) otherObj;
        if (getDateSeen() == otherFileItem.getDateSeen())
            return getName().compareTo(otherFileItem.getName());
        else if (getDateSeen() < otherFileItem.getDateSeen())
            return -1;
        else
            return 1;
    }

    @Override
    public boolean equals(Object otherFile) {
        return getName().equals(((FileItem) otherFile).getName());
    }

    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    public static final class CompareFileItems implements Comparator {
        @Override
        public int compare(Object fileItem1, Object fileItem2) {
            return ((FileItem) fileItem1).compareTo(fileItem2);
        }
    }
}
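The point of FileItem's compareTo is that the ConcurrentSkipListSet in FileEnqueue drains files oldest-first, with the name as a tie-breaker. A self-contained sketch of that ordering, with a hypothetical Item class and hard-coded timestamps standing in for FileItem and System.currentTimeMillis():

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class OldestFirstDemo {

    static final class Item implements Comparable<Item> {
        final String name;
        final long seen;

        Item(String name, long seen) {
            this.name = name;
            this.seen = seen;
        }

        @Override
        public int compareTo(Item o) {
            // Same rule as FileItem: oldest timestamp first, name breaks ties
            if (seen == o.seen)
                return name.compareTo(o.name);
            return seen < o.seen ? -1 : 1;
        }
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Item> set = new ConcurrentSkipListSet<>();
        set.add(new Item("b.gz", 200L));
        set.add(new Item("a.gz", 100L));
        set.add(new Item("c.gz", 100L));
        // pollFirst() drains in (timestamp, name) order
        System.out.println(set.pollFirst().name); // a.gz
        System.out.println(set.pollFirst().name); // c.gz
        System.out.println(set.pollFirst().name); // b.gz
    }
}
```

Keeping compareTo consistent with equals/hashCode matters here because ConcurrentSkipListSet uses compareTo, not equals, to decide membership.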
For "Java: Out Of Memory Error when my application runs for longer time", see the similar question on Stack Overflow: https://stackoverflow.com/questions/55823313/