Surfing the ReferencePipeline in Java 8

Java 8 includes a new Stream Processing API. At its core is the ReferencePipeline class, which gives us a DSL for working with Streams in a functional style. You can get an instance of a ReferencePipeline flowing with a single expression.


IntStream.range(1, 50)
         .mapToObj(i -> new Thread(() -> System.out.println(i)))
         .forEach(thread -> thread.start());

The MapReduce DSL has the essential set of list processing operations such as querying, mapping, and iterating. The operations can be chained to provide the notion of linked inline strategies. When the stream of data is pulled through this pipeline, each data element passes through the operation chain.
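To make the idea of each element passing through the operation chain a little more concrete, here is a minimal sketch. The class name PipelineOrderDemo and the log list are my own illustration and not part of the Stream API. On a sequential stream, each element travels through the filter, the map, and the terminal forEach before the next element enters the pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class PipelineOrderDemo {

    // Records the order in which each stage of the chain sees each element.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of("foo", "bar", "baz")
              .filter(s -> { log.add("filter:" + s); return s.startsWith("b"); })
              .map(s -> { log.add("map:" + s); return s.toUpperCase(); })
              .forEach(s -> log.add("forEach:" + s));
        return log;
    }

    public static void main(String[] args) {
        // "foo" is dropped by the filter, so it never reaches map or forEach.
        trace().forEach(System.out::println);
    }
}
```

The log shows "bar" being filtered, mapped, and consumed before "baz" is even inspected, which is the pull-through behaviour described above.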

Streams of data can be folded or reduced to a single value.

For example, here is how you can query a stream and accumulate the matches into a single value:


String value = Stream.of("foo", "bar", "baz", "quux")
                     .filter(s -> s.contains("a") || s.endsWith("x"))
                     .map(s -> s.toUpperCase())
                     // the identity "" makes reduce return a String rather than an Optional<String>
                     .reduce("", (acc, s) -> acc + s);

[Figure: Pipeline anatomy of the code sample]

These functions are monadic operations that enable us to query, transform, and project objects down the pipeline. The ReferencePipeline contains these functions. Each intermediate operation returns a new pipeline stage linked to the previous one, which is what gives us a chainable API. No data moves until a terminal operation such as .forEach() or .reduce() is invoked. This API can be considered an architectural scaffolding for describing how to process Streams of data.
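Here is a small sketch of that scaffolding idea: the chain is only a description until a terminal operation runs it. The class LazyStagesDemo and its counter are hypothetical names of mine, used purely to observe when the filter actually executes.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStagesDemo {

    // Returns { filter calls before the terminal op, filter calls after it, number of matches }.
    static int[] callsBeforeAndAfterTerminal() {
        AtomicInteger calls = new AtomicInteger();

        // Each call below only describes a stage; nothing is processed yet.
        Stream<String> source = Stream.of("foo", "bar", "baz");
        Stream<String> filtered = source.filter(s -> {
            calls.incrementAndGet();
            return s.contains("a");
        });
        Stream<String> mapped = filtered.map(String::toUpperCase);

        int before = calls.get();

        // The terminal operation pulls the data through the whole chain.
        List<String> result = mapped.collect(Collectors.toList());

        int after = calls.get();
        return new int[] { before, after, result.size() };
    }

    public static void main(String[] args) {
        int[] r = callsBeforeAndAfterTerminal();
        System.out.println("filter calls before terminal op: " + r[0]);
        System.out.println("filter calls after terminal op: " + r[1]);
        System.out.println("matches: " + r[2]);
    }
}
```

Splitting the chain into named variables is only for illustration. In practice you would write it as one fluent expression.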

See here how a pipeline can take in lines of CSV and emit structured row objects in the form of Arrays:


//start with a stream

String csv = "a,b,c\n"
		   + "d,e,f\n"
		   + "g,h,i\n";

//process the stream

Stream<String[]> rows = new BufferedReader(
		new InputStreamReader(new ByteArrayInputStream(
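				csv.getBytes(StandardCharsets.UTF_8)))) // java.nio.charset.StandardCharsets; no checked UnsupportedEncodingException to handle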
		.lines()
		.skip(1)
		.map(line -> line.split(","));

//print it out

rows.forEach(row -> System.out.println(row[0]
									 + row[1]
									 + row[2]));

Notice that the stream processing routine is designed as a single expression. The .lines() method initiates the pipeline. Then we skip the header row (it is not data) and project out an array of fields with .map(). This is nice. We’re able to use a higher-order thought process when describing the algorithm to the JVM. Instead of diving into procedural control flow, we simply tell the system what we want it to do by using a step-wise functional prescription.

This style of programming leads to more readable and comprehensible code. Behind the scenes, each lambda is “de-sugared” into an instance of a functional interface: Predicates (filters), Comparators (sorters), Functions (mappers), and BinaryOperators (accumulators). The lambda expressions make it so we do not have to get our hands dirty with the details of Java’s functional programming object model.
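To see the object model that the lambdas spare us from, here is a sketch that names each functional interface explicitly before handing it to the pipeline. The class ExplicitInterfacesDemo and the variable names are my own; the interfaces themselves come from java.util.function and java.util.

```java
import java.util.Comparator;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

class ExplicitInterfacesDemo {

    static String run() {
        Predicate<String> hasA = s -> s.contains("a");          // filter
        Function<String, String> upper = String::toUpperCase;   // mapper
        Comparator<String> reverseAlpha = Comparator.reverseOrder(); // sorter
        BinaryOperator<String> concat = (acc, s) -> acc + s;    // accumulator

        return Stream.of("foo", "bar", "baz", "quux")
                     .filter(hasA)
                     .map(upper)
                     .sorted(reverseAlpha)
                     .reduce("", concat);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints BAZBAR
    }
}
```

Written inline, the same pipeline needs none of these declarations, which is exactly the convenience the lambda syntax buys us.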

Consider the following example.

I want to download historical stock pricing data from Yahoo and turn it into PricingRecord objects in my system.

Yahoo Finance API

Its data can be acquired with a simple HTTP GET call:


http://real-chart.finance.yahoo.com/table.csv?s=AMD&d=6&e=18&f=2014&g=d&a=7&b=9&c=1996&ignore=.csv

First note the shape of the CSV that Yahoo’s API returns:

Date        Open   High   Low    Close  Volume  Adj Close*
2008-12-29  15.98  16.16  15.98  16.08  158600  16.08

Our CSV looks like this:

2008-12-29,15.98,16.16,15.98,16.08,158600,16.08

Let’s compose a simple recipe that uses the Stream API to pull this data from the web and turn it into objects that we can work with in our system.

Our program should do these things:

  1. Take in a set of stock symbols in the form of a Stream of strings.
  2. Transform the Stream of symbols into a new Stream containing Streams of PricingRecords.
    • This will be done by making a remote call to Yahoo’s API.
    • The CSV returned should be mapped directly into PricingRecord objects.
  3. Since we’re pulling the data for multiple symbols, we should do the API calls for each concurrently. We can achieve this by parallelizing the flow of stream elements through the operation chain.

Here is the solution implemented as a single composite expression. Note how we acquire a Stream<String>, process it, and emit a Map<String,List<PricingRecord>>.

Using a ReferencePipeline as a builder:


//start with a stream

Stream<String> stockStream = Stream.of("AMD", "APL", "IBM", "GOOG");

//generate data with a stream processing algorithm

Map<String, List<PricingRecord>> pricingRecords = stockStream
		.parallel()
		.map(symbol -> {

			try {
				String csv = new JdkRequest(String.format("http://real-chart.finance.yahoo.com/table.csv?s=%s&d=6&e=18&f=2014&g=d&a=7&b=9&c=1996&ignore=.csv", symbol))
							 .header("Accept", "text/xml")
						     .fetch()
						     .body();

				return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(csv.getBytes("UTF-8"))))
					   .lines()
					   .skip(1)
					   .map(line -> line.split(","))
					   .map(arr -> new PricingRecord(symbol, arr[0],arr[1], arr[2], arr[3], arr[4], arr[5], arr[6]));

			} catch (Exception e) {
				e.printStackTrace();
			}

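			// on failure, contribute no records; returning the symbol String would fail the later cast to a Stream
			return Stream.<PricingRecord>empty();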
		})
		.flatMap(records -> (Stream<PricingRecord>) records)
		.collect(Collectors.groupingBy(PricingRecord::getSymbol));


//print it out..

pricingRecords.forEach((symbol, records) -> System.out.println(String
		.format("Symbol: %s\n%s",
				symbol,
				records.stream()
					   .parallel()
					   .map(record -> record.toString() + "\n") // one record per line
					   .reduce("", (x, y) -> x + y)))); // identity "" yields a String rather than an Optional<String>

The elegance of this solution may not at first be apparent, but note a few key characteristics that emerge from a closer look. Notice that we get concurrency for free with the .parallel(). We do this near the beginning of the pipeline, in order to feed the downstream .map() function in a multithreaded manner.
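As a rough illustration of what .parallel() does to the downstream .map(), here is a sketch that swaps the remote call for a hypothetical fakeFetch method that just sleeps briefly and records which thread ran it. The names ParallelMapDemo, fakeFetch, and THREADS are assumptions of mine, and how many threads actually take part depends on the machine.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ParallelMapDemo {

    // Thread names seen while mapping; a concurrent set because several threads may write to it.
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for a slow remote call.
    static String fakeFetch(String symbol) {
        THREADS.add(Thread.currentThread().getName());
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return symbol.toLowerCase() + ".csv";
    }

    static List<String> fetchAll() {
        return Stream.of("AMD", "IBM", "GOOG")
                     .parallel()
                     .map(ParallelMapDemo::fakeFetch)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The list keeps the encounter order of the source even though the work ran in parallel.
        System.out.println(fetchAll());
        System.out.println("threads used: " + THREADS);
    }
}
```

Note that parallel streams run on the shared common fork-join pool by default, so blocking calls like real HTTP requests compete with any other parallel streams in the same JVM.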

Notice also that we’re projecting a 2-dimensional Stream out of the .map() function. The top-level stream contains one substream per symbol, so the composite type it returns is actually a Stream<Stream<PricingRecord>>. This is a common scenario in stream-based programming, and the solution to unpack and harvest the substreams is the .flatMap() function. It provides the affordance we need for working with structure in a 2-dimensional stream, and deeper nesting can be handled by applying .flatMap() again at each level. (The .substream(n) method found in pre-release builds sliced a stream by position rather than by dimension; the released API offers .skip(n) and .limit(n) for that.) In my example, .flatMap() unpacks each substream, with a cast to Stream<PricingRecord>, and splices its elements into one flat stream of PricingRecord objects.
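The 2-dimensional shape is easier to see on a toy input. In this sketch (FlatMapDemo is a name I made up for the illustration) each line of text becomes its own substream of fields, and .flatMap() splices the substreams into one flat stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapDemo {

    static List<String> flatten() {
        // map() yields one substream per line: a Stream<Stream<String>>.
        Stream<Stream<String>> nested = Stream.of("a,b", "c,d,e")
                .map(line -> Arrays.stream(line.split(",")));

        // flatMap() unpacks every substream into a single Stream<String>.
        return nested.flatMap(inner -> inner)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(flatten()); // prints [a, b, c, d, e]
    }
}
```

Because the substreams here are already typed as Stream<String>, no cast is needed inside the .flatMap() lambda.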

Finally, look at the last expression in the chain, the .collect(). To collect the stream is to terminate it, which means to enumerate its full contents. Basically this means loading them into memory, but there are many ways in which you might want to do this. For this we have what are called Collectors; they allow us to describe how we want the contents of the stream organized when they are pulled out.

Usage Tip:

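If you want a flat list use:
Collectors.toList() // List (an ArrayList in practice, though the concrete type is not guaranteed)

If you want a map or dictionary-like structure, use:
Collectors.groupingBy(classifier) // Map from each key to a List of its elements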

The .groupingBy() function that I use above allows us to aggregate our stream into groups based on a common characteristic. The Maps that .groupingBy() projects are very useful because each entry pairs a key produced by the classifier function with the list of elements that produced it, i.e. a Map.Entry (key-value pair) such as AbstractMap.SimpleEntry.
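To compare the two collectors side by side, here is a minimal sketch. The class CollectorsDemo and the choice of grouping words by their first letter are my own illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectorsDemo {

    // Flat list: every element of the stream, in encounter order.
    static List<String> asList() {
        return Stream.of("foo", "bar", "baz")
                     .collect(Collectors.toList());
    }

    // Grouped map: the classifier's result is the key, the matching elements are the value.
    static Map<Character, List<String>> byFirstLetter() {
        return Stream.of("foo", "bar", "baz")
                     .collect(Collectors.groupingBy(s -> s.charAt(0)));
    }

    public static void main(String[] args) {
        System.out.println(asList());
        byFirstLetter().forEach((letter, words) ->
                System.out.println(letter + " -> " + words));
    }
}
```

Here the key 'b' maps to the list [bar, baz] and the key 'f' maps to [foo], the same shape as the Map<String, List<PricingRecord>> keyed by symbol.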

For completeness I should provide the PricingRecord class:

public class PricingRecord {
	private String symbol;
	private String date;
	private double open;
	private double high;
	private double low;
	private double close;
	private int volume;
	private double adjustedClose;

	public PricingRecord (String symbol, String date, String open, String high, String low, String close, String volume, String adjustedClose){
		this.setSymbol(symbol);
		this.date = date;
		this.open = Double.parseDouble(open);
		this.high = Double.parseDouble(high);
		this.low = Double.parseDouble(low);
		this.close = Double.parseDouble(close);
		this.volume = Integer.parseInt(volume);
		this.adjustedClose = Double.parseDouble(adjustedClose);
	}

	public String toString(){
		return String.format("symbol=%s,date=%s,open=%.2f,close=%.2f,high=%.2f,low=%.2f,volume=%s,adjustedClose=%.2f", this.symbol, this.date, this.open, this.close, this.high, this.low, this.volume, this.adjustedClose);

	}

	public String getDate() {
		return date;
	}
	public void setDate(String date) {
		this.date = date;
	}
	public double getOpen() {
		return open;
	}
	public void setOpen(double open) {
		this.open = open;
	}
	public double getHigh() {
		return high;
	}
	public void setHigh(double high) {
		this.high = high;
	}
	public double getLow() {
		return low;
	}
	public void setLow(double low) {
		this.low = low;
	}
	public double getClose() {
		return close;
	}
	public void setClose(double close) {
		this.close = close;
	}
	public int getVolume() {
		return volume;
	}
	public void setVolume(int volume) {
		this.volume = volume;
	}
	public double getAdjustedClose() {
		return adjustedClose;
	}
	public void setAdjustedClose(double adjustedClose) {
		this.adjustedClose = adjustedClose;
	}
	public String getSymbol() {
		return symbol;
	}
	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

}

This is a simple entity class. It just serves to represent our logical notion of a PricingRecord, but I want you to notice the .toString(). It simply renders the object as text, and the last expression of the code example uses it to print a concatenation of these objects to the console as a String. The .reduce() function allows us to accumulate the result of each symbol’s data and print it out in a logically separated and intelligible way. Reducers are what I like to think of as “distillers” of information in the streams we process. In this way, they can be made to aggregate and refine information from the stream as it passes through the pipeline.
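A short sketch of the three common ways to distill a stream of strings into one value may help here. ReduceDemo is a hypothetical class name. The point to notice is the return type: .reduce() without an identity value hands back an Optional, because the stream might be empty.

```java
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReduceDemo {

    // No identity: the result is wrapped in an Optional in case the stream is empty.
    static Optional<String> withoutIdentity() {
        return Stream.of("a", "b", "c").reduce((x, y) -> x + y);
    }

    // With an identity: the result is a plain String, "" for an empty stream.
    static String withIdentity() {
        return Stream.of("a", "b", "c").reduce("", (x, y) -> x + y);
    }

    // Collectors.joining is a ready-made string reducer that also inserts a separator.
    static String joined() {
        return Stream.of("a", "b", "c").collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        System.out.println(withoutIdentity()); // prints Optional[abc]
        System.out.println(withIdentity());    // prints abc
        System.out.println(joined());          // prints a, b and c on separate lines
    }
}
```

For printing many records, Collectors.joining is probably the more economical choice, since repeated String concatenation in a reducer copies the accumulated text at every step.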

Finally, in order to run the code example as is, you’ll need to pull in the jcabi-http library to get the nice fluent web API that I’m using. Add this to your pom.xml and resolve the imports.

<dependency>
	<groupId>com.jcabi</groupId>
	<artifactId>jcabi-http</artifactId>
	<version>1.8</version>
</dependency>

The introduction of this functional programming model into Java is a leap forward for the platform. It signals a shift not only in the way we write code in the language, but also in how we think about solving problems with it. So-called higher-order problem solving requires higher-order tools. This technology is compelling because it gives us these high-level tools and a fun syntax that allows us to focus on the problem to be solved, not on the cruft it takes to solve it. This makes life better for everyone.