Thursday, June 7, 2012

Stream Madness

People often forget, or don't realize the ramifications of, the fact that Java's I/O streams and readers/writers are decorators. Sure, they're reminded of it every time they type InputStream in = new BufferedInputStream(new FileInputStream("/some/path")); and that usually leads to grumbling about how verbose Java is, how stupid it is to have to type all of that, and, while we're at it, why didn't they just make streams buffered to begin with?
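To make the decorator point concrete, here's a small self-contained sketch (the class and method names are just illustrative) that stacks a Writer on a GZIPOutputStream to compress text, then reads it back through a GZIPInputStream, an InputStreamReader and a BufferedReader. Each layer only knows about the stream directly beneath it, which is what makes the tricks below possible. It assumes Java 7 or later for try-with-resources and StandardCharsets.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class DecoratorChain {
  // chars -> UTF-8 bytes (OutputStreamWriter) -> compressed bytes (GZIP) -> memory
  static byte[] gzip(String text) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    try (Writer out = new OutputStreamWriter(
        new GZIPOutputStream(sink), StandardCharsets.UTF_8)) {
      out.write(text);
    } // closing the Writer also finishes the GZIP stream and writes its trailer
    return sink.toByteArray();
  }

  // The mirror image: memory -> decompressed bytes -> UTF-8 decoded chars -> buffered
  static String gunzip(byte[] data) throws IOException {
    StringBuilder result = new StringBuilder();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(new ByteArrayInputStream(data)), StandardCharsets.UTF_8))) {
      char[] buf = new char[4096];
      for (int n = in.read(buf); n != -1; n = in.read(buf)) {
        result.append(buf, 0, n);
      }
    }
    return result.toString();
  }

  public static void main(String[] args) throws IOException {
    byte[] packed = gzip("h\u00e9llo, decorators\n");
    System.out.print(gunzip(packed));
  }
}
```

Swapping GZIP for some other codec, or UTF-8 for another charset, only touches one constructor call; nothing else in the chain has to change.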

This apparent eccentricity of Java (and many other languages) actually allows for some really neat tricks, like transparently decompressing data (e.g. with GZIPInputStream) and converting from one character encoding to another (e.g. with InputStreamReader). The really slick trick, though, is that you can completely change the input or add to it. For example, you can inject data into a stream based on some value in the stream itself. The following code treats the text between a pair of ASCII record separator characters (0x1E) as a file path and replaces it with that file's contents:
package streams;

import java.io.*;

public class ImportingStream extends InputStream {
  public static final int RECORD_SEPARATOR = 0x1E;

  private InputStream wrapped;
  private InputStream injecting;

  public ImportingStream(InputStream wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public int read() throws IOException {
    int retVal = -1;
    if(injecting != null) {
      retVal = injecting.read();
      if(retVal == -1) {
        /* done with the injected stream, null it out & call myself
         * recursively */
        injecting.close();
        injecting = null;
        return read();
      }
    } else {
      retVal = wrapped.read();
      if(retVal == RECORD_SEPARATOR) {
        // starting a new injection of data: everything up to the next
        // separator is the path of the file to inject
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        for(int read = wrapped.read(); -1 != read; read = wrapped.read()) {
          if(read == RECORD_SEPARATOR) {
            break;
          }
          sink.write(read);
        }
        injecting = new BufferedInputStream(
          new FileInputStream(new String(sink.toByteArray(), "utf8")));
        return read();
      }
    }
    return retVal;
  }

  @Override
  public void close() throws IOException {
    // close whatever is currently being injected as well as the wrapped stream
    if(injecting != null) {
      injecting.close();
      injecting = null;
    }
    wrapped.close();
  }

  public static void main(String[] args) throws IOException {
    // expects /tmp/inserted_text.txt to exist
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    sink.write("Some header text ".getBytes("utf8"));
    sink.write(RECORD_SEPARATOR);
    sink.write("/tmp/inserted_text.txt".getBytes("utf8"));
    sink.write(RECORD_SEPARATOR);
    sink.write(" and this is some footer text".getBytes("utf8"));
    InputStream in = new ImportingStream(
      new ByteArrayInputStream(sink.toByteArray()));
    sink.reset();
    for(int read = in.read(); -1 != read; read = in.read()) {
      sink.write(read);
    }
    in.close();
    System.out.println(new String(sink.toByteArray(), "utf8"));
  }
}
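If you want to play with the injection idea without creating /tmp/inserted_text.txt first, here's a minimal variant, assuming a hypothetical in-memory Map of names to contents in place of the file system. It also swaps the recursive call for a loop, so a long run of empty injections can't overflow the stack. InMemoryImportingStream and expand are illustrative names, not part of the code above.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class InMemoryImportingStream extends InputStream {
  public static final int RECORD_SEPARATOR = 0x1E;

  private final InputStream wrapped;
  private final Map<String, byte[]> sources; // hypothetical name -> content table
  private InputStream injecting;

  public InMemoryImportingStream(InputStream wrapped, Map<String, byte[]> sources) {
    this.wrapped = wrapped;
    this.sources = sources;
  }

  @Override
  public int read() throws IOException {
    // Loop instead of recursing when an injection ends or starts
    while (true) {
      if (injecting != null) {
        int injected = injecting.read();
        if (injected != -1) {
          return injected;
        }
        injecting = null; // injected data exhausted; resume the wrapped stream
      }
      int b = wrapped.read();
      if (b != RECORD_SEPARATOR) {
        return b; // an ordinary byte, or -1 at the end of the wrapped stream
      }
      // Collect the name up to the closing separator (or end of stream)
      ByteArrayOutputStream name = new ByteArrayOutputStream();
      for (int c = wrapped.read(); c != -1 && c != RECORD_SEPARATOR; c = wrapped.read()) {
        name.write(c);
      }
      String key = new String(name.toByteArray(), StandardCharsets.UTF_8);
      byte[] content = sources.get(key);
      if (content == null) {
        throw new IOException("no source named " + key);
      }
      injecting = new ByteArrayInputStream(content);
    }
  }

  @Override
  public void close() throws IOException {
    wrapped.close();
  }

  // Convenience for the demo: expand every reference in a UTF-8 document
  static String expand(String document, Map<String, byte[]> sources) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = new InMemoryImportingStream(
        new ByteArrayInputStream(document.getBytes(StandardCharsets.UTF_8)), sources)) {
      for (int b = in.read(); b != -1; b = in.read()) {
        out.write(b);
      }
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    char rs = (char) RECORD_SEPARATOR;
    Map<String, byte[]> sources = new HashMap<>();
    sources.put("greeting", "inserted text".getBytes(StandardCharsets.UTF_8));
    System.out.println(expand("Some header " + rs + "greeting" + rs + " and a footer", sources));
  }
}
```

Running main should print "Some header inserted text and a footer".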

Another interesting use is switching the way you read the data mid-stream. One place that might come in handy is parsing XML documents with embedded base-64 encoded binary, e.g. an XML file with an embedded PDF that you want to index for searching in Lucene, with the PDF's contents included inline. You could use a multi-step process where you split the document into sections, run Tika against the binary segments and then re-merge them, or you could just run Tika inline (though you'd probably want to run Tika in a co-process or separate thread and pass data through either an exchanger or pipes, just to keep the code a little simpler). A vastly simpler, if more contrived, use case for dealing with embedded data would be sending a large chunk of highly compressible data through a system that can only accept ASCII text and requires specific headers and footers on data. Here's example code for an InputStream for just such an occasion; it expects the compressed part to be gzipped, then base-64 encoded, and set off by a record separator (0x1E) on either side.


package streams;

import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;

public class MultipartStream extends InputStream {
  public static final int RECORD_SEPARATOR = 0x1E;
  private InputStream input;
  // both non-null only while reading a compressed section
  private GZIPInputStream gzIn;
  private RecordSeparatedInputStream section;

  public MultipartStream(InputStream in) {
    input = in;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    sink.write("This is some header text!\n".getBytes("utf8"));
    sink.write((byte) RECORD_SEPARATOR);
    // gzip first, then base-64 the compressed bytes so the section is pure ASCII
    OutputStream temp = new GZIPOutputStream(
      new Base64OutputStream(sink));
    byte[] toWrite = ("And this is some compressed body " +
      "data! \n").getBytes("utf8");
    for(int i = 0; i < 10; ++i) {
      temp.write(toWrite);
    }
    // flushes the GZIP trailer and the final base-64 padding;
    // ByteArrayOutputStream.close() is a no-op, so sink is still usable
    temp.close();
    sink.write(RECORD_SEPARATOR);
    sink.write("\nAnd this is some footer text\n".getBytes("utf8"));
    sink.close();
    System.out.println("Sink:\n" + new String(sink.toByteArray(), "utf8"));
    System.out.println("-----------------------------------------------");
    final MultipartStream inputStream = new MultipartStream(
      new ByteArrayInputStream(sink.toByteArray()));
    Reader in = new InputStreamReader(inputStream, "utf8");
    char[] chars = new char[4096];
    int numRead = 0;
    for(int read = in.read(chars); -1 != read; read = in.read(chars)) {
      numRead += read;
      System.out.print(new String(chars, 0, read));
    }
    in.close();
    System.out.println("Read " + numRead + " characters");
  }

  @Override
  public int read() throws IOException {
    if(gzIn != null) {
      int read = gzIn.read();
      if(read != -1) {
        return read;
      }
      // end of the compressed section: skip anything the decoders didn't
      // consume, up to and including the closing separator
      while(section.read() != -1) {
        // discard leftovers
      }
      gzIn.close(); // doesn't close input; see RecordSeparatedInputStream
      gzIn = null;
      section = null;
    }
    int read = input.read();
    if(read == RECORD_SEPARATOR) {
      // start of a compressed section: decode base-64, then decompress
      section = new RecordSeparatedInputStream(input);
      gzIn = new GZIPInputStream(new Base64InputStream(section));
      return read(); // return the first decompressed byte, not the separator
    }
    return read;
  }

  @Override
  public void close() throws IOException {
    input.close();
  }

  // presents the bytes of in up to the next RECORD_SEPARATOR as a complete stream
  private static class RecordSeparatedInputStream extends InputStream {
    private InputStream in;
    private boolean done = false;

    public RecordSeparatedInputStream(InputStream in) {
      this.in = in;
    }

    @Override
    public int read() throws IOException {
      if(done) {
        return -1; // stay at EOF so nothing past the separator is consumed
      }
      int retVal = in.read();
      if(retVal == RECORD_SEPARATOR || retVal == -1) {
        done = true;
        retVal = -1;
      }
      return retVal;
    }

    @Override
    public void close() {
      // deliberately leaves in open: the outer MultipartStream still reads it
    }
  }
}
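On Java 8 and later (which postdates this post), java.util.Base64 can stand in for commons-codec. Here's a sketch of the same section format (separator, payload gzipped and then MIME base-64 encoded, separator) written and read with JDK classes only; JdkBase64Section, writeSection, readSection and expand are hypothetical names. One design choice worth calling out: the decoders may stop reading before they reach the closing separator (GZIP stops once it has its trailer), so readSection drains the rest of the section itself rather than relying on them to get there.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class JdkBase64Section {
  public static final int RECORD_SEPARATOR = 0x1E;

  // Writes RS, then the payload gzipped and MIME base-64 encoded, then RS
  static void writeSection(OutputStream sink, byte[] payload) throws IOException {
    sink.write(RECORD_SEPARATOR);
    // Closing the encoder chain flushes the GZIP trailer and base-64 padding;
    // this view's close() only flushes, so sink itself stays open
    OutputStream shield = new FilterOutputStream(sink) {
      @Override
      public void close() throws IOException {
        flush();
      }
    };
    try (OutputStream out = new GZIPOutputStream(Base64.getMimeEncoder().wrap(shield))) {
      out.write(payload);
    }
    sink.write(RECORD_SEPARATOR);
  }

  // Reads one section; in must be positioned just after the opening RS
  static byte[] readSection(final InputStream in) throws IOException {
    // Reports EOF at the closing RS (or end of input) and stays at EOF
    InputStream bounded = new InputStream() {
      private boolean done = false;

      @Override
      public int read() throws IOException {
        if (done) {
          return -1;
        }
        int b = in.read();
        if (b == RECORD_SEPARATOR || b == -1) {
          done = true;
          return -1;
        }
        return b;
      }
    };
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream gz = new GZIPInputStream(Base64.getMimeDecoder().wrap(bounded))) {
      byte[] buf = new byte[4096];
      for (int n = gz.read(buf); n != -1; n = gz.read(buf)) {
        out.write(buf, 0, n);
      }
    }
    // Consume whatever the decoders left, up to and including the closing RS
    while (bounded.read() != -1) {
      // discard
    }
    return out.toByteArray();
  }

  // Copies plain bytes through and expands each section inline
  static String expand(byte[] document) throws IOException {
    InputStream in = new ByteArrayInputStream(document);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int b = in.read(); b != -1; b = in.read()) {
      if (b == RECORD_SEPARATOR) {
        out.write(readSection(in));
      } else {
        out.write(b);
      }
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    sink.write("This is some header text!\n".getBytes(StandardCharsets.UTF_8));
    writeSection(sink, "And this is some compressed body data!\n".getBytes(StandardCharsets.UTF_8));
    sink.write("And this is some footer text\n".getBytes(StandardCharsets.UTF_8));
    System.out.print(expand(sink.toByteArray()));
  }
}
```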

Of course, in practice you'd probably want to extend FilterInputStream for all of these cases, to make it clear to users of the code that each class is meant to wrap and modify another InputStream. The reason I didn't do that here is that FilterInputStream's read(byte[], int, int) hands the call straight to the wrapped stream's own bulk read, bypassing the overridden no-arg read() entirely, so you'd also have to override the read methods taking a byte array to route them through read().
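Here's roughly what the FilterInputStream route looks like, sketched with a hypothetical SeparatorBoundedInputStream that does the same job as RecordSeparatedInputStream above. read(byte[], int, int) is the method that has to be overridden, since FilterInputStream.read(byte[]) already delegates to it; skip() and available() also go straight to the wrapped stream, so they're overridden too.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Hypothetical example: exposes the wrapped stream only up to its first
// RECORD_SEPARATOR (0x1E), which is consumed but never returned
public class SeparatorBoundedInputStream extends FilterInputStream {
  public static final int RECORD_SEPARATOR = 0x1E;
  private boolean done = false;

  public SeparatorBoundedInputStream(InputStream in) {
    super(in);
  }

  @Override
  public int read() throws IOException {
    if (done) {
      return -1;
    }
    int b = in.read();
    if (b == RECORD_SEPARATOR || b == -1) {
      done = true;
      return -1;
    }
    return b;
  }

  // FilterInputStream would pass this straight to in.read(b, off, len) and
  // read right past the separator; route it through read() instead
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return 0;
    }
    int count = 0;
    while (count < len) {
      int c = read();
      if (c == -1) {
        break;
      }
      b[off + count++] = (byte) c;
    }
    return count == 0 ? -1 : count;
  }

  @Override
  public long skip(long n) throws IOException {
    long skipped = 0;
    while (skipped < n && read() != -1) {
      skipped++;
    }
    return skipped;
  }

  @Override
  public int available() {
    return 0; // conservative: the separator's position is unknown until read
  }

  @Override
  public boolean markSupported() {
    return false;
  }

  static String readAll(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[8]; // small buffer to force several bulk reads
    for (int n = in.read(buf); n != -1; n = in.read(buf)) {
      out.write(buf, 0, n);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) throws IOException {
    InputStream raw = new ByteArrayInputStream(
        ("first" + (char) RECORD_SEPARATOR + "second").getBytes(StandardCharsets.UTF_8));
    System.out.println(readAll(new SeparatorBoundedInputStream(raw))); // first
    System.out.println(readAll(raw)); // second
  }
}
```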

Custom input streams aren't always the best solution to a problem, since they do add a translation between source data and what the final processor actually sees, but they do provide another avenue for dealing with inconvenient data. Hopefully this has been informative enough to add another tool to your development toolbox, or at least given you an increased appreciation for the capabilities of input streams.
