Java PipedReader类
PipedReader 是字符管道输入流,它继承于Writer。它用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、PipedReader中专门有两个方法供PipedWriter调用、receive(char c)、receive(char[] b, int off, intlen)、使得PipedWriter可以将字符或者字符数组写入PipedReader的buffer中。
构造函数
PipedReader(PipedWriter src) 使用默认的buf的大小和传入的PipedWriter构造PipedReader PipedReader(PipedWriter src, int pipeSize) 使用指定的buf的大小和传入的pw构造PipedReader PipedReader() 使用默认大小构造PipedReader PipedReader(int pipeSize) 使用指定大小构造PipedReader
关键字
boolean closedByWriter = false; 标记PipedWriter是否关闭 boolean closedByReader = false; 标记PipedReader是否关闭 boolean connected = false; 标记PipedWriter与标记PipedReader是否关闭的连接是否关闭 Thread readSide; 拥有PipedReader的线程 Thread writeSide; 拥有PipedWriter的线程 private static final int DEFAULT_PIPE_SIZE = 1024; 用于循环存放PipedWriter写入的字符数组的默认大小 char buffer[]; 用于循环存放PipedWriter写入的字符数组 int in = -1; buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符 int out = 0; buf中下一个被读取的字符的下标
方法
void close() 清空buf中数据、关闭此流。 void connect(PipedWriter src) 调用与此流绑定的PipedWriter的connect方法、将此流与对应的PipedWriter绑定 synchronized boolean ready() 查看此流是否可读 synchronized int read() 从buf中读取一个字符、以整数形式返回 synchronized int read(char cbuf[], int off, int len) 将buf中读取一部分字符到cbuf中。 synchronized void receive(int c) PipedWriter调用此流的此方法、向PipedReader的buf以整数形式中写入一个字符。 synchronized void receive(char c[], int off, int len) 将c中一部分字符写入到buf中。 synchronized void receivedLast() 提醒所有等待的线程、已经接收到了最后一个字符。
源码分析
public class PipedReader extends Reader { // “PipedWriter”是否关闭的标记 boolean closedByWriter = false; // “PipedReader”是否关闭的标记 boolean closedByReader = false; // “PipedReader”与“PipedWriter”是否连接的标记 // 它在PipedWriter的connect()连接函数中被设置为true boolean connected = false; Thread readSide; // 读取“管道”数据的线程 Thread writeSide; // 向“管道”写入数据的线程 // “管道”的默认大小 private static final int DEFAULT_PIPE_SIZE = 1024; // 缓冲区 char buffer[]; //下一个写入字符的位置。in==out代表满,说明“写入的数据”全部被读取了。 int in = -1; //下一个读取字符的位置。in==out代表满,说明“写入的数据”全部被读取了。 int out = 0; // 构造函数:指定与“PipedReader”关联的“PipedWriter” public PipedReader(PipedWriter src) throws IOException { this(src, DEFAULT_PIPE_SIZE); } // 构造函数:指定与“PipedReader”关联的“PipedWriter”,以及“缓冲区大小” public PipedReader(PipedWriter src, int pipeSize) throws IOException { initPipe(pipeSize); connect(src); } // 构造函数:默认缓冲区大小是1024字符 public PipedReader() { initPipe(DEFAULT_PIPE_SIZE); } // 构造函数:指定缓冲区大小是pipeSize public PipedReader(int pipeSize) { initPipe(pipeSize); } // 初始化“管道”:新建缓冲区大小 private void initPipe(int pipeSize) { if (pipeSize <= 0) { throw new IllegalArgumentException("Pipe size <= 0"); } buffer = new char[pipeSize]; } // 将“PipedReader”和“PipedWriter”绑定。 // 实际上,这里调用的是PipedWriter的connect()函数 public void connect(PipedWriter src) throws IOException { src.connect(this); } // 接收int类型的数据b。 // 它只会在PipedWriter的write(int b)中会被调用 synchronized void receive(int c) throws IOException { // 检查管道状态 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } // 获取“写入管道”的线程 writeSide = Thread.currentThread(); // 如果“管道中被读取的数据,等于写入管道的数据”时, // 则每隔1000ms检查“管道状态”,并唤醒管道操作:若有“读取管道数据线程被阻塞”,则唤醒该线程。 while (in == out) { if ((readSide != null) && !readSide.isAlive()) { throw new IOException("Pipe broken"); } /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } if (in < 0) { in = 0; out = 0; } buffer[in++] = (char) c; if (in >= buffer.length) { in = 0; } } // 接收字符数组b。 synchronized void receive(char c[], int off, int len) throws IOException { while (--len >= 0) { receive(c[off++]); } } // 当PipedWriter被关闭时,被调用 synchronized void receivedLast() { closedByWriter = true; notifyAll(); } // 从管道(的缓冲)中读取一个字符,并将其转换成int类型 public synchronized int read() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } readSide = Thread.currentThread(); int trials = 2; while (in < 0) { if (closedByWriter) { /* closed by writer, return EOF */ return -1; } if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } /* might be a writer waiting */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++]; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } return ret; } // 从管道(的缓冲)中读取数据,并将其存入到数组b中 public synchronized int read(char cbuf[], int off, int len) throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if ((off < 0) || (off > cbuf.length) || (len < 0) || ((off + len) > cbuf.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } /* possibly wait on the first character */ int c = read(); if (c < 0) { return -1; } cbuf[off] = (char)c; int rlen = 1; while ((in >= 0) && (--len > 0)) { cbuf[off + rlen] = buffer[out++]; rlen++; if (out >= buffer.length) { out = 0; } if (in == out) { /* now empty */ in = -1; } } return rlen; } // 是否能从管道中读取下一个数据 public synchronized boolean ready() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } if (in < 0) { return false; } else { return true; } } // 关闭PipedReader public void close() throws IOException { in = -1; closedByReader = true; } }
例子
public class Receiver extends Thread { // 管道输入流对象。 // 它和“管道输出流(PipedWriter)”对象绑定, // 从而可以接收“管道输出流”的数据,再让用户读取。 private PipedReader in = new PipedReader(); // 获得“管道输入流对象” public PipedReader getReader() { return in; } @Override public void run(){ readMessageOnce() ; //readMessageContinued() ; } // 从“管道输入流”中读取1次数据 public void readMessageOnce(){ // 虽然buf的大小是2048个字符,但最多只会从“管道输入流”中读取1024个字符。 // 因为,“管道输入流”的缓冲区大小默认只有1024个字符。 char[] buf = new char[2048]; try { int len = in.read(buf); System.out.println(new String(buf,0,len)); in.close(); } catch (IOException e) { e.printStackTrace(); } } // 从“管道输入流”读取>1024个字符时,就停止读取 public void readMessageContinued() { int total=0; while(true) { char[] buf = new char[1024]; try { int len = in.read(buf); total += len; System.out.println(new String(buf,0,len)); // 若读取的字符总数>1024,则退出循环。 if (total > 1024) break; } catch (IOException e) { e.printStackTrace(); } } try { in.close(); } catch (IOException e) { e.printStackTrace(); } } }
总结
PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入PipedReader的存折buffer中。
版权声明:本文为JAVASCHOOL原创文章,未经本站允许不得转载。