Java IO流 JavaSE

2023-05-28 约 13721 字 阅读时长28 分钟

Java IO流

Java的IO演进

IO模型基本说明

IO模型:就是用什么样的通道或者说是通信模式和架构进行数据的传输和接收,很大程度上决定了程序通信的性能

Java共支持3种网络编程的IO模型

  • BIO:同步并阻塞(传统阻塞型),服务器实现模式为一个连接对应一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销

  • NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理

  • AIO(NIO.2):异步非阻塞,服务器实现模式为一个有效请求对应一个线程,客户端的IO请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接时间较长的应用

实际通信需求下,要根据不同的业务场景和性能需求决定选择不同的IO模型

BIO、NIO、AIO使用场景分析

  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用
  • NIO 方式适用于连接数据多且连接比较(轻操作)的架构,比如聊天服务器、弹幕系统、服务器间通讯等,编程较难,JDK1.4开始支持
  • AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如聊天服务器,充分调用OS参与并发操作,编程较难,JDK1.7开始支持

BIO深入剖析

介绍

Java BIO 就是传统的 Java io 编程,其相关类和接口都在 Java.io 包下

BIO(blocking IO):同步阻塞,服务器实现模式为一个连接对应一个线程,即客户端有连接请求时服务端就需要启动一个线程处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善

Java BIO 工作机制

image-20230528144355742

BIO编程实例

网络编程的基本模型都是 Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接餐桌向服务端坚挺的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信

传统的同步阻塞模型开发中,服务端 ServerSocket 负责绑定 IP 地址,启动监听端口;客户端 Scoket 服务发起连接操作。连接成功后,双方通过输入输出流进行同步阻塞式通信

基于 BIO 模式下的通信,客户端-服务端是完全同步的,完全耦合的

客户端

java
 1public class Client {
 2    public static void main(String[] args) {
 3        try {
 4            // 创建 socket 对象
 5            Socket socket = new Socket("127.0.0.1", 9999);
 6            // 从 socket 对象中获取一个字节输出流
 7            OutputStream os = socket.getOutputStream();
 8            // 将字节输出流包装成一个打印流
 9            PrintStream ps=new PrintStream(os);
10            Scanner sc=new Scanner(System.in);
11            while (true){
12                System.out.print("请输入:");
13                String msg = sc.nextLine();
14                if (Objects.equals(msg,"0")) {
15                    break;
16                }
17                ps.println(msg);
18                ps.flush();
19            }
20        } catch (Exception e) {
21            e.printStackTrace();
22        }
23    }
24}

服务端

java
 1public class Server {
 2    public static void main(String[] args) {
 3        System.out.println("=====服务端启动=====");
 4        try {
 5            // 定义一个 ServerSocket 进行服务端注册
 6            ServerSocket serverSocket = new ServerSocket(9999);
 7            // 监听客户端的 Socket 请求
 8            Socket socket = serverSocket.accept();
 9            // 从 socket 获得一个字节输入流
10            InputStream is = socket.getInputStream();
11            // 将字节输入流转换为字符输入流
12            InputStreamReader isr = new InputStreamReader(is);
13            // 将字符输入流包装成一个缓冲字符输入流
14            BufferedReader br = new BufferedReader(isr);
15            String msg;
16            while ((msg = br.readLine()) != null) {
17                System.out.println("服务端读取到的数据:" + msg);
18            }
19        } catch (Exception e) {
20            e.printStackTrace();
21        }
22
23    }
24}

小结

  • 在以上通信中,服务端会一直等待客户端的消息,如果客户端没有消息的发送,服务端将一直进入阻塞状态
  • 同时服务端是按照行获取消息的,这意味着客户端也必须按照行进行消息的发送,否则服务端将进入消息阻塞状态
  • 当客户端关闭 socket ,服务端还在读取流中消息时会抛错 Java.net.SocketException: Connection reset
  • 服务端 serverSocket.accept() 时会阻塞等待客户端 socket 请求

BIO模式下接收多个客户端

客户端每发起一次请求,服务端就创建一个新的线程来处理客户端请求,这样就能实现一个客户端一个线程模型

image-20230528230843135

客户端

客户端代码同上

服务端

java
 1/**
 2 * 可接受多个客户端 Socket 请求的实例
 3 * 服务端每接收到一个客户端 socket 请求时后都交给一个独立的线程来处理
 4 */
 5public class Service {
 6    public static void main(String[] args) {
 7        try {
 8            // 注册端口
 9            ServerSocket serverSocket = new ServerSocket(9999);
10            // 定义一个死循环不断接收不同的客户端请求
11            while (true){
12                Socket socket = serverSocket.accept();
13                // 创建一个独立的线程处理当前接收到的 socket 请求
14                new Thread(()->{
15                    try {
16                        // 线程从 socket 中获取字节输入流
17                        InputStream is = socket.getInputStream();
18                        // 缓冲字符输入流 包装 字节输入流
19                        BufferedReader br=new BufferedReader(new InputStreamReader(is));
20                        // 读取数据
21                        String msg;
22                        while ((msg=br.readLine())!=null){
23                            System.out.println(Thread.currentThread().getName()+"-服务端读取数据:"+msg);
24                        }
25
26                    }catch (Exception e){
27                        e.printStackTrace();
28                    }
29
30                }).start();
31            }
32        }catch (Exception e){
33
34        }
35    }
36}

总结

  • 每个 socket 接收到,都会创建一个线程,线程的竞争、切换上下文影响性能
  • 每个线程都会占用栈空间
  • 并不是每个 socket 请求都进行 IO 操作,无意义的线程处理
  • 客户端的并发访问增加时,服务端将呈现 1:1 的线程开销,访问量过大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务

伪异步IO编程

伪异步IO通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的 Socket 封装成一个 Task (该任务实现 Java.lang.Runnable 线程任务接口)交给后端的线程池进行处理。JDK 的线程池维护一个消息队列和 N 个活跃的线程,对消息队列中 Socket 任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机

image-20230529095138386

客户端

代码同 BIO 编程实例中客户端代码一致

服务端

java
 1/**
 2 * 服务器:伪异步通信架构
 3 *
 4 * @author lei
 5 * @date 2023/05/29
 6 */
 7public class Server {
 8    static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(16));
 9
10    public static void main(String[] args) {
11        try {
12            // 注册端口
13            ServerSocket serverSocket = new ServerSocket(9999);
14            // 接受客户端 socket 请求
15            while (true) {
16                Socket socket = serverSocket.accept();
17                // 将 socket 包装成一个任务对象交给线程池
18                Runnable runnable = () -> {
19                    try {
20                        InputStream is = socket.getInputStream();
21                        BufferedReader bf = new BufferedReader(new InputStreamReader(is));
22                        String msg;
23                        while ((msg = bf.readLine()) != null) {
24                            System.out.println(Thread.currentThread().getName() + "-服务端获取数据:" + msg);
25                        }
26                    } catch (Exception e) {
27                        e.printStackTrace();
28                    }
29                };
30                threadPoolExecutor.execute(runnable);
31            }
32        } catch (Exception e) {
33            e.printStackTrace();
34        }
35    }
36}

小结

  • 伪异步 IO 才用了线程池实现,因此可以避免每个请求创建一个独立线程造成资源耗尽的问题,但是由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题
  • 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续 socket 的IO消息都将在队列中等待。新的 socket 请求将会被拒绝,客户端会发生大量连续超时

基于BIO模式下的文件上传

客户端

java
 1/**
 2 * 客户端:可以实现客户端上传任意类型的文件到服务端保存
 3 *
 4 * @author lei
 5 * @date 2023/05/29
 6 */
 7public class Client {
 8    public static void main(String[] args) {
 9        try (
10                FileInputStream is = new FileInputStream("E:\\1\\1\\R-C.jpg")
11        ) {
12            // 请求与服务端的 socket 连接
13            Socket socket = new Socket("127.0.0.1", 9999);
14            // 将字节输出流包装成一个数据输出流
15            DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
16            // 先发送上传文件的后缀
17            dos.writeUTF(".jpg");
18            // 将文件数据发送到服务端
19            byte[] buffer = new byte[1024];
20            int len;
21            while ((len = is.read(buffer)) != -1) {
22                dos.write(buffer, 0, len);
23            }
24            dos.flush();
25            // 通知服务端数据发送完毕
26            socket.shutdownOutput();
27        } catch (Exception e) {
28            e.printStackTrace();
29        }
30    }
31}

服务端

java
 1/**
 2 * 服务器:接收客户端任意类型文件,并保存
 3 *
 4 * @author lei
 5 * @date 2023/05/29
 6 */
 7public class Server {
 8    public static void main(String[] args) {
 9        try {
10            ServerSocket serverSocket = new ServerSocket(9999);
11            while (true) {
12                Socket socket = serverSocket.accept();
13                // 交给一个独立线程处理 socket 通信
14                new Thread(() -> {
15                    try {
16                        // 得到一个数据输入流读取客户端发过来的数据
17                        DataInputStream dis = new DataInputStream(socket.getInputStream());
18                        // 读取客户端发过来的文件类型
19                        String utf = dis.readUTF();
20                        try (
21                                // 定义一个文件输出流负责把客户端发送的数据写进去
22                                FileOutputStream fos = new FileOutputStream("E:\\1\\2\\" + UUID.randomUUID() + utf)
23                        ) {
24                            // 从字节输入流中读取文件数据,并写到文件输出流中去
25                            byte[] buffer = new byte[1024];
26                            int len;
27                            while ((len = dis.read(buffer)) != -1) {
28                                fos.write(buffer, 0, len);
29                            }
30                            fos.flush();
31                        }
32                    } catch (Exception e) {
33                        e.printStackTrace();
34                    }
35
36                }).start();
37            }
38        } catch (Exception e) {
39			e.printStackTrace();
40        }
41    }
42}

BIO模式下的端口转发

需求:实现一个客户端的消息可以发送给所有的客户端去接受(群聊)

image-20230530092015156

BIO模式下的即时通信

基于 BIO 模式下的即时通信,需要解决客户端到客户端通信,也就是实现客户端与客户端的端口消息转发逻辑

实现功能列表

  1. 客户端登录功能
  2. 在线人数实时更新
  3. 离线人数实时更新
  4. 群聊
  5. 私聊
  6. @消息的实现
  7. 消息用户和消息时间点记录

代码结构

image-20230530124648760

服务端代码实现

Server

java
 1public class Server {
 2
 3    /**
 4     * 定义一个map集合存放所有在线 socket
 5     */
 6    public static final Map<Socket,String> onLineSocketMap = new HashMap<>();
 7
 8    public static void main(String[] args) {
 9        try {
10            // 注册端口
11            ServerSocket serverSocket=new ServerSocket(9999);
12            while (true){
13                // 接收客户端 socket 请求
14                Socket socket = serverSocket.accept();
15                // 客户端 socket 管道单独配置一个线程处理
16                new ServerReader(socket).start();
17            }
18        }catch (Exception e){
19            e.printStackTrace();
20        }
21    }
22}

ServerReader

java
 1public class ServerReader extends Thread {
 2    private final Socket socket;
 3
 4    public ServerReader(Socket socket) {
 5        this.socket = socket;
 6    }
 7
 8    @Override
 9    public void run() {
10        DataInputStream dis = null;
11        try {
12            dis = new DataInputStream(socket.getInputStream());
13            // 循环等待客户端的消息
14            while (true) {
15                // 读取当前消息类型 登录、群发、私聊、@
16                int flag = dis.readInt();
17                if (flag == 1) {
18                    String name = dis.readUTF();
19                    System.out.println(name + "-------->" + socket.getRemoteSocketAddress());
20                    Server.onLineSocketMap.put(socket, name);
21                }
22                writeMsg(flag, dis);
23            }
24        } catch (Exception e) {
25            System.out.println("有人下线了-----");
26            // 从在线map中移除
27            Server.onLineSocketMap.remove(socket);
28            try {
29                // 重新更新在线人数发给金客户端
30                writeMsg(1, dis);
31            } catch (Exception e1) {
32                e1.printStackTrace();
33            }
34        }
35    }
36
37    private void writeMsg(int flag, DataInputStream dis) throws IOException {
38        switch (flag) {
39            // 读取在线人数,更新到所有客户端
40            case 1:
41                Collection<String> values = Server.onLineSocketMap.values();
42                String msg1 = String.join(Constants.SPLIT, values);
43                sendMsgToAll(flag, msg1);
44                break;
45            case 2:
46            case 3:
47                // 读取群发或@的消息
48                String msg2 = dis.readUTF();
49                // 发送人姓名
50                String sendName = Server.onLineSocketMap.get(socket);
51                StringBuilder sb = new StringBuilder();
52                LocalDateTime now = LocalDateTime.now();
53                DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
54                sb.append(sendName).append(" ").append(formatter.format(now)).append("\r\n");
55                sb.append(" ").append(msg2).append("\r\n");
56                if (flag == 2) {
57                    sendMsgToAll(flag, sb.toString());
58                } else {
59                    sendMsgToOne(flag, dis.readUTF(), sb.toString());
60                }
61                break;
62        }
63    }
64
65    private void sendMsgToOne(int flag, String destName, String msg) throws IOException {
66        Set<Socket> socketSet = Server.onLineSocketMap.keySet();
67        for (Socket sk : socketSet) {
68            if (Server.onLineSocketMap.get(sk).trim().equals(destName.trim())) {
69                DataOutputStream dos=new DataOutputStream(sk.getOutputStream());
70                dos.writeInt(flag);
71                dos.writeUTF(msg);
72                dos.flush();
73            }
74        }
75    }
76
77    private void sendMsgToAll(int flag, String msg) throws IOException {
78        // 拿到所有客户端 socket 集合,并发送消息
79        Set<Socket> sockets = Server.onLineSocketMap.keySet();
80        for (Socket sk : sockets) {
81            DataOutputStream dos = new DataOutputStream(sk.getOutputStream());
82            dos.writeInt(flag);
83            dos.writeUTF(msg);
84            dos.flush();
85        }
86    }
87}

客户端代码实现

Client

java
  1public class Client implements ActionListener {
  2    // 设计界面
  3    private final JFrame win = new JFrame();
  4    // 消息内容框架
  5    public JTextArea smsContent = new JTextArea(23, 50);
  6    // 消息发送框
  7    public JTextArea smsSend = new JTextArea(4, 40);
  8    // 在线人数区域
  9    // 存放人的数据
 10    // 展示在线人数窗口
 11    public JList<String> onLineUsers = new JList<>();
 12
 13    // 是否私聊按钮
 14    JCheckBox isPrivateBtn = new JCheckBox("私聊");
 15    // 消息发送按钮
 16    JButton sendBtn = new JButton("发送");
 17
 18    // 登录页面
 19    private JFrame loginView;
 20
 21    private JTextField ipEt, nameEt, idEt;
 22
 23    private Socket socket;
 24
 25    public static void main(String[] args) {
 26        new Client().initView();
 27    }
 28
 29    private void initView() {
 30        // 初始化聊天窗口界面
 31        win.setSize(650, 600);
 32
 33        // 展示登录界面
 34        displayLoginView();
 35
 36        // 展示聊天界面
 37//        displayChartView();
 38    }
 39
 40    private void displayLoginView() {
 41        loginView = new JFrame("登录");
 42        loginView.setLayout(new GridLayout(3, 1));
 43        loginView.setSize(400, 230);
 44
 45        JPanel ip = new JPanel();
 46        JLabel label1 = new JLabel("  IP:");
 47        ip.add(label1);
 48        ipEt = new JTextField(20);
 49        ip.add(ipEt);
 50        loginView.add(ip);
 51
 52        JPanel name = new JPanel();
 53        JLabel label2 = new JLabel("  姓名:");
 54        name.add(label2);
 55        nameEt = new JTextField(20);
 56        name.add(nameEt);
 57        loginView.add(name);
 58
 59        JPanel btnView = new JPanel();
 60        JButton login = new JButton("登录");
 61        JButton cancel = new JButton("取消");
 62        btnView.add(login);
 63        btnView.add(cancel);
 64        loginView.add(btnView);
 65        // 关闭窗口,退出当前程序
 66        loginView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
 67        setWindowCenter(loginView,400,260,true);
 68
 69        // 给登录和取消按钮添加点击事件
 70        login.addActionListener(this);
 71        cancel.addActionListener(this);
 72    }
 73
 74    public static void setWindowCenter(JFrame jFrame, int width, int height, boolean flag) {
 75        // 得到当前品目宽高
 76        Dimension screenSize = jFrame.getToolkit().getScreenSize();
 77
 78        // 拿到电脑屏幕宽高
 79        int height1 = screenSize.height;
 80        int width1 = screenSize.width;
 81
 82        // 设置窗口左上角坐标
 83        jFrame.setLocation(width1 / 2 - width / 2, height1 / 2 - height / 2);
 84        jFrame.setVisible(flag);
 85    }
 86
 87    @Override
 88    public void actionPerformed(ActionEvent e) {
 89        // 得到点击事件源
 90        JButton btn = (JButton) e.getSource();
 91        switch (btn.getText()) {
 92            case "登录":
 93                String ip = ipEt.getText();
 94                String name = nameEt.getText();
 95                String msg = "";
 96                // 校验参数信息
 97                if (ip == null || !ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
 98                    msg = "请输入合法的ip地址";
 99                }
100                if (name == null || !name.matches("\\S{1,20}")) {
101                    msg = "请输入合法姓名";
102                }
103                if (!msg.equals("")) {
104                    // 参数一:弹出窗口放在那个窗口里
105                    JOptionPane.showMessageDialog(loginView, msg);
106                } else {
107                    try {
108                        // 将当前用户展示到页面
109                        win.setTitle(name);
110                        // 去服务端登录,连接一个socket通道
111                        socket = new Socket(ip, Constants.PORT);
112                        // 为客户端的 socket 分配一个线程,专门负责接收消息
113                        new ClientReader(this, socket).start();
114
115                        // 带上用户信息到服务端
116                        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
117                        dos.writeInt(1);
118                        dos.writeUTF(name.trim());
119                        dos.flush();
120
121                        // 关闭当前窗口 弹出聊天界面
122                        loginView.dispose();
123                        displayChartView();
124                    } catch (Exception err) {
125                        err.printStackTrace();
126                    }
127                }
128                break;
129            case "取消":
130                //退出系统
131                System.exit(0);
132                break;
133            case "发送":
134                //得到发送消息的内容
135                String msgSend = smsSend.getText();
136                if (!msgSend.trim().equals("")) {
137                    //发消息给服务端
138                    try {
139                        //判断是否对谁发消息
140                        String selectName = onLineUsers.getSelectedValue();
141                        int flag = 2;// 群发 @消息
142                        if (selectName != null && !selectName.equals("")) {
143                            msgSend = ("@" + selectName + "," + msgSend);
144                            //判断是否选中私发
145                            if (isPrivateBtn.isSelected()) {
146                                //私发
147                                flag = 3;//私发消息
148                            }
149                        }
150                        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
151                        dos.writeInt(flag);//群发消息 发送给所有人
152                        dos.writeUTF(msgSend);
153                        if (flag == 3) {
154                            //告诉服务器端我对谁私发
155                            dos.writeUTF(selectName.trim());
156                        }
157                        dos.flush();
158                    } catch (Exception e1) {
159                        e1.printStackTrace();
160                    }
161                }
162                smsSend.setText(null);
163                break;
164        }
165    }
166
167    private void displayChartView() {
168        JPanel bottomPanel = new JPanel(new BorderLayout());
169
170        // 将消息框和按钮添加到窗口底端
171        win.add(bottomPanel, BorderLayout.SOUTH);
172        bottomPanel.add(smsSend);
173        JPanel btns = new JPanel(new FlowLayout(FlowLayout.LEFT));
174        btns.add(sendBtn);
175        btns.add(isPrivateBtn);
176        bottomPanel.add(btns,BorderLayout.EAST);
177        // 添加发送按钮点击监听事件
178        sendBtn.addActionListener(this);
179
180        // 给消息发送按钮绑定点击监听器
181        // 将展示信息区添加到窗口中间
182        smsContent.setBackground(new Color(0xdd, 0xdd, 0xdd));
183        // 展示信息区可以滚动
184        win.add(new JScrollPane(smsContent), BorderLayout.CENTER);
185
186        // 用户列表和是否私聊放在窗口最右边
187        Box rightBox = new Box(BoxLayout.Y_AXIS);
188        onLineUsers.setFixedCellWidth(120);
189        onLineUsers.setVisibleRowCount(13);
190        rightBox.add(new JScrollPane(onLineUsers));
191        win.add(rightBox, BorderLayout.EAST);
192
193        win.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
194        // swing 加上这句 就可以拥有关闭窗口的功能
195        win.pack();
196
197        // 设置窗口居中,显示出来
198        setWindowCenter(win,650,600,true);
199
200    }
201}

ClientReader

java
 1public class ClientReader extends Thread {
 2    private final Socket socket;
 3    private final Client client;
 4
 5    public ClientReader(Client client, Socket socket) {
 6        this.socket = socket;
 7        this.client = client;
 8    }
 9
10    @Override
11    public void run() {
12        try {
13            DataInputStream dis = new DataInputStream(socket.getInputStream());
14            while (true) {
15                int flag = dis.readInt();
16                if (flag == 1) {
17                    // 处理返回的在线人数
18                    String nameDatas = dis.readUTF();
19                    // 展示在线人数到页面
20                    String[] names = nameDatas.split(Constants.SPLIT);
21                    client.onLineUsers.setListData(names);
22                } else if (flag == 2 || flag == 3) {
23                    // 群发 私聊 @ 消息直接展示
24                    String msg = dis.readUTF();
25                    client.smsContent.append(msg);
26                    // 让消息界面滚动到底端
27                    client.smsContent.setCaretPosition(client.smsContent.getText().length());
28                }
29            }
30        } catch (Exception e) {
31            e.printStackTrace();
32        }
33    }
34}

常量类

java
 1public class Constants {
 2    /**
 3     * 端口
 4     */
 5    public static final int PORT = 9999;
 6    /**
 7     * 姓名分隔符
 8     */
 9    public static final String SPLIT = ";;;";
10}

NIO深入剖析

介绍

  • Java NIO(New IO)也有人称为 Java non-blocking IO 是从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API。NIO 与原来的 IO 有同样的作用和目的,但是使用方式完全不同,NIO 支持面向缓冲区的、基于通道的IO操作。NIO 将以更加高效的方式进行文件的读写操作。NIO 可以理解为非阻塞 IO,传统的 IO 的 read 和 write 只能阻塞执行,线程在读写 IO 期间不能干其他事情,比如调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 中可以配置 socket 为非阻塞模式
  • NIO 相关类被放在 Java.nio 包以及其子包下,并且对原 Java.io 包中的很多类进行改写
  • NIO 有三大核心部分:Channel(通道)Buffer(缓冲区)Selector(选择器)
  • Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情
  • 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 1000 个请求过来,根据实际情况,可以分配 20 或者 80 个线程来处理。不想之前的阻塞 IO 那样,非得分配 1000 个线程

NIO 和 BIO 比较

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 IO 的效率比流 IO 高很多
  • BIO 是阻塞的,NIO 则是非阻塞的
  • BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道事件(比如:连接请求、数据到达),因此使用单个线程就可以监听多个客户端通道
NIOBIO
面向缓冲区(Buffer)面向流(Stream)
非阻塞(Non Blocking IO)阻塞 IO (Blocking IO)
选择器(Selectors)

NIO三大核心原理示意图

NIO 有三大核心部分:Channel(通道)、Buffer(缓冲区)、Selector(选择器)

Buffer 缓冲区

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数据的操作,Buffer API 更加容易操作和管理

Channel(通道)

Java NIO 的通道类似流,但又有些不同:既可以从通道中读取数据,又可以往通道中写数据。但流的(input 或 output)读写通常是单向的。通道可以非阻塞读取和写入,通道可以支持读取或写入缓冲区,也支持异步地读写

Selector(选择器)

Selector 是一个 Java NIO 组件,可以能够检查一个或多个 NIO 通道高,并确定那些通道高已经准备好进行读取和写入。这样,一个单独的线程可以管理多个 Channel ,从而管理多个网络连接,提升效率

image-20230530172138091

  • 每个 Channel 都会一个 Buffer
  • 一个线程对应一个 Selector ,一个 Selector 对应多个 Channel(连接)
  • 程序切换到那个 Channel 是由事件决定的
  • Selector 会根据不同的事件,在各个通道上切换
  • Buffer 就是一个内存块,底层是一个数组
  • 数据的读取写入都是通过 Buffer 完成的,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIO 中的 Buffer 是可以读也可以写的
  • Java NIO 系统的核心在于:通道(Chanel)、缓冲区(Buffer)。通道表示打开到 IO 设备(例如:文件、套接字)的连接。若要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理,简而言之,Channel 负责传输,Buffer 负责存取数据

缓冲区(Buffer)

一个用于特定基本数据类型的容器。由 Java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。Java NIO 的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中的

image-20230530181020515

Buffer 类及其子类

Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同,常用 Buffer 子类有:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

上述 Buffer 类,他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方式获取一个 Buffer 对象:

java
1// 创建一个容量为 capacity 的buffer对象
2public static XxxBuffer allocate(int capacity);

缓冲区基本属性

  • 容量(capacity):作为一个内存块,Buffer具有一定的固定大小,也称为“容量”,缓冲区容量不能为负,并且创建后不能更改

  • 限制(limit):表示缓冲区中可以操作的数据大小(limit后数据不能进行读写)。缓冲区的限制不能为负,且限制不能大于其容量。写入模式,限制等于 buffer 的容量。读取模式下,limit 等于写入的数据量

  • 位置(position):下一个要读取或写入的数据索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这个 position

  • 标记(mark)与重置(reset):标记是一个索引,通过 Buffer 中的 mark() 方法指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这个 position

    标记、位置、限制、容量遵守一下不变式:0 <= mark <= position <= limit <= capacity

缓冲区常见方法

java
 1Buffer clear();  // 清空缓冲区并返还对缓冲区的引用
 2Buffer flip();  // 将缓冲区的界限(limit)设置为当前位置,并将当前位置(position)设置为0
 3int capactity();  //返回 Buffer 的 capactity 大小
 4boolean hasRemaining();  // 判断缓冲区中是否还有元素
 5int limit();  // 返回 Buffer 界限(limit)的位置
 6Buffer limit(int n);  // 将缓冲区界限设置为n,并返回一个具有新的 limit 的缓冲区对象
 7Buffer mark();  // 对缓冲区设置标记
 8int position();  // 返回缓冲区当前位置 position
 9Buffer position(int n);  // 设置缓冲区当前位置为 n,返回修改后的 Buffer 对象
10int remaining();  // 返回 position 和 limit 之间的元素个数
11Buffer reset();  // 将位置 position 恢复到以前设置的 mark 所在位置
12Buffer rewind();  // 将位置设为 0取消设置的 mark

缓冲区的数据操作

Buffer 所有子类都提供了两个用于数据操作的方法:get() put() 方法

java
1byte get();  // 读取单个字节
2ByteBuffer get(byte[] dst);  // 批量读取多个字节到 dst 中
3byte get(int index);  // 读取索引位置的字节,不会移动 position
4
5ByteBuffer put(byte b);  // 将给定单个字节写入缓冲区的当前位置
6ByteBuffer put(ByteBuffer src);  // 将 src 中的字节写入缓冲区当前位置
7ByteBuffer put(int index, byte b);  // 将给定字节写入缓冲区索引位置不会移动 position

使用 Buffer 读写数据一般遵循以下四个步骤

  1. 写入数据到 Buffer
  2. 调用 flip() 方法,转换为读取模式
  3. 从 Buffer 中读取数据
  4. 调用 buffer.clear() 方法或 buffer.compact() 方法清除缓冲区

buffer实例

java
 1/**
 2 * buffer 常用 API
 3 *
 4 * @author lei
 5 * @date 2023/05/31
 6 */
 7public class BufferDemo {
 8    public static void main(String[] args) {
 9        // 分配一个字节缓冲区,容量为10
10        ByteBuffer buffer = ByteBuffer.allocate(10);
11
12        // 输出 buffer 的当前位置、限制、容量
13        System.out.println(buffer.position());   // 0
14        System.out.println(buffer.limit());  // 10
15        System.out.println(buffer.capacity());  // 10
16
17        // 往缓冲区中添加数据;此时 position:3  limit:10  capacity:10
18        String name = "lei";
19        buffer.put(name.getBytes());
20
21        // 将缓冲区的界限(limit)设置为当前位置,并将当前位置(position)设置为 0(可读模式)
22        // 此时 position:0  limit:3  capacity:10
23        buffer.flip();
24
25        // 从缓冲区读取数据,a1='l'
26        // 此时 position:1  limit:3  capacity:10
27        char a1 = (char) buffer.get();
28
29        // 清除缓冲区,此时 a2='l',说明 clear 只是重置 position、limit,不会清除数据
30        // 此时 position:0  limit:10  capacity:10
31        buffer.clear();
32        char a2 = (char) buffer.get();
33
34        // 判断是否还有数据,有返回true,无返回false
35        boolean b = buffer.hasRemaining();
36
37        // 返回剩余可读取元素个数
38        int i = buffer.remaining();
39
40    }
41}

直接与非直接缓冲区

什么是直接内存与非直接内存

根据官方文档描述:字节缓冲区(byte buffer)可以是两种类型,一种是基于直接内存(也就是非堆内存);另一种是非直接内存(也就是堆内存)。对于直接内存来说,JVM 将会在 IO 操作上具有更高的性能,因为它直接作用域本地系统的 IO 操作。而非直接内存,也就是堆内存中的数据,如果要做 IO 操作,会先从本地进程内存复制到直接内存,再利用本地 IO 处理

  • 直接内存:本地 IO ==> 直接内存 ==> 本地 IO
  • 非直接内存:本地 IO ==> 直接内存 ==> 非直接内存 ==> 直接内存 ==> 本地 IO

很明显,在做 IO 处理时,比如网络发送大量数据时,直接内存会具有更高的效率。直接内存使用 allocateDirect 创建,但是它比申请普通堆内存需要耗费更高的性能。不过,这部分数据是在 JVM 之外的,因此他不会占用英勇的内存。所以,当有大量数据且数据的生命周期很长,那么就比较适合使用直接内存。一般来说,如果不是能带来明显的性能提升,还是推荐使用堆内存。字节缓冲区是直接内存还是非直接内存可以通过调用其 isDirect() 方法来确定

使用场景

  1. 有很大的数据需要存储,它的生命周期又很长
  2. 适合频繁的 IO 操作,比如网络并发场景

通道(Channel)

通道(Channel):由 Java.nio.channels 包定义的。Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的"流",只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer进行交互

  1. NIO 的通道类似于流,但有些区别如下

    • 通道可以同时进行读写,而流只能读或者只能写
    • 通道可以实现异步读写数据
    • 通道可以从缓冲读数据,也可以写数据到缓冲
  2. BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作

  3. Channel 在 NIO 中是一个接口

    java
    1public interface Channel extend closeable{}

常用的 Channel 实现类

  • FileChannel:用于读取、写入、映射和操作文件的通道
  • DatagramChannel:通过 UDP 读写网络中的数据通道
  • SocketChannel:通过 UDP 读写网络中数据
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。【ServerSocketChannel 类似于 ServerSocket,SocketChannel 类似于 Socket】

FileChannel 类

获取通道的一种方式是对支持通道的对象调用 getChammel() 方法。支持通道的类如下:

  • FileInputStream
  • FileOutputStream
  • RandomAccessFile
  • DatagramSocket
  • Socket
  • ServetSocket

获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获取字节通道。或者通过通道的静态方法 open() 打开并返回指定通道

FileChannel 的常用方法

java
1int read(ByteBuffer dst);  // 从 Channel 中读取数据到 ByteBuffer 中
2long read(ByteBuffer[] dsts);  // 将 Channel 中数据“分散”到 ByteBuffer[]
3int write(ByteBuffer src);  // 将 ByteBuffer 中数据写到 Channel 中
4long write(ByteBuffer[] srcs);  // 将 ByteBuffer[] 中数据“聚集”到 Channel 中
5long position();  // 返回此通道文件的位置
6FileChannel position(long p);  // 设置此通道的文件位置
7long size();  // 返回此通道文件的当前大小
8FileChannel truncate(long s);  // 将此通道的文件截取为给定的大小
9void force(boolean metaData);  // 强制将所有对此通道的文件更新写入到存储设备中

实例:本地读写文件

java
 1/**
 2 * 通道本地读写文件演示
 3 *
 4 * @author lei
 5 * @date 2023/05/31
 6 */
 7public class ChannelDemo {
 8        /**
 9     * 从文件读取
10     */
11    private static void readFromFile() throws Exception {
12
13        // 获得输入流
14        FileInputStream fis = new FileInputStream("data.txt");
15        // 得到输入流对应的通道
16        FileChannel channel = fis.getChannel();
17        // 分配缓冲区
18        ByteBuffer buffer = ByteBuffer.allocate(1024);
19        // 从通道中读取数据到缓冲区
20        channel.read(buffer);
21        // 将 limit 设置到当前位置,并将缓冲区当前位置重置
22        buffer.flip();
23        System.out.println(new String(buffer.array(), 0, buffer.remaining()));
24
25    }
26
27    /**
28     * 写入文件
29     */
30    private static void writeToFile() throws Exception {
31        // 获取字节输出流
32        FileOutputStream fos = new FileOutputStream("data.txt");
33        // 得到字节输出流对应的通道
34        FileChannel channel = fos.getChannel();
35        // 分配缓冲区
36        ByteBuffer buffer = ByteBuffer.allocate(1024);
37        buffer.put("hello world".getBytes());
38        // 将缓冲区切换为写出模式
39        buffer.flip();
40        channel.write(buffer);
41        channel.close();
42    }
43}

案例:使用 Buffer 完成文件的复制

java
 1public class ChannelDemo {
 2     /**
 3     * 文件复制
 4     */
 5	private static void copy() throws Exception {
 6        File srcFile = new File("H:\\image\\win10_9.20.wim");
 7        File destFile = new File("H:\\image\\win10_9.20_new.wim");
 8
 9        // 获取宿输入输出流
10        FileInputStream fis = new FileInputStream(srcFile);
11        FileOutputStream fos = new FileOutputStream(destFile);
12
13        // 获取管道
14        FileChannel fisChannel = fis.getChannel();
15        FileChannel fosChannel = fos.getChannel();
16
17        // 分配一个大小为 10M 的缓冲区
18        ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 10);
19
20        while (true) {
21            // 清空缓存 position设置为0 limit 与 capacity 大小一致
22            buffer.clear();
23            // 读取数据
24            int flag = fisChannel.read(buffer);
25            if (flag == -1) {
26                break;
27            }
28            // limit 设置到当前位置,然后将 position 设置为0(只读)
29            buffer.flip();
30            // 写出数据
31            fosChannel.write(buffer);
32        }
33
34        // 关闭通道
35        fisChannel.close();
36        fosChannel.close();
37    }
38}

案例:分散(Scatter)和聚集(Gather)

分散读取(Scatter):是把 Channel 通道的数据读入到多个缓冲区去;按顺序依次写满

聚集写入(Gathering):是将多个 Biffer 中的数据“聚集”到 Channel

java
 1public class ChannelDemo {
 2    /**
 3     * 分散和聚集
 4     */
 5    private static void copyA() throws Exception {
 6        File srcFile = new File("data.txt");
 7        File destFile = new File("dataA.txt");
 8
 9        // 获取宿输入输出流
10        FileInputStream fis = new FileInputStream(srcFile);
11        FileOutputStream fos = new FileOutputStream(destFile);
12
13        // 获取管道
14        FileChannel fisChannel = fis.getChannel();
15        FileChannel fosChannel = fos.getChannel();
16
17        // 定义多个缓冲区
18        ByteBuffer[] buffers = new ByteBuffer[]{ByteBuffer.allocate(3), ByteBuffer.allocate(1024)};
19
20        // 分散读取
21        fisChannel.read(buffers);
22        for (ByteBuffer buffer : buffers) {
23            buffer.flip();
24            System.out.println(new String(buffer.array(), 0, buffer.remaining()));
25        }
26
27        // 聚集写入
28        fosChannel.write(buffers);
29
30        // 关闭通道
31        fisChannel.close();
32        fosChannel.close();
33    }
34}

案例:transferFrom() 与 transferTo()

通道之间的数据转换

  • transferFrom() :从目标通道中去复制原通道数据
  • transferTo():把原通道数据复制到目标通道
java
 1public class ChannelDemo {
 2    public static void main(String[] args) {
 3        try {
 4            transferTest();
 5        } catch (Exception e) {
 6            e.printStackTrace();
 7        }
 8
 9    }
10
11    /**
12     * transferFrom 与 transferTo
13     */
14    private static void transferTest() throws Exception {
15        File srcFile = new File("data.txt");
16        File destFile = new File("data2.txt");
17
18        // 获取宿输入输出流
19        FileInputStream fis = new FileInputStream(srcFile);
20        FileOutputStream fos = new FileOutputStream(destFile);
21
22        // 获取管道
23        FileChannel fisChannel = fis.getChannel();
24        FileChannel fosChannel = fos.getChannel();
25
26        // 将 fisChannel 通道中数据直接复制到 fosChannel 中去
27//        fisChannel.transferTo(fisChannel.position(),fisChannel.size(),fosChannel);
28
29        // 从 fisChannel 中复制数据到 fosChannel 中
30        fosChannel.transferFrom(fisChannel, fisChannel.position(), fisChannel.size());
31
32        // 关闭通道
33        fisChannel.close();
34        fosChannel.close();
35    }
36}

选择器(Selector)

选择器(Selector)是SelectableChannel 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector 可使一个单独的线程管理多个 Channel。Selector 是非阻塞 IO 的核心

image-20230531173402772

  • Java 的 NIO,用非阻塞的 IO 方式。用一个线程,处理多个客户端连接,就会使用到 Selector(选择器)
  • Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
  • 避免了多线程之间的上线文切换导致的开销

**选择器(Selector)的应用 **

创建 Selector:通过调用 Seleceor.open() 方法创建一个 Selector

java
1Selector selector = Selector.open();

向选择器注册通道:SelectableChannel.register(Selector sel,int pos)

java
 1// 获取通道
 2ServerSocketChannel channel = ServerSocketChannel.open();
 3// 切换非阻塞模式
 4channel.configureBlocking(false);
 5// 绑定连接
 6channel.bind(new InetSocketAddress(9999));
 7// 获取选择器
 8Selector selector=Selector.open();
 9// 将通道注册到选择器上,并指定 "监听接收事件"
10channel.register(selector, SelectionKey.OP_ACCEPT);

当调用 SelectionKey register(Selector sel, int ops) 将通道注册到选择器时,选择器对通道的监听事件需要通过第二个参数 ops 指定。可以监听的事件类型 (SelectionKey 的四个常量)

  • 读:SelectionKey.OP_READ (1)
  • 写:SelectionKey.OP_WRITE(4)
  • 连接:SelectionKey.OP_CONNECT(8)
  • 接收:SelectionKey.OP_ACCEPT(16)

若注册不止监听一个事件,则可以使用“位或操作符连接”

java
1int ops = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;

NIO 网络通信原理分析

服务端流程

  1. 获取通道:当客户端连接服务端时,服务端会通过 ServerSocketChannel 得到 SocketChannel

    java
    1ServerSocketChannel channel = ServerSocketChannel.open();
  2. 切换非阻塞模式

    java
    1channel.configureBlocking(false);
  3. 绑定连接

    java
    1channel.bind(new InetSocketAddress(9999));
  4. 获取选择器

    java
    1Selector selector=Selector.open();
  5. 将通道注册到选择器上,并指定 “监听接收事件”

    java
    1channel.register(selector, SelectionKey.OP_ACCEPT);
  6. 轮询获取选择器上已经“准备就绪”的事件

    java
     1// 轮询获取选择器上已经“准备就绪”的事件,阻塞方法
     2while (selector.select() > 0) {
     3    // 获取当前选择器中所有注册的 “选择键(已经就绪的监听事件)”
     4    Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
     5    while (keyIterator.hasNext()){
     6        // 获取准备就绪的事件
     7        SelectionKey sk = keyIterator.next();
     8        // 判断具体是什么事件准备就绪
     9        if (sk.isAcceptable()) {
    10            // 若“接收准备就绪”,获取客户端连接
    11            SocketChannel channel = ssChannel.accept();
    12            // 切换非阻塞模式
    13            channel.configureBlocking(false);
    14            // 将该通道注册到选择器上
    15            channel.register(selector,SelectionKey.OP_READ);
    16        }else if (sk.isReadable()){
    17            // 获取选择器上"读准备就绪"的通道
    18            SocketChannel channel = (SocketChannel)sk.channel();
    19            // 读取数据
    20            ByteBuffer buffer = ByteBuffer.allocate(1024);
    21            int len=0;
    22            while ((len=channel.read(buffer))>0){
    23                buffer.flip();
    24                System.out.println(new String(buffer.array(),0,len));
    25                buffer.clear();
    26            }
    27        }
    28        // 取消选择键
    29        keyIterator.remove();
    30    }
    31}

    客户端流程

    1. 获取通道

      java
      1InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9999);
      2SocketChannel channel = SocketChannel.open(address);
    2. 切换非阻塞模式

      java
      1channel.configureBlocking(false);
    3. 分配指定大小缓冲区

      java
      1ByteBuffer buffer = ByteBuffer.allocate(1024);
    4. 发送数据给服务端

      java
      1Scanner scan = new Scanner(System.in);
      2while (scan.hasNext()) {
      3    String s = scan.nextLine();
      4    buffer.put((DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
      5                + ":" + s).getBytes());
      6    buffer.flip();
      7    channel.write(buffer);
      8    buffer.clear();
      9}
    5. 关闭通道

      java
      1channel.close();

NIO 网络通信入门案例

服务端

java
 1public static void main(String[] args) throws Exception {
 2    System.out.println("------服务端启动------");
 3    // 获取通道
 4    ServerSocketChannel ssChannel = ServerSocketChannel.open();
 5    // 绑定端口
 6    ssChannel.bind(new InetSocketAddress(9999));
 7    // 设置非阻塞模式
 8    ssChannel.configureBlocking(false);
 9    // 获取选择器
10    Selector selector = Selector.open();
11    // 选择器注册通道,并绑定事件
12    ssChannel.register(selector, SelectionKey.OP_ACCEPT);
13
14    // 监听事件 select()是一个阻塞方法,当有事件发生时返回事件个数(可能有多个通道发生事件)
15    while (selector.select() > 0) {
16        System.out.println("一轮事件处理");
17        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
18        while (it.hasNext()) {
19            // 获取事件
20            SelectionKey selectionKey = it.next();
21            // 对不同的事件进行处理
22            if (selectionKey.isAcceptable()) {
23                // 客户端通道
24                SelectableChannel channel = ssChannel.accept();
25                // 设置非阻塞模式
26                channel.configureBlocking(false);
27                // 注册到 selector
28                channel.register(selector, SelectionKey.OP_READ);
29            } else if (selectionKey.isReadable()) {
30                SocketChannel channel = (SocketChannel) selectionKey.channel();
31                // 分配缓冲区
32                ByteBuffer buf = ByteBuffer.allocate(1024);
33
34                while (channel.read(buf) > 0) {
35                    // 切换为读(设置 position 和 limit)
36                    buf.flip();
37                    System.out.println(new String(buf.array(), 0, buf.remaining()));
38                    // 清除缓冲区信息(position limit)
39                    buf.clear();
40                }
41            }
42            // 移除已经处理过了的事件
43            it.remove();
44        }
45    }
46}

客户端

java
 1public static void main(String[] args) throws Exception {
 2    // 获取和服务端的通道
 3    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9999);
 4    SocketChannel channel = SocketChannel.open(address);
 5    // 设置非阻塞模式
 6    channel.configureBlocking(false);
 7    // 分配缓冲区
 8    ByteBuffer buffer = ByteBuffer.allocate(1024);
 9    Scanner scan = new Scanner(System.in);
10    while (scan.hasNext()) {
11        String s = scan.nextLine();
12        byte[] msgBytes = String.format("%s【%s】:%s", Thread.currentThread().getName(),
13                                        DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss").format(LocalDateTime.now()), s)
14            .getBytes();
15        buffer.put(msgBytes);
16        buffer.flip();
17        // 将缓冲区数据写入通道
18        channel.write(buffer);
19        buffer.clear();
20    }
21}

NIO 实现群聊系统

目标

  • 编写一个 NIO 群聊系统,实现客户端与客户端的通信需求(非阻塞)
  • 服务端:可以监测用户上线、离线,并实现消息转发功能
  • 客户端:通过 Channel 可以无阻塞发送消息给其他所有客户端用户,同时可以接受其他客户端用户通过服务端转发来的消息

服务端代码

java
  1public class Server {
  2    private final ServerSocketChannel serverSocketChannel;
  3    private final Selector selector;
  4    private final static int PORT = 9999;
  5
  6    {
  7        try {
  8            // 打开通道
  9            serverSocketChannel = ServerSocketChannel.open();
 10            // 绑定端口
 11            serverSocketChannel.bind(new InetSocketAddress(PORT));
 12            // 设置非阻塞模式
 13            serverSocketChannel.configureBlocking(false);
 14            // 获取选择器
 15            selector = Selector.open();
 16            // 将服务器通道注册到选择器上,并绑定“接收事件”
 17            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
 18        } catch (Exception e) {
 19            e.printStackTrace();
 20            throw new RuntimeException(e.getMessage());
 21        }
 22    }
 23
 24    public static void main(String[] args) {
 25        Server server = new Server();
 26        server.listen();
 27    }
 28
 29    /**
 30     * 监听处理事件
 31     */
 32    private void listen() {
 33        try {
 34            // 阻塞监听,当选择器有事件时进入
 35            while (selector.select() > 0) {
 36                // 获取已准备就绪的事件的迭代器
 37                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
 38                while (iterator.hasNext()) {
 39                    SelectionKey selectionKey = iterator.next();
 40                    // 根据不同类型处理事件
 41                    if (selectionKey.isAcceptable()) {
 42                        // 客户端接入通道
 43                        SocketChannel socketChannel = serverSocketChannel.accept();
 44                        socketChannel.configureBlocking(false);
 45                        // 注册到选择器 监听读数据的事件
 46                        socketChannel.register(selector, SelectionKey.OP_READ);
 47                    } else if (selectionKey.isReadable()) {
 48                        // 处理客户端的消息,实现转发逻辑
 49                        readClientData(selectionKey);
 50                    }
 51                    // 移除已经处理的事件
 52                    iterator.remove();
 53                }
 54            }
 55        } catch (Exception e) {
 56            e.printStackTrace();
 57        }
 58    }
 59
 60    private void readClientData(SelectionKey selectionKey) {
 61        SocketChannel socketChannel = null;
 62        try {
 63            // 获取客户端通道
 64            socketChannel = (SocketChannel) selectionKey.channel();
 65            // 创建缓冲区
 66            ByteBuffer buffer = ByteBuffer.allocate(1024);
 67            int count = socketChannel.read(buffer);
 68            if (count > 0) {
 69                buffer.flip();
 70                String msg = new String(buffer.array(), 0, buffer.remaining());
 71                System.out.println("服务端接收到:" + msg);
 72                sendMsgToAllClient(msg, socketChannel);
 73            }
 74
 75        } catch (Exception e) {
 76            // 这里如果出异常为,读取数据是通道离线
 77            try {
 78                System.out.println("有人离线了:" + socketChannel.getRemoteAddress());
 79                //当前客户端离线
 80                selectionKey.cancel();//取消注册
 81                socketChannel.close();
 82            } catch (IOException e1) {
 83                e1.printStackTrace();
 84            }
 85
 86        }
 87    }
 88
 89    /**
 90     * 将消息发送给所有人,除了自己
 91     */
 92    private void sendMsgToAllClient(String msg, SocketChannel channel) throws IOException {
 93        for (SelectionKey key : selector.keys()) {
 94            SelectableChannel curSelectChannel = key.channel();
 95            // 不发消息给自己
 96            if (curSelectChannel instanceof SocketChannel && curSelectChannel != channel) {
 97                SocketChannel curChannel = (SocketChannel) curSelectChannel;
 98                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
 99                curChannel.write(buffer);
100            }
101        }
102    }
103}

客户端代码

java
 1public class Client {
 2    // 定义客户端相关属性
 3    private final Selector selector;
 4    private static final int PORT = 9999;
 5    private final SocketChannel socketChannel;
 6    // 初始化客户端信息
 7    {
 8        try {
 9            // 创建选择器
10            selector = Selector.open();
11            // 连接服务器
12            socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",PORT));
13            // 设置非阻塞模式
14            socketChannel.configureBlocking(false);
15            // 注册到选择器
16            socketChannel.register(selector, SelectionKey.OP_READ);
17        } catch (IOException e) {
18            throw new RuntimeException(e);
19        }
20    }
21
22    public static void main(String[] args) {
23        Client client = new Client();
24        // 定义一个线程,专门负责监听服务端发送过来的读消息事件
25        new Thread(() -> {
26            try {
27                client.readInfo();
28            }catch (Exception e){
29                e.printStackTrace();
30            }
31        }).start();
32        // 发消息
33        Scanner sc = new Scanner(System.in);
34        while (sc.hasNextLine()){
35            System.out.println("------------------");
36            String s = sc.nextLine();
37            client.sendToServer(s);
38        }
39    }
40    private void sendToServer(String s) {
41        try {
42            socketChannel.write(ByteBuffer.wrap(("波仔说:" + s).getBytes()));
43        } catch (IOException e) {
44            e.printStackTrace();
45        }
46    }
47    //
48    private void readInfo() throws IOException {
49        while(selector.select() > 0){
50            Iterator<SelectionKey> iterator =
51                    selector.selectedKeys().iterator();
52            while (iterator.hasNext()){
53                SelectionKey key = iterator.next();
54                if(key.isReadable()){
55                    SocketChannel sc = (SocketChannel) key.channel();
56                    ByteBuffer buffer = ByteBuffer.allocate(1024);
57                    sc.read(buffer);
58                    System.out.println(new String(buffer.array()).trim());
59                    System.out.println("-dsd------------------------");
60                }
61                iterator.remove();
62            }
63        }
64    }
65}

AIO 理解

AIO 编程

  • Java AIO(NIO.2):异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是 由OS先完成了再通知服务器应用去启动线程进行处理。

  • AIO 是异步非阻塞,基于 NIO,可以称之为 NIO2.0

BIONIOAIO
SocketSocketChannelAsynchronousSocketChannel
sERVERsOCKETServerSocketChannelAsynchronousServerSocketChannel

与 NIO 不同,当进行读写操作时,只须直接调用 API 的 read 或 write 方法即可,这两种方法均为异步的, 对于读操作而言,当有流可读时,操作系统会将可读的流传入 read 方法的缓冲区,对于写操作而言,当 操作系统将 write 方法传递的流写入完毕时,操作系统主动通知应用程序 。

即可以理解为,read/write 方法都是异步的,完成后会主动调用回调函数。在 JDK1.7 中,这部分内容被 称作 NIO.2,主要在 Java.nio.channel包下增加了下面四个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

BIO、NIO、AIO总结

BIO、NIO、AIO:

  • Java BlO:同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就 需要启动 一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过 线程池机制改善。
  • Java NIO:同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册 到多路复用器上,多路复用器轮询到连接有 I/O 请求时才启动一个线程进行处理。
  • Java AIO(N 10.2):异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的 I/O 请求都是 由 OS先完成了再通知服务器应用去启动线程进行处理。

BIO、NIO、AIO适用场景分析:

  • BlO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应 用中, JDK1.4 以前的唯一选择,但程序直观简单易理解。

  • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用 中,编程比较 复杂,JDK1 .4 开始支持。

  • AlO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并 发操作,编 程比较复杂,JDK7 开始支持

使用滚轮缩放
按住拖动