chapter05(线程池-多用户服务器)
教学与实践目的
- 学会服务器支持多用户并发访问的程序设计技术。
多用户服务器是指服务器能同时支持多个用户并发访问服务器所提供的服务资源,如聊天服务、文件传输等。
第二讲的TCPServer是单用户版本,每次只能和一个用户对话。(请仔细阅读TCPServer.java 程序,了解其中原理,找出关键语句),只有前一个用户 退出后,后面的用户才能完成服务器连接。
允许多个实例运行
原因
:服务器的主进程一次只能处理一个客户,其它已连接的客户等候在 监听队列中。
设计思路
- 解决思路就是用
多线程
。
服务器可能面临很多客户的并发连接,这种情况的方案一般是:主线程只负责监听客户请求和接受连接请求,用一个线程专门负责和一个客户对话,即一个客户请求成功后,创建一个新线程来专门负责该客户。
对于这种多用户的情况,用第三讲的方式new Thread创建线程,频繁创建大量线程需要消耗大量系统资源。
对于服务器,一般是使用线程池来管理和复用线程。线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
- ExecutorService 代表线程池,其创建方式常见的有两种:
ExecutorService executorService = Executors.newFixedThreadPool(n);
ExecutorService executorService = Executors. newCachedThreadPool( );
创建后,就可以使用executorService.execute
方法来取出一个线程执行, 该方法的参数就是Runnable接口类型。我们可以将和客户对话部分的代码抽 取到一个Runnable的实现类 Handler(见附录)的run方法中,然后丢给线程 池去执行。方便起见,Handler作为主程序的内部类是个不错的选择。
线程池
Java中的线程池是一种执行器(Executor)
,用于在一个后台线程中执行任务。线程池的主要目的是减少在创建和销毁线程时所产生的性能开销。通过重用已经创建的线程来执行新的任务,线程池提高了程序的响应速度,并且提供了更好的系统资源管理。
Java通过java.util.concurrent
包中的Executor
框架提供了线程池的实现。以下是线程池的一些关键概念:
-
核心线程数(Core Pool Size):线程池中始终保持的线程数量,即使它们处于空闲状态。
-
最大线程数(Maximum Pool Size):线程池中允许的最大线程数量。
-
工作队列(Work Queue):用于存放待执行任务的阻塞队列。
-
线程工厂(Thread Factory):用于创建新线程的工厂。
-
拒绝策略(Rejected Execution Handler):当任务太多,无法被线程池及时处理时,采取的策略。
-
保持活动时间(Keep Alive Time):非核心线程空闲时在终止前等待新任务的最长时间。
-
时间单位(Time Unit):保持活动时间的时间单位。
Java 提供了几种预定义的线程池:
- FixedThreadPool:拥有固定数量线程的线程池。
- CachedThreadPool:根据需要创建新线程的线程池,对于短生命周期的异步任务非常合适。
- SingleThreadExecutor:只有一个线程的线程池,保证所有任务按顺序执行。
- ScheduledThreadPool:用于延迟执行或定期执行任务的线程池。
创建线程池的一般方式是使用Executors
工厂类:
// 创建一个拥有固定线程数量的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);// 创建一个可根据需要创建新线程的线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();// 创建一个单线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();// 创建一个可定时执行任务的线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
使用线程池执行任务:
// 提交一个Runnable任务
fixedThreadPool.execute(new Runnable() {public void run() {// 任务代码}
});// 提交一个Callable任务,并获取Future对象
Future<String> future = fixedThreadPool.submit(new Callable<String>() {public String call() {// 任务代码return "result";}
});
关闭线程池:
// 关闭线程池,不接受新任务,但已提交的任务会继续执行
fixedThreadPool.shutdown();// 关闭线程池,不接受新任务,并且会尝试停止所有正在执行的任务
fixedThreadPool.shutdownNow();
线程池是Java并发编程中非常重要的一部分,合理使用线程池可以显著提高程序性能和资源利用率。
实现代码
TCPClientThreadFX.java
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.input.KeyCode;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.HBox;
import javafx.scene.layout.Priority;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;public class TCPClientThreadFX extends Application {private final Button btnCon = new Button("连接");private final Button btnExit = new Button("退出");private final Button btnSend = new Button("发送");private final TextField IpAdd_input = new TextField();private final TextField Port_input = new TextField();private final TextArea OutputArea = new TextArea();private final TextField InputField = new TextField();private TCPClient tcpClient;private Thread receiveThread;public static void main(String[] args) {launch(args);}public void start(Stage primaryStage) {btnSend.setDisable(true);BorderPane mainPane = new BorderPane();VBox mainVBox = new VBox();HBox hBox = new HBox();hBox.setSpacing(10);hBox.setPadding(new Insets(20, 20, 10, 20));hBox.getChildren().addAll(new Label("IP地址: "), IpAdd_input, new Label("端口: "), Port_input, btnCon);hBox.setAlignment(Pos.TOP_CENTER);VBox vBox = new VBox();vBox.setSpacing(10);vBox.setPadding(new Insets(10, 20, 10, 20));vBox.getChildren().addAll(new Label("信息显示区:"), OutputArea, new Label("信息输入区"), InputField);VBox.setVgrow(OutputArea, Priority.ALWAYS);OutputArea.setEditable(false);OutputArea.setStyle("-fx-wrap-text: true; -fx-font-size: 14px;");InputField.setOnKeyPressed(event -> {if (event.getCode() == KeyCode.ENTER) {btnSend.fire();}});HBox hBox2 = new HBox();hBox2.setSpacing(10);hBox2.setPadding(new Insets(10, 20, 10, 20));btnCon.setOnAction(event -> {String ip = IpAdd_input.getText().trim();String port = Port_input.getText().trim();btnCon.setDisable(true);try {tcpClient = new TCPClient(ip, port);receiveThread = new Thread(() -> {String msg;while ((msg = tcpClient.receive()) != null) {String msgTemp = msg;Platform.runLater(() -> {OutputArea.appendText(msgTemp + "\n");});}Platform.runLater(() -> {OutputArea.appendText("对话已关闭!\n");});});receiveThread.start();btnSend.setDisable(false);} catch (Exception e) {OutputArea.appendText("服务器连接失败!" + e.getMessage() + "\n");}});btnExit.setOnAction(event -> {if (tcpClient != null) {tcpClient.send("bye");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}tcpClient.close();btnSend.setDisable(true);}System.exit(0);});btnSend.setOnAction(event -> {String sendMsg = InputField.getText();tcpClient.send(sendMsg);InputField.clear();OutputArea.appendText("客户端发送:" + sendMsg + "\n");});hBox2.setAlignment(Pos.CENTER_RIGHT);hBox2.getChildren().addAll(btnSend, btnExit);mainVBox.getChildren().addAll(hBox, vBox, hBox2);VBox.setVgrow(vBox, Priority.ALWAYS);mainPane.setCenter(mainVBox);Scene scene = new Scene(mainPane, 700, 400);IpAdd_input.setText("127.0.0.1");Port_input.setText("8080");primaryStage.setScene(scene);primaryStage.show();}
}
TCPClient.java
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;public class TCPClient {private final Socket socket; // 定义套接字private final PrintWriter pw; // 定义字符输出流private final BufferedReader br; // 定义字符输入流public TCPClient(String ip, String port) throws IOException {// 主动向服务器发起连接,实现TCP的三次握手过程// 如果不成功,则抛出错误信息,其错误信息交由调用者处理socket = new Socket(ip, Integer.parseInt(port));// 得到网络输出字节流地址,并封装成网络输出字符流// 设置最后一个参数为true,表示自动flush数据OutputStream socketOut = socket.getOutputStream();pw = new PrintWriter(new OutputStreamWriter(socketOut, StandardCharsets.UTF_8), true);// 得到网络输入字节流地址,并封装成网络输入字符流InputStream socketIn = socket.getInputStream();br = new BufferedReader(new InputStreamReader(socketIn, StandardCharsets.UTF_8));}public void send(String msg) {// 输出字符流,由Socket调用系统底层函数,经网卡发送字节流pw.println(msg);}public String receive() {String msg = null;try {// 从网络输入字符流中读信息,每次只能接收一行信息// 如果不够一行(无行结束符),则该语句阻塞等待msg = br.readLine();} catch (IOException e) {e.printStackTrace();}return msg;}// 实现close方法以关闭socket连接及相关的输入输出流public void close() {try {if (pw != null) {pw.close(); // 关闭PrintWriter会先flush再关闭底层流}if (br != null) {br.close(); // 关闭BufferedReader}if (socket != null) {socket.close(); // 关闭Socket连接}} catch (IOException e) {e.printStackTrace();}}
}
TCPServer.java(主要修改线程池部分)
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;public class TCPServer {private final int port; // 服务器监听端口号private final ServerSocket serverSocket; //定义服务器套接字// 创建线程池private final ExecutorService executorService = Executors.newCachedThreadPool();public TCPServer() throws IOException {port = 8080; // 例如,使用8080作为默认端口serverSocket = new ServerSocket(port);System.out.println("服务器启动监听在 " + port + " 端口");}public static void main(String[] args) throws IOException {TCPServer server = new TCPServer();System.out.println("服务器将监听端口号: " + server.port);server.Service();}private PrintWriter getWriter(Socket socket) throws IOException {//获得输出流缓冲区的地址OutputStream socketOut = socket.getOutputStream();//网络流写出需要使用flush,这里在PrintWriter构造方法中直接设置为自动flushreturn new PrintWriter(new OutputStreamWriter(socketOut, StandardCharsets.UTF_8), true);}private BufferedReader getReader(Socket socket) throws IOException {//获得输入流缓冲区的地址InputStream socketIn = socket.getInputStream();return new BufferedReader(new InputStreamReader(socketIn, StandardCharsets.UTF_8));}class ThreadHandler implements Runnable {private final Socket socket;public ThreadHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {//本地服务器控制台显示客户端连接的用户信息System.out.println("New connection accepted: " + socket.getInetAddress().getHostAddress());try {BufferedReader br = getReader(socket);//定义字符串输入流PrintWriter pw = getWriter(socket);//定义字符串输出流//客户端正常连接成功,则发送服务器的欢迎信息,然后等待客户发送信息pw.println("From 服务器:欢迎使用本服务!");String msg = null;//此处程序阻塞,每次从输入流中读入一行字符串while ((msg = br.readLine()) != null) {//如果客户发送的消息为"bye",就结束通信if (msg.equalsIgnoreCase("bye")) {//向输出流中输出一行字符串,远程客户端可以读取该字符串pw.println("From服务器:服务器断开连接,结束服务!");System.out.println("客户端离开");//向输出流中输出一行字符串,远程客户端可以读取该字符串break; //结束循环}pw.println("From服务器:" + msg);}} catch (IOException e) {e.printStackTrace();} finally {try {if (socket != null)socket.close(); //关闭socket连接及相关的输入输出流} catch (IOException e) {e.printStackTrace();}}}}//单客户版本,即每一次只能与一个客户建立通信连接public void Service() {while (true) {Socket socket = null;try {//此处程序阻塞等待,监听并等待客户发起连接,有连接请求就生成一个套接字。socket = serverSocket.accept();Thread t = new Thread(new ThreadHandler(socket));executorService.execute(t);} catch (IOException e) {throw new RuntimeException(e);}}}
}
群组聊天功能
数据结构之Set集合
在 Java 中,Set
是一种集合类型,用于存储不重复的元素。Set
接口是 Java Collections Framework 的一部分,主要用于表示不允许重复元素的集合。Set
接口的主要实现类有:
-
HashSet:最常用的
Set
实现,基于哈希表实现,具有较快的查找速度。它不保证元素的迭代顺序,可能会随时间而变化。Set<String> hashSet = new HashSet<>(); hashSet.add("Apple"); hashSet.add("Banana"); hashSet.add("Orange");
-
LinkedHashSet:继承自
HashSet
,维护插入元素的顺序。这意味着在迭代元素时,会按照插入的顺序返回。Set<String> linkedHashSet = new LinkedHashSet<>(); linkedHashSet.add("Apple"); linkedHashSet.add("Banana"); linkedHashSet.add("Orange");
-
TreeSet:基于红黑树实现的
Set
,它按自然顺序或通过构造函数提供的比较器进行排序。插入和删除操作的时间复杂度为 O(log n)。Set<String> treeSet = new TreeSet<>(); treeSet.add("Apple"); treeSet.add("Banana"); treeSet.add("Orange");
特点
- 集合中的元素是唯一的,不允许重复。
Set
不提供按索引访问元素的功能。- 可以使用迭代器遍历集合中的元素。
使用场景
- 当需要存储不重复的元素时,如用户ID、唯一的商品代码等。
- 常用于需要检索某个元素是否存在的情况。
在 Java 中,Set
接口提供了多种方法来操作集合。以下是一些常用的 Set
方法,以及它们的具体说明和示例:
常用方法
-
add(E e): 将指定元素添加到集合中,如果集合中已存在该元素,则不做任何操作。
Set<String> set = new HashSet<>(); set.add("Apple"); set.add("Banana"); set.add("Apple"); // 不会重复添加
-
remove(Object o): 从集合中移除指定的元素,如果成功移除则返回 true。
set.remove("Banana"); // 移除 "Banana"
-
contains(Object o): 检查集合是否包含指定的元素,如果包含返回 true。
boolean hasApple = set.contains("Apple"); // 返回 true
-
size(): 返回集合中的元素个数。
int size = set.size(); // 返回 2,因为 "Banana" 已被移除
-
isEmpty(): 检查集合是否为空,如果集合没有元素则返回 true。
boolean isEmpty = set.isEmpty(); // 返回 false
-
clear(): 移除集合中的所有元素。
set.clear(); // 清空集合
-
iterator(): 返回集合的迭代器,可以用于遍历集合中的元素。
Iterator<String> iterator = set.iterator(); while (iterator.hasNext()) {System.out.println(iterator.next()); }
-
addAll(Collection<? extends E> c): 将指定集合中的所有元素添加到当前集合中。
Set<String> anotherSet = new HashSet<>(); anotherSet.add("Cherry"); anotherSet.add("Date"); set.addAll(anotherSet); // 将 anotherSet 的元素添加到 set 中
-
retainAll(Collection<?> c): 只保留当前集合中包含的指定集合中的元素,移除其他的元素。
Set<String> keepSet = new HashSet<>(); keepSet.add("Apple"); set.retainAll(keepSet); // 只保留 "Apple"
-
removeAll(Collection<?> c): 从当前集合中移除指定集合中的所有元素。
set.removeAll(anotherSet); // 移除 anotherSet 中的所有元素
示例代码
以下是一个综合实例,展示如何使用 Java 中的 Set
和它的方法:
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;public class SetExample {public static void main(String[] args) {Set<String> set = new HashSet<>();// 添加元素set.add("Apple");set.add("Banana");set.add("Cherry");// 检查是否包含System.out.println("Contains Apple: " + set.contains("Apple"));// 输出集合大小System.out.println("Size: " + set.size());// 遍历集合Iterator<String> iterator = set.iterator();System.out.println("Elements in the set:");while (iterator.hasNext()) {System.out.println(iterator.next());}// 移除元素set.remove("Banana");System.out.println("After removing Banana, size: " + set.size());// 清空集合set.clear();System.out.println("After clearing, is empty: " + set.isEmpty());}
}
线程安全的集合CopyOnWriteArraySet
在Java中,CopyOnWriteArraySet
是 java.util
包下的一个线程安全的变体,它继承自 CopyOnWriteArrayList
。这个集合类适用于读多写少的场景,因为每次修改(添加、删除等)都会复制整个底层数组,这可能会导致写操作变得非常昂贵,尤其是在集合元素很多的情况下。
CopyOnWriteArraySet
维护了一个无序的元素集合,并且不允许元素重复。由于它是基于 CopyOnWriteArrayList
实现的,所以它的方法和 CopyOnWriteArrayList
相似,只是它额外确保了元素的唯一性。
下面是一些使用 CopyOnWriteArraySet
的基本示例:
import java.util.concurrent.CopyOnWriteArraySet;public class Example {public static void main(String[] args) {// 创建一个CopyOnWriteArraySet集合CopyOnWriteArraySet<Socket> sockets = new CopyOnWriteArraySet<>();// 添加元素sockets.add(new Socket(/* 参数 */));sockets.add(new Socket(/* 参数 */));// 迭代集合for (Socket socket : sockets) {// 做一些操作}// 删除元素sockets.remove(new Socket(/* 参数 */));// 检查集合是否包含某个元素boolean contains = sockets.contains(new Socket(/* 参数 */));// 获取集合的大小int size = sockets.size();}
}
请注意,由于 CopyOnWriteArraySet
的写操作性能开销较大,所以它通常适用于以下情况:
- 写操作非常少,而读操作非常多。
- 存储的数据量不大。
- 数据的实时性要求不高,因为读操作可能读取到旧的数据。
如果你的应用场景不满足上述条件,可能需要考虑使用其他的并发集合类,比如 ConcurrentHashMap
键集合或 Collections.synchronizedSet
包装的 HashSet
。
GroupServer.java
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class GroupServer {private final int port; // 服务器监听端口号private final ServerSocket serverSocket; //定义服务器套接字// 创建线程池private final ExecutorService executorService = Executors.newCachedThreadPool();// 线程安全的set集合public static CopyOnWriteArraySet<Socket> socketset = new CopyOnWriteArraySet<>();public GroupServer() throws IOException {port = 8080; // 例如,使用8080作为默认端口serverSocket = new ServerSocket(port);System.out.println("服务器启动监听在 " + port + " 端口");}public static void main(String[] args) throws IOException {GroupServer server = new GroupServer();System.out.println("服务器将监听端口号: " + server.port);server.Service();}private PrintWriter getWriter(Socket socket) throws IOException {//获得输出流缓冲区的地址OutputStream socketOut = socket.getOutputStream();//网络流写出需要使用flush,这里在PrintWriter构造方法中直接设置为自动flushreturn new PrintWriter(new OutputStreamWriter(socketOut, StandardCharsets.UTF_8), true);}private BufferedReader getReader(Socket socket) throws IOException {//获得输入流缓冲区的地址InputStream socketIn = socket.getInputStream();return new BufferedReader(new InputStreamReader(socketIn, StandardCharsets.UTF_8));}private void sendToAllMembers(String msg, String hostAddress) throws IOException {PrintWriter pw;OutputStream out;for (Socket tempSocket : socketset) {out = tempSocket.getOutputStream();pw = new PrintWriter(new OutputStreamWriter(out, "utf-8"), true);pw.println(hostAddress + " 发言:" + msg);}}class ThreadHandler implements Runnable {private final Socket socket;public ThreadHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {//本地服务器控制台显示客户端连接的用户信息System.out.println("New connection accepted: " + socket.getInetAddress().getHostAddress());try {BufferedReader br = getReader(socket);//定义字符串输入流PrintWriter pw = getWriter(socket);//定义字符串输出流//客户端正常连接成功,则发送服务器的欢迎信息,然后等待客户发送信息pw.println("From 服务器:欢迎使用本服务!");String msg;//此处程序阻塞,每次从输入流中读入一行字符串while ((msg = br.readLine()) != null) {//如果客户发送的消息为"bye",就结束通信if (msg.equalsIgnoreCase("bye")) {//向输出流中输出一行字符串,远程客户端可以读取该字符串pw.println("From服务器:服务器断开连接,结束服务!");System.out.println("客户端离开");//向输出流中输出一行字符串,远程客户端可以读取该字符串break; //结束循环}sendToAllMembers(msg, socket.getInetAddress().getHostAddress());}} catch (IOException e) {e.printStackTrace();} finally {try {socket.close(); //关闭socket连接及相关的输入输出流} catch (IOException e) {e.printStackTrace();}}}}//单客户版本,即每一次只能与一个客户建立通信连接public void Service() {while (true) {Socket socket = null;try {//此处程序阻塞等待,监听并等待客户发起连接,有连接请求就生成一个套接字。socket = serverSocket.accept(); // 从请求队列取一个socket请求socketset.add(socket);System.out.println("添加socket" + socket);Thread t = new Thread(new ThreadHandler(socket));executorService.execute(t);} catch (IOException e) {throw new RuntimeException(e);}}}
}
扩展练习一:自定义线程池
-
前面提到OOM的问题,如果能提供
自行确定最小值和最大值的动态调整的线程池
会更满足要求,大家跟踪Executors. newCachedThreadPool()
方法,观察其源代码,会发现非常简单,而且也会明白为什么会出现OOM错误(Out of Memory内存溢出) -
大家可以尝试将其实现代码拷贝出来稍作修改,封装一个自己版本的 myCachedThreadPool 方法来使用。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, 1000,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
扩展练习二:简易聊天室设计
- 设计聊天服务器
ChatServer.java
,客户端用学号-姓名的方式登录服务器,实现一对一、一对多私聊及群组广播聊天的功能; - 用户登录时,需要将用户上线的信息广播给所有在线用户;客户端发送特定命令,服务器要能够返回在线用户列表信息;
- 程序设计中,要能显示发言者的学号姓名(例如20181111111-程旭),这种情况可以考虑使用线程安全的HashMap类型(自行搜索应该使用哪个类,这些线程安全的集合类型和普通的集合类型使用方式如出一辙);
- 自行考虑如何设计服务端和客户端之间的交互约定(协议),可以在用户连上服务器时,即要求用户发送学号和姓名信息,并给用户发送相关的使用指南,约定发送指令的作用。
- 编程中要小心处理好各种逻辑关系。
数据结构之HashMap
HashMap 是一种数据结构,属于哈希表的一种实现。它能够以键值对的形式存储数据,具有快速的查找、插入和删除操作。以下是一些关于 HashMap 的基本概念和特性:
- 键值对存储:HashMap 将数据以键值对的方式存储,每个键(Key)唯一对应一个值(Value)。
- 快速访问:通过哈希函数,HashMap 可以快速定位到存储在内部数组中的数据,具有平均 O(1) 的时间复杂度进行查找、插入和删除。
- 允许空值:HashMap 允许一个键为 null,且可以有多个值为 null。
- 不保证顺序:HashMap 并不保证键值对的顺序,因此在迭代时可能不按插入顺序返回元素。
- 线程不安全:HashMap 不是线程安全的,若在多线程环境下使用,可能需要使用
Collections.synchronizedMap
或使用ConcurrentHashMap
。
在 Java 中,HashMap 的基本用法示例如下:
import java.util.HashMap;public class Example {public static void main(String[] args) {HashMap<String, Integer> map = new HashMap<>();// 插入数据map.put("苹果", 1);map.put("香蕉", 2);map.put("橙子", 3);// 访问数据System.out.println("香蕉的数量: " + map.get("香蕉"));// 删除数据map.remove("橙子");// 遍历 HashMapfor (String key : map.keySet()) {System.out.println(key + ": " + map.get(key));}}
}
线程安全的ConcurrentHashMap
ConcurrentHashMap
是 Java 中提供的一个线程安全的 HashMap
实现,它允许多个线程同时访问和修改,而不需要额外的同步控制。以下是一些基本的使用方法:
导入类
首先,你需要导入 ConcurrentHashMap
类:
import java.util.concurrent.ConcurrentHashMap;
创建 ConcurrentHashMap
创建一个 ConcurrentHashMap
实例非常简单,你可以像创建普通的 HashMap
一样创建它:
ConcurrentHashMap<KeyType, ValueType> map = new ConcurrentHashMap<>();
插入元素
向 ConcurrentHashMap
中插入元素,可以使用 putIfAbsent
方法,该方法只有在键不存在时才会插入元素,这有助于避免多线程环境下的冲突:
map.putIfAbsent(key, value);
获取元素
获取元素可以直接通过 get
方法:
ValueType value = map.get(key);
删除元素
删除元素可以使用 remove
方法:
map.remove(key);
遍历
遍历 ConcurrentHashMap
和普通的 HashMap
一样,但是要注意,遍历时对 ConcurrentHashMap
的修改操作可能会影响迭代器的行为:
for (Map.Entry<KeyType, ValueType> entry : map.entrySet()) {KeyType key = entry.getKey();ValueType value = entry.getValue();// 处理键值对
}
原子操作
ConcurrentHashMap
提供了一些原子操作,例如 putIfAbsent
、replace
和 compute
等:
// 如果键存在,则替换旧值,否则插入新值
map.replace(key, oldValue, newValue);// 如果键存在,则根据提供的函数计算新值
map.compute(key, (k, v) -> {// 计算新值的逻辑return newValue;
});
线程安全
ConcurrentHashMap
在多线程环境下不需要额外的同步措施,因为它内部已经处理了线程安全的问题。但是,当涉及到复合操作时(比如先检查某个键是否存在,然后基于这个检查结果执行一些操作),你可能需要使用 computeIfAbsent
或 computeIfPresent
等原子方法来保证操作的原子性。
使用 ConcurrentHashMap
时,你不需要担心线程安全问题,但是要确保你的操作是线程安全的,比如不要在迭代过程中修改 ConcurrentHashMap
,或者在迭代过程中对元素进行修改时,要确保这些修改是线程安全的。
TextArea添加滚轮修改字体大小
// 给文本区添加滚轮事件并且要按住Ctrl键增加字号
OutputArea.setOnScroll(event -> {if (event.isControlDown()) {if (event.getDeltaY() > 0) {OutputArea.setStyle("-fx-font-size: " + (OutputArea.getFont().getSize() + 1) + "px;");} else {OutputArea.setStyle("-fx-font-size: " + (OutputArea.getFont().getSize() - 1) + "px;");}}
});
实现代码
ChatServer
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ChatServer {private final int port;private final ServerSocket serverSocket;private final ExecutorService executorService = Executors.newCachedThreadPool();// public static ConcurrentHashMap<HashMap<String, String>, Socket> hashmap = new ConcurrentHashMap<>();public static ConcurrentHashMap<String, Socket> hashmap = new ConcurrentHashMap<>();public ChatServer() throws IOException {port = 8888;serverSocket = new ServerSocket(port);System.out.println("服务器启动监听在 " + port + " 端口");}public static void main(String[] args) throws IOException {ChatServer server = new ChatServer();System.out.println("服务器将监听端口号: " + server.port);server.Service();}private PrintWriter getWriter(Socket socket) throws IOException {OutputStream socketOut = socket.getOutputStream();return new PrintWriter(new OutputStreamWriter(socketOut, StandardCharsets.UTF_8), true);}private BufferedReader getReader(Socket socket) throws IOException {InputStream socketIn = socket.getInputStream();return new BufferedReader(new InputStreamReader(socketIn, StandardCharsets.UTF_8));}private void sendToAllMembers(String no_name, String msg, int message_type) throws IOException {PrintWriter pw;OutputStream out;for (Map.Entry<String, Socket> entry : hashmap.entrySet()) {if (entry.getKey().equals(no_name)) {continue;}Socket tempSocket = entry.getValue();out = tempSocket.getOutputStream();pw = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8), true);if (message_type == 0) { // 聊天消息pw.println(no_name + " 发言:" + msg);} else if (message_type == 1) { // 系统消息pw.println(msg);}}}private void listAllMembers(Socket socket) throws IOException {PrintWriter pr = getWriter(socket);for (Map.Entry<String, Socket> entry : hashmap.entrySet()) {pr.println(entry.getKey());}}class ThreadHandler implements Runnable {private final Socket socket;public ThreadHandler(Socket socket) {this.socket = socket;}@Overridepublic void run() {System.out.println("New connection accepted: " + socket.getInetAddress().getHostAddress());try {PrintWriter pw = getWriter(socket);BufferedReader br = getReader(socket);pw.println("请输入用户名和学号,中间使用-分割");String no_name = br.readLine();// 正则的分割符是\|,所以这里要用\\|while (no_name.split("-").length < 2) {pw.println("请输入正确的用户名和学号,中间使用-分割");no_name = br.readLine();}// 为什么要加\\?因为|在正则表达式中有特殊含义,需要转义String name = no_name.split("-")[0];String no = no_name.split("-")[1];hashmap.put(no_name, socket);pw.println("clearScreen");pw.println("no_name:" + no_name);pw.println("From 服务器:已成功登录!");pw.println("From 服务器:默认是发送给全体用户的广播信息");pw.println("From 服务器:如果要发送私聊信息, 使用【学号1|学号2&私聊信息】方式给指定用户发送,例如发送【20181111111|20182222222&这是我发给你们的私聊信息】");pw.println("From 服务器:发送 #在线用户# 能获得所有在线用户的列表信息");// 处理消息String msg;while ((msg = br.readLine()) != null) {if (msg.equalsIgnoreCase("bye")) {pw.println("From服务器:服务器断开连接,结束服务!");hashmap.remove(no_name);System.out.println("客户端" + no_name + "离开");sendToAllMembers(no_name, "系统消息:-------" + no_name + "离开-------", 1);break;} else if (msg.equals("#在线用户#")) {listAllMembers(socket);continue;}if (msg.matches("^【[0-9]*\\|[0-9]*&.*】$")) {// 私聊消息System.out.println("私聊消息:" + msg);String[] split = msg.split("\\|");String from_no = split[0].substring(1);System.out.println("from_no:" + from_no);if (!from_no.equals(no)) {System.out.println(no);pw.println("From服务器:学号1必须是自己的学号!");continue;}String to_no = split[1].split("&")[0];System.out.println("to_no:" + to_no);String content = split[1].split("&")[1].substring(1, split[1].length() - 1);System.out.println("content:" + content);for (Map.Entry<String, Socket> entry : hashmap.entrySet()) {if (entry.getKey().endsWith(to_no)) {Socket tempSocket = entry.getValue();System.out.println("Socket" + tempSocket);PrintWriter tempPw = getWriter(tempSocket);tempPw.println("From " + no_name + ":" + content);}}continue;}sendToAllMembers(no_name, msg, 0);}} catch (IOException e) {e.printStackTrace();} finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}public void Service() {while (true) {Socket socket;try {socket = serverSocket.accept();System.out.println("添加socket" + socket);Thread t = new Thread(new ThreadHandler(socket));executorService.execute(t);} catch (IOException e) {throw new RuntimeException(e);}}}
}
TCPClient
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;public class TCPClient {private final Socket socket; // 定义套接字private final PrintWriter pw; // 定义字符输出流private final BufferedReader br; // 定义字符输入流public TCPClient(String ip, String port) throws IOException {// 主动向服务器发起连接,实现TCP的三次握手过程// 如果不成功,则抛出错误信息,其错误信息交由调用者处理socket = new Socket(ip, Integer.parseInt(port));// 得到网络输出字节流地址,并封装成网络输出字符流// 设置最后一个参数为true,表示自动flush数据OutputStream socketOut = socket.getOutputStream();pw = new PrintWriter(new OutputStreamWriter(socketOut, StandardCharsets.UTF_8), true);// 得到网络输入字节流地址,并封装成网络输入字符流InputStream socketIn = socket.getInputStream();br = new BufferedReader(new InputStreamReader(socketIn, StandardCharsets.UTF_8));}public void send(String msg) {// 输出字符流,由Socket调用系统底层函数,经网卡发送字节流pw.println(msg);}public String receive() {String msg = null;try {// 从网络输入字符流中读信息,每次只能接收一行信息// 如果不够一行(无行结束符),则该语句阻塞等待msg = br.readLine();} catch (IOException e) {e.printStackTrace();}return msg;}// 实现close方法以关闭socket连接及相关的输入输出流public void close() {try {if (pw != null) {pw.close(); // 关闭PrintWriter会先flush再关闭底层流}if (br != null) {br.close(); // 关闭BufferedReader}if (socket != null) {socket.close(); // 关闭Socket连接}} catch (IOException e) {e.printStackTrace();}}
}
TCPClientThreadFx
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.input.KeyCode;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.HBox;
import javafx.scene.layout.Priority;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;import java.net.InetAddress;
import java.net.UnknownHostException;public class TCPClientThreadFX extends Application {private final Button btnCon = new Button("连接");private final Button btnExit = new Button("退出");private final Button btnSend = new Button("发送");private final TextField IpAdd_input = new TextField();private final TextField Port_input = new TextField();private final TextArea OutputArea = new TextArea();private final TextField InputField = new TextField();private TCPClient tcpClient;private Thread receiveThread;private String no_name;public static void main(String[] args) {launch(args);}public void start(Stage primaryStage) {btnSend.setDisable(true);BorderPane mainPane = new BorderPane();VBox mainVBox = new VBox();HBox hBox = new HBox();hBox.setSpacing(10);hBox.setPadding(new Insets(20, 20, 10, 20));hBox.getChildren().addAll(new Label("IP地址: "), IpAdd_input, new Label("端口: "), Port_input, btnCon);hBox.setAlignment(Pos.TOP_CENTER);VBox vBox = new VBox();vBox.setSpacing(10);vBox.setPadding(new Insets(10, 20, 10, 20));vBox.getChildren().addAll(new Label("信息显示区:"), OutputArea, new Label("信息输入区"), InputField);// setVgrow()方法用于设置组件的拉伸策略,在这里设置为ALWAYS,即组件将会填充整个区域VBox.setVgrow(OutputArea, Priority.ALWAYS);OutputArea.setEditable(false);OutputArea.setStyle("-fx-wrap-text: true; -fx-font-size: 16px;");InputField.setOnKeyPressed(event -> {if (event.getCode() == KeyCode.ENTER) {btnSend.fire();}});HBox hBox2 = new HBox();hBox2.setSpacing(10);hBox2.setPadding(new Insets(10, 20, 10, 20));btnCon.setOnAction(event -> {String ip = IpAdd_input.getText().trim();String port = Port_input.getText().trim();btnCon.setDisable(true);try {tcpClient = new TCPClient(ip, port);receiveThread = new Thread(() -> {String msg;while ((msg = tcpClient.receive()) != null) {String msgTemp = msg;if (msgTemp.equals("clearScreen")) {OutputArea.clear();continue;} else if (msgTemp.startsWith("no_name:")) {no_name = msgTemp.split(":")[1];continue;}Platform.runLater(() -> {OutputArea.appendText(msgTemp + "\n");});}Platform.runLater(() -> {OutputArea.appendText("对话已关闭!\n");});});receiveThread.start();btnSend.setDisable(false);} catch (Exception e) {OutputArea.appendText("服务器连接失败!" + e.getMessage() + "\n");}});btnExit.setOnAction(event -> {if (tcpClient != null) {tcpClient.send("bye");try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}tcpClient.close();btnSend.setDisable(true);}System.exit(0);});btnSend.setOnAction(event -> {String sendMsg = InputField.getText();if (sendMsg.trim().isEmpty()) {return;}tcpClient.send(sendMsg);InputField.clear();// 获取本机ipString ip;try {ip = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {throw new RuntimeException(e);}// 添加窗口标题primaryStage.setTitle(ip + " [" + no_name + "]");OutputArea.appendText("Me: " + sendMsg + "\n");});// 给文本区添加滚轮事件并且要按住Ctrl键增加字号OutputArea.setOnScroll(event -> {if (event.isControlDown()) {if (event.getDeltaY() > 0) {OutputArea.setStyle("-fx-font-size: " + (OutputArea.getFont().getSize() + 1) + "px;");} else {OutputArea.setStyle("-fx-font-size: " + (OutputArea.getFont().getSize() - 1) + "px;");}}});// 文本自动换行OutputArea.setWrapText(true);hBox2.setAlignment(Pos.CENTER_RIGHT);hBox2.getChildren().addAll(btnSend, btnExit);mainVBox.getChildren().addAll(hBox, vBox, hBox2);VBox.setVgrow(vBox, Priority.ALWAYS);mainPane.setCenter(mainVBox);Scene scene = new Scene(mainPane, 800, 550);IpAdd_input.setText("127.0.0.1");Port_input.setText("8888");primaryStage.setScene(scene);primaryStage.show();}
}