您的位置:首页 > 科技 > IT业 > 东台网站建设_深圳装修公司口碑排名_微信小程序开发详细步骤_长春疫情最新情况

东台网站建设_深圳装修公司口碑排名_微信小程序开发详细步骤_长春疫情最新情况

2024/10/31 21:22:56 来源:https://blog.csdn.net/csdnliuxin123524/article/details/143244361  浏览:    关键词:东台网站建设_深圳装修公司口碑排名_微信小程序开发详细步骤_长春疫情最新情况
东台网站建设_深圳装修公司口碑排名_微信小程序开发详细步骤_长春疫情最新情况

背景:读取10G csv文件,然后 根据交易种类分分组,找出每个交易种类的交易最大值,最后给出最大的那个交易信息。

难点:最主要的是怎么快速读文件?

涉及的功能点:MappedByteBuffer 读文件方式、异步返回处理、treeset排序、线程池处理、分段切割排序

处理方式:

1,使用 MappedByteBuffer 读文件 。这里最主要是怎么提取csv中需要的列,怎么划分行,很麻烦(根据\r 字符识别换行,根据,逗号识别列)(漏洞:这里没有处理切割临界数据,太麻烦了

2,多线程分块读取  (可以分块读取的前提是:可以指定文件内容下标读取)

3,把所有文件放到集合中

4,分组处理,异步 在每个分组中找出最大值(如果每个组的数据很多,那么且是单向比较,可以分段找出最大值)

5,最终比较每个分组的最大值。就是最终结果

MappedByteBuffer 可以先获取文件内容的总长度,然后根据机器线程处理核数,把文件分割成对应的核数个数。然后在各自的线程中做单线程处理。

这样就是最快的。重复利用了机器的处理能力。

线程的数量不能多,否则即使多了,只会徒增线程切换带来的消耗,而不能提高性能。

核心代码:

public static void main(String[] args) throws Exception {long startTime = System.currentTimeMillis();if (args.length == 0) {path = "D:\\RaceFile\\0002.csv";
//            path = "D:\\RaceFile\\marketdata\\marketdata.csv";} else {path = args[0];}raf = new RandomAccessFile(new File(path), "r");FileChannel channel = raf.getChannel();//文件内容总长度long fileSize = channel.size();// 获取头文件,把文件头剔除掉MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);int headIndex = readHead(buffer, fileSize);//分割的每个文件的长度long preSize = (long) Math.ceil((fileSize - headIndex) / threadNum);ArrayList<FutureTask> threadList = new ArrayList(threadNum);// 异步读取每个分区,通过分割个数*文件长度 就可以计算出每个分割块的读取起始下标。for (int i = 0; i < threadNum; i++) {int finalI = i;//异步读文件处理Callable callable1 = () -> {return  read(preSize * finalI, preSize);};FutureTask futureTask1 = new FutureTask(callable1);threadList.add(futureTask1);new Thread(futureTask1).start();}//使用callable获取每个文件的读取结果  汇总到一个集合中List<HashMap<String, Trade>> trade2ListPre =new ArrayList(threadNum);for (int i = 0; i < threadList.size(); i++) {trade2ListPre.add((HashMap<String, Trade>) threadList.get(i).get());}HashMap<String, Trade> allData = new HashMap<>();trade2ListPre.stream().forEach(m ->{allData.putAll(m);});long end = System.currentTimeMillis();
//        System.out.println(end - startTime);// 遍历所有数据,分组处理HashMap<String, TreeSet<Trade>> hashMap = new HashMap();for (HashMap.Entry<String, Trade> entry : allData.entrySet()) {if (entry == null) {continue;}String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));if (hashMap.containsKey(key) && hashMap.get(key) != null) {hashMap.get(key).add(entry.getValue());} else {TreeSet<Trade> tradeTreeSet = new TreeSet();tradeTreeSet.add(entry.getValue());hashMap.put(key, tradeTreeSet);}}//再次异步,在每个分组数据中,循环找出最大值hashMap.keySet().stream().forEach(m -> {threadPool2.execute(() -> {TreeSet<Trade> treeSet = hashMap.get(m);Trade maxTrade = treeSet.first();Trade preTrade = treeSet.first();for (Trade curTrade : treeSet) {int volume = curTrade.volume - preTrade.volume;preTrade = curTrade;if (volume > maxTrade.volume) {maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);maxTrade.volume = volume;}}if (result.volume < maxTrade.volume) {result = maxTrade;}});});//线程池为空的时候 关闭线程池,也意味着数据处理完毕threadPool2.shutdown();while (!threadPool2.isTerminated()) {}end = System.currentTimeMillis();
//        System.out.println(end - startTime);//输出最终结果System.out.println(result);}

所有代码:

package test3;import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;public class Solution1 {static int threadNum =  Runtime.getRuntime().availableProcessors();static RandomAccessFile raf;static String path;static Trade result = new Trade();static ThreadPoolExecutor threadPool2 =new ThreadPoolExecutor(threadNum, threadNum, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque());public static void main(String[] args) throws Exception {long startTime = System.currentTimeMillis();if (args.length == 0) {path = "D:\\RaceFile\\0002.csv";
//            path = "D:\\RaceFile\\marketdata\\marketdata.csv";} else {path = args[0];}raf = new RandomAccessFile(new File(path), "r");FileChannel channel = raf.getChannel();//文件内容总长度long fileSize = channel.size();// 获取头文件,把文件头剔除掉MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);int headIndex = readHead(buffer, fileSize);//分割的每个文件的长度long preSize = (long) Math.ceil((fileSize - headIndex) / threadNum);ArrayList<FutureTask> threadList = new ArrayList(threadNum);// 异步读取每个分区,通过分割个数*文件长度 就可以计算出每个分割块的读取起始下标。for (int i = 0; i < threadNum; i++) {int finalI = i;//异步读文件处理Callable callable1 = () -> {return  read(preSize * finalI, preSize);};FutureTask futureTask1 = new FutureTask(callable1);threadList.add(futureTask1);new Thread(futureTask1).start();}//使用callable获取每个文件的读取结果  汇总到一个集合中List<HashMap<String, Trade>> trade2ListPre =new ArrayList(threadNum);for (int i = 0; i < threadList.size(); i++) {trade2ListPre.add((HashMap<String, Trade>) threadList.get(i).get());}HashMap<String, Trade> allData = new HashMap<>();trade2ListPre.stream().forEach(m ->{allData.putAll(m);});long end = System.currentTimeMillis();
//        System.out.println(end - startTime);// 遍历所有数据,分组处理HashMap<String, TreeSet<Trade>> hashMap = new HashMap();for (HashMap.Entry<String, Trade> entry : allData.entrySet()) {if (entry == null) {continue;}String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));if (hashMap.containsKey(key) && hashMap.get(key) != null) {hashMap.get(key).add(entry.getValue());} else {TreeSet<Trade> tradeTreeSet = new TreeSet();tradeTreeSet.add(entry.getValue());hashMap.put(key, tradeTreeSet);}}//再次异步,在每个分组数据中,循环找出最大值hashMap.keySet().stream().forEach(m -> {threadPool2.execute(() -> {TreeSet<Trade> treeSet = hashMap.get(m);Trade maxTrade = treeSet.first();Trade preTrade = treeSet.first();for (Trade curTrade : treeSet) {int volume = curTrade.volume - preTrade.volume;preTrade = curTrade;if (volume > maxTrade.volume) {maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);maxTrade.volume = volume;}}if (result.volume < maxTrade.volume) {result = maxTrade;}});});//线程池为空的时候 关闭线程池,也意味着数据处理完毕threadPool2.shutdown();while (!threadPool2.isTerminated()) {}end = System.currentTimeMillis();
//        System.out.println(end - startTime);//输出最终结果System.out.println(result);}/**MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, start, end-3);StandardCharsets.UTF_8.decode(buffer).toString()*/public static HashMap<String, Trade> read(long position, long size) throws IOException {FileChannel channel = raf.getChannel();MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, size);return readLine(buffer, position, 0, size);}public static HashMap<String, Trade> readLine(MappedByteBuffer buffer, long position, int start, long end) {HashMap<String, Trade> instruTimeGroupMap = new HashMap<>();StringBuilder lineBuilder = new StringBuilder();int countComma = 0;List<String> dataItem = new ArrayList<>();dataItem.add(String.valueOf(position));for (int i = start + 1; i < end - 1; i++) {byte b = buffer.get();if (b == '\n') {try {if (dataItem.size() == 6) {handleData(dataItem, instruTimeGroupMap,i);}} catch (Exception e) {
//                    e.printStackTrace();} finally {countComma = 0;dataItem.clear();dataItem.add(String.valueOf(position));}}//获取需要的列if(b == ','){countComma++;if(countComma==11 || countComma==20|| countComma==21|| countComma==22|| countComma==45){dataItem.add(lineBuilder.toString());}lineBuilder.setLength(0);}else{lineBuilder.append((char) b);}}return instruTimeGroupMap;}static final String MARK = "_$_";public static void handleData(List<String> columns, HashMap<String, Trade> instruTimeGroupMap,int offset) {//0 19 20 21 22 23 24 25 45String minute =columns.get(2).substring(0, 5);StringBuilder sb = new StringBuilder();sb.append(columns.get(4)).append(MARK).append(minute);String key = sb.toString();Trade trade =new Trade(columns.get(2), columns.get(3), Integer.parseInt(columns.get(1)),columns.get(5), columns.get(4), Long.parseLong(columns.get(0)),offset);//同一个合约 保留position最大的instruTimeGroupMap.put(key, trade);}public static int readHead(MappedByteBuffer buffer, long size) throws IOException {for (int i = 0; i < size; i++) {byte b = buffer.get(i);if (b == '\n') {return i;}}return 0;}
}class Trade implements Comparable<Trade> {long position;int offset;String time;String updateTime;String updateMillisec;int volume;String exchangeID;String instrumentID;public Trade(){}public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID) {this.time = updateTime.substring(0, 5);this.updateTime = updateTime;this.updateMillisec = updateMillisec;this.volume = volume;this.exchangeID = exchangeID;this.instrumentID = instrumentID;}public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID,long position,int offset) {this.time = updateTime.substring(0, 5);this.updateTime = updateTime;this.updateMillisec = updateMillisec;this.volume = volume;this.exchangeID = exchangeID;this.instrumentID = instrumentID;this.position=position;this.offset=offset;}public Trade(String time, String updateTime, String updateMillisec, int volume, String exchangeID,String instrumentID) {this.time = time;this.updateTime = updateTime;this.updateMillisec = updateMillisec;this.volume = volume;this.exchangeID = exchangeID;this.instrumentID = instrumentID;}@Overridepublic int compareTo(Trade o) {if(this.position-o.position >0){return 1;}if(this.position-o.position ==0 && this.offset>o.offset){return 1;}return -1;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.time).append(",").append(this.exchangeID).append(",").append(this.volume).append(",").append(this.instrumentID);return sb.toString();}
}class BadData implements Comparable<BadData>{long position;int size;@Overridepublic int compareTo(BadData o) {if(this.position>o.position){return 1;}return -1;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com