Java PipedReader类

PipedReader 是字符管道输入流,它继承于Writer。它用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、PipedReader中专门有两个方法供PipedWriter调用、receive(char c)、receive(char[] b, int off, intlen)、使得PipedWriter可以将字符或者字符数组写入PipedReader的buffer中。

构造函数

PipedReader(PipedWriter src)    使用默认的buf的大小和传入的PipedWriter构造PipedReader 
PipedReader(PipedWriter src, int pipeSize)      使用指定的buf的大小和传入的pw构造PipedReader  
PipedReader()       使用默认大小构造PipedReader
PipedReader(int pipeSize)       使用指定大小构造PipedReader

关键字

boolean closedByWriter = false;     标记PipedWriter是否关闭  
boolean closedByReader = false;      标记PipedReader是否关闭  
boolean connected = false;           标记PipedWriter与标记PipedReader是否关闭的连接是否关闭  
Thread readSide;     拥有PipedReader的线程  
Thread writeSide;    拥有PipedWriter的线程  
private static final int DEFAULT_PIPE_SIZE = 1024;       用于循环存放PipedWriter写入的字符数组的默认大小  
char buffer[];       用于循环存放PipedWriter写入的字符数组  
int in = -1; buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符  
int out = 0; buf中下一个被读取的字符的下标  

方法

void close()    清空buf中数据、关闭此流。  
void connect(PipedWriter src)   调用与此流绑定的PipedWriter的connect方法、将此流与对应的PipedWriter绑定  
synchronized boolean ready()    查看此流是否可读  
synchronized int read()     从buf中读取一个字符、以整数形式返回  
synchronized int read(char cbuf[], int off, int len)    将buf中读取一部分字符到cbuf中。  
synchronized void receive(int c)    PipedWriter调用此流的此方法、向PipedReader的buf以整数形式中写入一个字符。  
synchronized void receive(char c[], int off, int len)   将c中一部分字符写入到buf中。  
synchronized void receivedLast()    提醒所有等待的线程、已经接收到了最后一个字符。  

源码分析

public class PipedReader extends Reader {
    // “PipedWriter”是否关闭的标记
    boolean closedByWriter = false;
    // “PipedReader”是否关闭的标记
    boolean closedByReader = false;
    // “PipedReader”与“PipedWriter”是否连接的标记
    // 它在PipedWriter的connect()连接函数中被设置为true
    boolean connected = false;

    Thread readSide;    // 读取“管道”数据的线程
    Thread writeSide;    // 向“管道”写入数据的线程

    // “管道”的默认大小
    private static final int DEFAULT_PIPE_SIZE = 1024;

    // 缓冲区
    char buffer[];

    //下一个写入字符的位置。in==out代表满,说明“写入的数据”全部被读取了。
    int in = -1;
    //下一个读取字符的位置。in==out代表满,说明“写入的数据”全部被读取了。
    int out = 0;

    // 构造函数:指定与“PipedReader”关联的“PipedWriter”
    public PipedReader(PipedWriter src) throws IOException {
        this(src, DEFAULT_PIPE_SIZE);
    }

    // 构造函数:指定与“PipedReader”关联的“PipedWriter”,以及“缓冲区大小”
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
        initPipe(pipeSize);
        connect(src);
    }

    // 构造函数:默认缓冲区大小是1024字符
    public PipedReader() {
        initPipe(DEFAULT_PIPE_SIZE);
    }

    // 构造函数:指定缓冲区大小是pipeSize
    public PipedReader(int pipeSize) {
        initPipe(pipeSize);
    }

    // 初始化“管道”:新建缓冲区大小
    private void initPipe(int pipeSize) {
        if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe size <= 0");
        }
        buffer = new char[pipeSize];
    }

    // 将“PipedReader”和“PipedWriter”绑定。
    // 实际上,这里调用的是PipedWriter的connect()函数
    public void connect(PipedWriter src) throws IOException {
        src.connect(this);
    }

    // 接收int类型的数据b。
    // 它只会在PipedWriter的write(int b)中会被调用
    synchronized void receive(int c) throws IOException {
        // 检查管道状态
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

        // 获取“写入管道”的线程
        writeSide = Thread.currentThread();
        // 如果“管道中被读取的数据,等于写入管道的数据”时,
        // 则每隔1000ms检查“管道状态”,并唤醒管道操作:若有“读取管道数据线程被阻塞”,则唤醒该线程。
        while (in == out) {
            if ((readSide != null) && !readSide.isAlive()) {
                throw new IOException("Pipe broken");
            }
            /* full: kick any waiting readers */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        if (in < 0) {
            in = 0;
            out = 0;
        }
        buffer[in++] = (char) c;
        if (in >= buffer.length) {
            in = 0;
        }
    }

    // 接收字符数组b。
    synchronized void receive(char c[], int off, int len)  throws IOException {
        while (--len >= 0) {
            receive(c[off++]);
        }
    }

    // 当PipedWriter被关闭时,被调用
    synchronized void receivedLast() {
        closedByWriter = true;
        notifyAll();
    }

    // 从管道(的缓冲)中读取一个字符,并将其转换成int类型
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {
                /* closed by writer, return EOF */
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            /* might be a writer waiting */
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++];
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }
        return ret;
    }

    // 从管道(的缓冲)中读取数据,并将其存入到数组b中
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        cbuf[off] =  (char)c;
        int rlen = 1;
        while ((in >= 0) && (--len > 0)) {
            cbuf[off + rlen] = buffer[out++];
            rlen++;
            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                /* now empty */
                in = -1;
            }
        }
        return rlen;
    }

    // 是否能从管道中读取下一个数据
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }

    // 关闭PipedReader
    public void close()  throws IOException {
        in = -1;
        closedByReader = true;
    }
}

例子

public class Receiver extends Thread {
    // 管道输入流对象。  
    // 它和“管道输出流(PipedWriter)”对象绑定,  
    // 从而可以接收“管道输出流”的数据,再让用户读取。  
    private PipedReader in = new PipedReader();  
  
    // 获得“管道输入流对象”  
    public PipedReader getReader()  
    {  
        return in;  
    }  
  
    @Override  
    public void run(){  
        readMessageOnce() ;  
        //readMessageContinued() ;  
    }  
  
    // 从“管道输入流”中读取1次数据  
    public void readMessageOnce(){  
        // 虽然buf的大小是2048个字符,但最多只会从“管道输入流”中读取1024个字符。  
        // 因为,“管道输入流”的缓冲区大小默认只有1024个字符。  
        char[] buf = new char[2048];  
        try {  
            int len = in.read(buf);  
            System.out.println(new String(buf,0,len));  
            in.close();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
  
    // 从“管道输入流”读取>1024个字符时,就停止读取  
    public void readMessageContinued()  
    {  
        int total=0;  
        while(true) {  
            char[] buf = new char[1024];  
            try {  
                int len = in.read(buf);  
                total += len;  
                System.out.println(new String(buf,0,len));  
                // 若读取的字符总数>1024,则退出循环。  
                if (total > 1024)  
                        break;  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
        }  
  
        try {  
            in.close();  
        } catch (IOException e) {  
                e.printStackTrace();  
        }  
    }  
}

总结

PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入PipedReader的存折buffer中。

版权声明:本文为JAVASCHOOL原创文章,未经本站允许不得转载。