Log Analysis (Part 2)
The requirement: analyze a log file and produce per-statement statistics in the form
c.y.c.p.s.impl.UserService.apply:130-2172517-48,696 KB
that is, method location - occurrence count - total printed size, so you can see at a glance how much data each logging call produced.
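Before walking through the code, here is a minimal stand-alone sketch (with a made-up log line) of the extraction step: each configured regex has two capture groups, group(1) for the code location and group(2) for the message whose length gets accumulated.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternSketch {
    public static void main(String[] args) {
        // group(1): class.method:line, group(2): the logged message
        Pattern p = Pattern.compile("(c\\.y\\.c\\.[\\w\\.\\*]*:\\d*) (.*)");
        Matcher m = p.matcher("2023-09-26 11:10:00.123 INFO [main] c.y.c.p.s.impl.UserService.apply:130 - applied");
        if (m.find()) {
            System.out.println(m.group(1)); // c.y.c.p.s.impl.UserService.apply:130
            System.out.println(m.group(2)); // - applied
        }
    }
}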
LogParAnalyzer2
import org.springframework.util.CollectionUtils;

import java.io.*;
import java.nio.file.*;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class LogParAnalyzer2 {

    // the raw log file
    private File log;
    private List<Pattern> list;
    private ExecutorService executorService;
    // directory for the generated split files
    private String subPath = "D:\\split\\";
    private List<File> files;

    public LogParAnalyzer2(File log, List<String> patterns) {
        this.log = log;
        executorService = Executors.newFixedThreadPool(30);
        list = new ArrayList<>();
        try {
            for (String pattern : patterns) {
                Pattern p = Pattern.compile(pattern);
                list.add(p);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void analyze() throws Exception {
        // try-with-resources closes the BufferedReader automatically
        int chunkSize = 100000;
        try (BufferedReader reader = new BufferedReader(new FileReader(log))) {
            File file = new File(subPath);
            if (!file.exists()) {
                file.mkdirs();
            }
            String line;
            List<CompletableFuture<?>> task = new ArrayList<>();
            int cur = 0;
            // buffer lines in a List; a StringBuilder would keep copying its
            // backing array via Arrays.copyOf as it grows
            List<String> lines = new ArrayList<>();
            AtomicInteger batch = new AtomicInteger(0);
            while ((line = reader.readLine()) != null) {
                lines.add(line);
                cur++;
                if ((cur % chunkSize) == 0) {
                    // deep-copy the batch so the async task works on its own snapshot
                    List<String> tt = lines.stream().map(String::new).collect(Collectors.toList());
                    lines.clear();
                    // capture the batch number before submitting; reading the counter
                    // inside the lambda would race with later increments
                    int batchNo = batch.getAndIncrement();
                    CompletableFuture<?> f =
                            CompletableFuture.runAsync(() -> processChunk(tt, batchNo), executorService);
                    task.add(f);
                }
            }
            if (lines.size() > 0) {
                int batchNo = batch.get();
                CompletableFuture<?> f =
                        CompletableFuture.runAsync(() -> processChunk(lines, batchNo), executorService);
                task.add(f);
            }
            // wait for all tasks to finish
            CompletableFuture.allOf(task.toArray(new CompletableFuture[0])).get();
            System.out.println("task execute finished");
        }
    }

    private void processChunk(List<String> lines, int batch) {
        try {
            System.out.println(Thread.currentThread().getName() + " execute " + batch + ".txt start");
            Map<String, LogLine> map = new HashMap<>();
            try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(subPath + batch + ".txt"))) {
                lines.forEach(line -> {
                    for (Pattern pattern : list) {
                        Matcher matcher = pattern.matcher(line);
                        if (matcher.find()) {
                            String method = matcher.group(1);
                            String message = matcher.group(2);
                            LogLine ll = map.computeIfAbsent(method, k -> {
                                LogLine logLine = new LogLine();
                                logLine.setCnt(new AtomicInteger(0));
                                logLine.setSize(0);
                                return logLine;
                            });
                            ll.getCnt().incrementAndGet();
                            ll.setSize(ll.getSize() + message.length());
                        }
                    }
                });
                // write once per chunk, after all lines have been aggregated;
                // each split file keeps only the top 100 entries
                if (map.size() > 0) {
                    writeBatchToFile(writer, map);
                }
            }
            System.out.println(Thread.currentThread().getName() + " execute " + batch + ".txt end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void writeBatchToFile(BufferedWriter writer, Map<String, LogLine> map) {
        Map<String, LogLine> limit = limit(map, 100);
        try {
            for (Map.Entry<String, LogLine> entry : limit.entrySet()) {
                LogLine value = entry.getValue();
                writer.write(entry.getKey() + "=" + value.getCnt() + "=" + value.getSize());
                writer.newLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // clear the buffers
        map.clear();
        limit.clear();
    }

    public void mergeAndSort() throws Exception {
        files = Files.list(Paths.get(subPath))
                .map(Path::toFile)
                .filter(f -> f.length() > 0)
                .collect(Collectors.toList());
        // create a ForkJoinPool for the merge
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MergeFileTask2 mergeFileTask = new MergeFileTask2(files.toArray(new File[0]), forkJoinPool);
        Path finalPath = mergeFileTask.invoke();
        System.out.println("final path: " + finalPath.toAbsolutePath());
        try (BufferedReader reader = Files.newBufferedReader(finalPath)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] split = line.split("=");
                long l = Long.parseLong(split[2]) / 1024;
                System.out.println(MessageFormat.format("{0}-{1}-{2} KB", split[0], split[1], l));
            }
        }
        mergeFileTask.finished();
    }

    public void finished() throws IOException {
        if (!CollectionUtils.isEmpty(files)) {
            files.stream().parallel().forEach(File::delete);
        }
        Files.deleteIfExists(Paths.get(subPath));
    }

    public Map<String, LogLine> limit(Map<String, LogLine> map, int limit) {
        return map.entrySet().stream()
                .sorted(Map.Entry.comparingByValue((o1, o2) -> o2.getCnt().get() - o1.getCnt().get()))
                .limit(limit)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (oldValue, newValue) -> oldValue, // resolve key collisions
                        LinkedHashMap::new));
    }
}
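Each worker flushes its chunk's aggregates to a split file as one method=count=size record per method. A tiny stand-alone sketch of that record format, reusing the numbers from the sample output at the top (the class name is made up):

public class RecordFormatSketch {
    public static void main(String[] args) {
        // one record, as writeBatchToFile emits it: location=count=accumulated length
        String record = "c.y.c.p.s.impl.UserService.apply:130=2172517=49864704";
        String[] split = record.split("=");
        System.out.println(split[0]);                                // method location
        System.out.println(Integer.parseInt(split[1]));              // occurrence count
        System.out.println(Long.parseLong(split[2]) / 1024 + " KB"); // 48696 KB, as in the sample
    }
}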
MergeFileTask2
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class MergeFileTask2 extends RecursiveTask<Path> {

    private File[] files;
    private ForkJoinPool forkJoinPool;
    private String tmp = "d:\\cc\\";

    public MergeFileTask2(File[] files, ForkJoinPool forkJoinPool) {
        this.files = files;
        this.forkJoinPool = forkJoinPool;
        File file = new File(tmp);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    @Override
    protected Path compute() {
        if (files.length == 1) {
            // a single file needs no merging; just return its path
            return files[0].toPath();
        } else {
            // more than one file: split the array and merge the two halves
            int mid = files.length / 2;
            MergeFileTask2 left = new MergeFileTask2(Arrays.copyOfRange(files, 0, mid), forkJoinPool);
            MergeFileTask2 right = new MergeFileTask2(Arrays.copyOfRange(files, mid, files.length), forkJoinPool);
            invokeAll(left, right);
            Path leftResult = left.join();
            Path rightResult = right.join();
            // merge the two intermediate files
            return mergeTwoFiles(leftResult, rightResult);
        }
    }

    private Path mergeTwoFiles(Path leftResult, Path rightResult) {
        try {
            Path tempFile = Files.createTempFile(Paths.get(tmp), "merged-", ".txt");
            mergeToOne(leftResult, rightResult, tempFile);
            return tempFile;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void mergeToOne(Path leftResult, Path rightResult, Path tempFile) throws Exception {
        Map<String, LogLine> map = new ConcurrentHashMap<>();
        try (BufferedReader leftReader = Files.newBufferedReader(leftResult)) {
            mergeReader(map, leftReader);
        }
        try (BufferedReader rightReader = Files.newBufferedReader(rightResult)) {
            mergeReader(map, rightReader);
        }
        // sort, keep the top 100 entries, and write them to the temp file
        Map<String, LogLine> limit = limit(map, 100);
        try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
            writeBatchToFile(writer, limit);
        }
    }

    /**
     * Write the merged entries to the temp file.
     */
    private void writeBatchToFile(BufferedWriter writer, Map<String, LogLine> map) throws Exception {
        for (Map.Entry<String, LogLine> entry : map.entrySet()) {
            writer.write(entry.getKey() + "=" + entry.getValue().getCnt().get() + "=" + entry.getValue().getSize());
            writer.newLine();
        }
    }

    /**
     * Sort by count and return the top {@code limit} entries.
     */
    public Map<String, LogLine> limit(Map<String, LogLine> map, int limit) {
        if (map.size() <= limit) {
            return map;
        }
        return map.entrySet().stream()
                .sorted(Map.Entry.comparingByValue((o1, o2) -> o2.getCnt().get() - o1.getCnt().get()))
                .limit(limit)
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        Map.Entry::getValue,
                        (oldValue, newValue) -> oldValue, // resolve key collisions
                        LinkedHashMap::new));
    }

    private static void mergeReader(Map<String, LogLine> map, BufferedReader reader) throws IOException {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] split = line.split("=");
            int val = Integer.parseInt(split[1]);
            long size = Long.parseLong(split[2]);
            // fold duplicate keys: add up counts and sizes
            map.merge(split[0], new LogLine(new AtomicInteger(val), size), (a, b) -> {
                a.getCnt().addAndGet(b.getCnt().get());
                a.setSize(a.getSize() + b.getSize());
                return a;
            });
        }
    }

    public void finished() {
        for (File file : new File(tmp).listFiles()) {
            file.delete();
        }
        new File(tmp).delete();
    }
}
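The heart of the pairwise merge is Map.merge: when the same key appears in both input files, counts and sizes are simply added together. A toy stand-alone version of mergeReader's folding step, with hypothetical records and a plain long[] in place of LogLine:

import java.util.HashMap;
import java.util.Map;

public class MergeSemanticsSketch {
    public static void main(String[] args) {
        Map<String, long[]> map = new HashMap<>(); // value = {count, size}
        String[] records = {"a.b.C.m:1=3=120", "a.b.C.m:1=2=80", "a.b.C.n:7=1=10"};
        for (String line : records) {
            String[] s = line.split("=");
            // duplicate keys are folded by summing counts and sizes
            map.merge(s[0], new long[]{Long.parseLong(s[1]), Long.parseLong(s[2])},
                    (a, b) -> new long[]{a[0] + b[0], a[1] + b[1]});
        }
        long[] merged = map.get("a.b.C.m:1");
        System.out.println(merged[0] + " / " + merged[1]); // 5 / 200
    }
}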
LogLine
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.atomic.AtomicInteger;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogLine {
    // occurrence count
    private AtomicInteger cnt;
    // accumulated log message size
    private long size;
}
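@Data, @NoArgsConstructor and @AllArgsConstructor are Lombok annotations that generate the getters, setters and constructors at compile time. For readers without Lombok on the classpath, a hand-written equivalent looks like this:

import java.util.concurrent.atomic.AtomicInteger;

public class LogLine {
    private AtomicInteger cnt; // occurrence count
    private long size;         // accumulated message length

    public LogLine() { }

    public LogLine(AtomicInteger cnt, long size) {
        this.cnt = cnt;
        this.size = size;
    }

    public AtomicInteger getCnt() { return cnt; }
    public void setCnt(AtomicInteger cnt) { this.cnt = cnt; }
    public long getSize() { return size; }
    public void setSize(long size) { this.size = size; }
}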
Test
@Test
public void ccd() throws Exception {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // (com.example.[\w*.*]*:\d*)
    File log = new File("E:\\log.log");
    // sample line:
    // 2023-09-26 11:10:00.123 INFO - none --- [main] com.example.service.UserService.create:42 - User service started successfully.
    // the patterns should capture com.example.service.UserService.create:42 and - User service started successfully.
    List<String> list = Arrays.asList(
            "(com\\.abc\\.[\\w\\.\\*]*:\\d*) (.*)",
            "(c\\.y\\.c\\.[\\w\\.\\*]*:\\d*) (.*)");
    LogParAnalyzer2 logAnalyzer = new LogParAnalyzer2(log, list);
    logAnalyzer.analyze();
    logAnalyzer.mergeAndSort();
    logAnalyzer.finished();
    stopWatch.stop();
    System.out.println(stopWatch.prettyPrint());
    // c.y.c.s.service.impl.UserService.apply:98 count: 6: 3 KB
}
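The pool size (30 threads) and chunk size (100,000 lines) are hard-coded above. A common variation, shown here purely as an assumption rather than something the original tunes, is to derive the pool size from the machine's CPU count, since the per-chunk regex parsing is CPU-bound:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizingSketch {
    public static void main(String[] args) {
        // hypothetical tuning: size the worker pool from the CPU count
        // instead of the fixed 30 used by LogParAnalyzer2
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(2, cores));
        System.out.println("pool sized to " + Math.max(2, cores) + " threads");
        pool.shutdown();
    }
}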